From d903c02d2bba584ff806ae9d00973fa4418f77b8 Mon Sep 17 00:00:00 2001 From: Maxim Mamontov Date: Sun, 21 Sep 2014 23:09:57 +0300 Subject: [PATCH] Brand new connection handling. Conflicts: projects/stargazer/plugins/configuration/sgconfig/configproto.cpp projects/stargazer/plugins/configuration/sgconfig/configproto.h --- .../configuration/sgconfig/configproto.cpp | 360 +++++++++--------- .../configuration/sgconfig/configproto.h | 116 +++--- .../plugins/configuration/sgconfig/conn.cpp | 46 ++- .../plugins/configuration/sgconfig/conn.h | 9 +- 4 files changed, 277 insertions(+), 254 deletions(-) diff --git a/projects/stargazer/plugins/configuration/sgconfig/configproto.cpp b/projects/stargazer/plugins/configuration/sgconfig/configproto.cpp index 088bbbcc..d28ece17 100644 --- a/projects/stargazer/plugins/configuration/sgconfig/configproto.cpp +++ b/projects/stargazer/plugins/configuration/sgconfig/configproto.cpp @@ -14,225 +14,229 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ -/* - * Date: 27.10.2002 - */ - /* * Author : Boris Mikhailenko - */ - - /* - $Revision: 1.22 $ - $Date: 2010/10/04 20:24:14 $ - $Author: faust $ + * Author : Maxim Mamontov */ -#include "parser.h" -#include "parser_auth_by.h" -#include "parser_user_info.h" +#include "conn.h" -#include "stg/users.h" -#include "stg/admins.h" -#include "stg/tariffs.h" -#include "stg/logger.h" #include "stg/common.h" +#include "stg/logger.h" -#include +#include +#include +#include +#include +#include +#include -#include "configproto.h" +#include +#include +#include +#include -//----------------------------------------------------------------------------- -void ParseXMLStart(void *data, const char *el, const char **attr) +namespace { -CONFIGPROTO * cp = static_cast(data); -if (cp->currParser) +struct IsFinished : public std::unary_function +{ + result_type operator()(const argument_type & arg) { - //cp->currParser->SetCurrAdmin(*cp->currAdmin); - cp->currParser->Start(data, el, attr); + return (arg->IsDone() && !arg->IsKeepAlive()) || !arg->IsOk(); } -else +}; + +struct RemoveConn : public std::unary_function +{ + result_type operator()(const argument_type & arg) { - for (size_t i = 0; i < cp->dataParser.size(); i++) - { - //cp->dataParser[i]->SetCurrAdmin(*cp->currAdmin); - //cp->dataParser[i]->Reset(); - if (cp->dataParser[i]->Start(data, el, attr) == 0) - { - cp->currParser = cp->dataParser[i]; - break; - } - else - { - //cp->dataParser[i]->Reset(); - } - } + delete arg; } +}; + +} + +CONFIGPROTO::CONFIGPROTO(PLUGIN_LOGGER & l) + : m_settings(NULL), + m_admins(NULL), + m_tariffs(NULL), + m_users(NULL), + m_port(0), + m_running(false), + m_stopped(true), + m_logger(l), + m_listenSocket(-1) +{ + std::for_each(m_conns.begin(), m_conns.end(), RemoveConn()); } -//----------------------------------------------------------------------------- -void ParseXMLEnd(void *data, const char *el) + +int CONFIGPROTO::Prepare() { -CONFIGPROTO * cp = static_cast(data); -if (cp->currParser) + sigset_t sigmask, oldmask; + sigemptyset(&sigmask); + sigaddset(&sigmask, SIGINT); + sigaddset(&sigmask, SIGTERM); + sigaddset(&sigmask, SIGUSR1); + sigaddset(&sigmask, SIGHUP); + pthread_sigmask(SIG_BLOCK, &sigmask, &oldmask); + + m_listenSocket = socket(PF_INET, SOCK_STREAM, 0); + + if (m_listenSocket < 0) { - if (cp->currParser->End(data, el) == 0) - { - cp->dataAnswer = cp->currParser->GetAnswer(); - cp->currParser = NULL; - } + m_errorStr = std::string("Cannot create listen socket: '") + strerror(errno) + "'."; + m_logger(m_errorStr); + return -1; } -else + + struct sockaddr_in listenAddr; + listenAddr.sin_family = PF_INET; + listenAddr.sin_port = htons(m_port); + listenAddr.sin_addr.s_addr = inet_addr("0.0.0.0"); // TODO: arbitrary address + + int dummy = 1; + + if (setsockopt(m_listenSocket, SOL_SOCKET, SO_REUSEADDR, &dummy, 4) != 0) { - for (size_t i = 0; i < cp->dataParser.size(); i++) - { - if (cp->dataParser[i]->End(data, el) == 0) - { - cp->dataAnswer = cp->currParser->GetAnswer(); - cp->currParser = NULL; - break; - } - } + m_errorStr = std::string("Failed to set SO_REUSEADDR to the listen socket: '") + strerror(errno) + "'."; + m_logger(m_errorStr); + return -1; } -} -//----------------------------------------------------------------------------- -CONFIGPROTO::CONFIGPROTO(PLUGIN_LOGGER & l) - : answerList(), - requestList(), - adminIP(0), - adminLogin(), - adminPassword(), - port(0), - thrReciveSendConf(), - nonstop(true), - state(0), - currAdmin(NULL), - logger(l), - listenSocket(-1), - parserGetServInfo(), - parserGetUsers(), - parserGetUser(), - parserChgUser(), - parserAddUser(), - parserDelUser(), - parserCheckUser(), - parserSendMessage(), - parserAuthBy(), - parserGetAdmins(), - parserAddAdmin(), - parserDelAdmin(), - parserChgAdmin(), - parserGetTariffs(), - parserAddTariff(), - parserDelTariff(), - parserChgTariff(), - admins(NULL), - currParser(NULL), - dataParser(), - xmlParser(), - errorStr() -{ -/*dataParser.push_back(new PARSER_GET_SERVER_INFO); - -dataParser.push_back(new PARSER_GET_USERS); -dataParser.push_back(new PARSER_GET_USER); -dataParser.push_back(new PARSER_CHG_USER); -dataParser.push_back(new PARSER_ADD_USER); -dataParser.push_back(new PARSER_DEL_USER); -dataParser.push_back(new PARSER_CHECK_USER); -dataParser.push_back(new PARSER_SEND_MESSAGE); -dataParser.push_back(new PARSER_AUTH_BY); -dataParser.push_back(new PARSER_USER_INFO); - -dataParser.push_back(new PARSER_GET_TARIFFS); -dataParser.push_back(new PARSER_ADD_TARIFF); -dataParser.push_back(new PARSER_DEL_TARIFF); -dataParser.push_back(new PARSER_CHG_TARIFF); - -dataParser.push_back(new PARSER_GET_ADMINS); -dataParser.push_back(new PARSER_CHG_ADMIN); -dataParser.push_back(new PARSER_DEL_ADMIN); -dataParser.push_back(new PARSER_ADD_ADMIN);*/ - -xmlParser = XML_ParserCreate(NULL); - -if (!xmlParser) + + if (bind(m_listenSocket, reinterpret_cast(&listenAddr), sizeof(listenAddr)) == -1) { - logger("Couldn't allocate memory for parser."); - exit(1); + m_errorStr = std::string("Cannot bind listen socket: '") + strerror(errno) + "'."; + m_logger(m_errorStr); + return -1; } -} -//----------------------------------------------------------------------------- -CONFIGPROTO::~CONFIGPROTO() -{ -for (size_t i = 0; i < dataParser.size(); ++i) - delete dataParser[i]; -XML_ParserFree(xmlParser); -} -//----------------------------------------------------------------------------- -int CONFIGPROTO::ParseCommand() -{ -std::list::iterator n; -int done = 0; -char str[9]; + if (listen(m_listenSocket, 64) == -1) // TODO: backlog length + { + m_errorStr = std::string("Failed to start listening for connections: '") + strerror(errno) + "'."; + m_logger(m_errorStr); + return -1; + } -if (requestList.empty()) + m_running = true; + m_stopped = false; return 0; +} -n = requestList.begin(); +int CONFIGPROTO::Stop() +{ + m_running = false; + for (int i = 0; i < 5 && !m_stopped; ++i) + { + struct timespec ts = {0, 200000000}; + nanosleep(&ts, NULL); + } -strncpy(str, (*n).c_str(), 8); -str[8] = 0; + if (!m_stopped) + { + m_errorStr = "Cannot stop listenign thread."; + m_logger(m_errorStr); + return -1; + } -XML_ParserReset(xmlParser, NULL); -XML_SetElementHandler(xmlParser, ParseXMLStart, ParseXMLEnd); -XML_SetUserData(xmlParser, this); + shutdown(m_listenSocket, SHUT_RDWR); + close(m_listenSocket); + return 0; +} -while(nonstop) +void CONFIGPROTO::Run() +{ + while (m_running) { - strncpy(str, (*n).c_str(), 8); - str[8] = 0; - size_t len = strlen(str); + fd_set fds; - ++n; - if (n == requestList.end()) - done = 1; - --n; + BuildFDSet(fds); - if (XML_Parse(xmlParser, (*n).c_str(), static_cast(len), done) == XML_STATUS_ERROR) - { - logger("Invalid configuration request"); - printfd(__FILE__, "Parse error at line %d:\n%s\n", - XML_GetCurrentLineNumber(xmlParser), - XML_ErrorString(XML_GetErrorCode(xmlParser))); - if (currParser) - { - printfd(__FILE__, "Parser reset\n"); - //currParser->Reset(); - currParser = NULL; - } + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 500000; - return -1; + int res = select(MaxFD() + 1, &fds, NULL, NULL, &tv); + if (res < 0) + { + m_errorStr = std::string("'select' is failed: '") + strerror(errno) + "'."; + m_logger(m_errorStr); + break; } + if (!m_running) + break; + if (res > 0) + HandleEvents(fds); - if (done) - return 0; - - ++n; + CleanupConns(); } + m_stopped = true; +} -return 0; +int CONFIGPROTO::MaxFD() const +{ + int maxFD = m_listenSocket; + for (size_t i = 0; i < m_conns.size(); ++i) + if (maxFD < m_conns[i]->Sock()) + maxFD = m_conns[i]->Sock(); + return maxFD; +} + +void CONFIGPROTO::BuildFDSet(fd_set & fds) const +{ + for (size_t i = 0; i < m_conns.size(); ++i) + FD_SET(m_conns[i]->Sock(), &fds); +} + +void CONFIGPROTO::CleanupConns() +{ + std::vector::iterator pos; + pos = std::remove_if(m_conns.begin(), m_conns.end(), IsFinished()); + if (pos == m_conns.end()) + return; + std::for_each(pos, m_conns.end(), RemoveConn()); + m_conns.erase(pos, m_conns.end()); +} + +void CONFIGPROTO::HandleEvents(const fd_set & fds) +{ + if (FD_ISSET(m_listenSocket, &fds)) + AcceptConnection(); + else + { + for (size_t i = 0; i < m_conns.size(); ++i) + if (FD_ISSET(m_conns[i]->Sock(), &fds)) + m_conns[i]->Read(); + } } -//----------------------------------------------------------------------------- -void CONFIGPROTO::SetPort(uint16_t p) + +void CONFIGPROTO::AcceptConnection() { -port = p; + struct sockaddr_in outerAddr; + socklen_t outerAddrLen(sizeof(outerAddr)); + int sock = accept(m_listenSocket, reinterpret_cast(&outerAddr), &outerAddrLen); + + if (sock < 0) + { + m_errorStr = std::string("Failed to accept connection: '") + strerror(errno) + "'."; + printfd(__FILE__, "%s", m_errorStr.c_str()); + m_logger(m_errorStr); + return; + } + + assert(m_settings != NULL); + assert(m_admins != NULL); + assert(m_users != NULL); + assert(m_tariffs != NULL); + + m_conns.push_back(new STG::Conn(*m_settings, *m_admins, *m_users, *m_tariffs, sock, outerAddr)); + + printfd(__FILE__, "New connection from %s:%d\n", inet_ntostring(m_conns.back()->IP()).c_str(), m_conns.back()->Port()); } -//----------------------------------------------------------------------------- -void CONFIGPROTO::SetAdmins(ADMINS * a) +/* +void CONFIGPROTO::WriteLogAccessFailed(uint32_t ip) { -admins = a; + m_logger("Admin's connection failed. IP %s", inet_ntostring(ip).c_str()); } -//----------------------------------------------------------------------------- +*/ diff --git a/projects/stargazer/plugins/configuration/sgconfig/configproto.h b/projects/stargazer/plugins/configuration/sgconfig/configproto.h index af924928..b10cc938 100644 --- a/projects/stargazer/plugins/configuration/sgconfig/configproto.h +++ b/projects/stargazer/plugins/configuration/sgconfig/configproto.h @@ -16,102 +16,76 @@ /* * Author : Boris Mikhailenko + * Author : Maxim Mamontov */ - /* - $Revision: 1.14 $ - $Date: 2010/10/04 20:24:14 $ - $Author: faust $ - */ - - #ifndef CONFIGPROTO_H #define CONFIGPROTO_H -#include -#include -#include - #include "stg/module_settings.h" #include "stg/os_int.h" -#include -#include +#include +#include -#define STG_HEADER "SG04" -#define OK_HEADER "OKHD" -#define ERR_HEADER "ERHD" -#define OK_LOGIN "OKLG" -#define ERR_LOGIN "ERLG" -#define OK_LOGINS "OKLS" -#define ERR_LOGINS "ERLS" +#include +#include +#include -class BASE_PARSER; -class USERS; +class SETTINGS; class ADMINS; -class ADMIN; class TARIFFS; +class USERS; class PLUGIN_LOGGER; -class STORE; -class SETTINGS; -//----------------------------------------------------------------------------- +namespace STG +{ + +class Conn; + +} + class CONFIGPROTO { public: CONFIGPROTO(PLUGIN_LOGGER & l); - ~CONFIGPROTO(); - void SetPort(uint16_t port); - void SetAdmins(ADMINS * a); - uint32_t GetAdminIP() const { return adminIP; } + void SetPort(uint16_t port) { m_port = port; } + void SetSettings(const SETTINGS * settings) { m_settings = settings; } + void SetAdmins(ADMINS * admins) { m_admins = admins; } + void SetTariffs(TARIFFS * tariffs) { m_tariffs = tariffs; } + void SetUsers(USERS * users) { m_users = users; } + int Prepare(); int Stop(); - const std::string & GetStrError() const { return errorStr; } + const std::string & GetStrError() const { return m_errorStr; } void Run(); private: CONFIGPROTO(const CONFIGPROTO & rvalue); CONFIGPROTO & operator=(const CONFIGPROTO & rvalue); - int RecvHdr(int sock); - int RecvLogin(int sock); - int SendLoginAnswer(int sock); - int SendHdrAnswer(int sock, int err); - int RecvLoginS(int sock); - int SendLoginSAnswer(int sock, int err); - int RecvData(int sock); - int SendDataAnswer(int sock, const std::string & answer); - int SendError(int sock, const std::string & text); - void WriteLogAccessFailed(uint32_t ip); - const std::string & GetDataAnswer() const { return dataAnswer; } - - int ParseCommand(); - - std::list requestList; - uint32_t adminIP; - std::string adminLogin; - std::string adminPassword; - uint16_t port; - pthread_t thrReciveSendConf; - bool nonstop; - int state; - ADMIN * currAdmin; - PLUGIN_LOGGER & logger; - std::string dataAnswer; - - int listenSocket; - - ADMINS * admins; - - BASE_PARSER * currParser; - std::vector dataParser; - - XML_Parser xmlParser; - - std::string errorStr; - - friend void ParseXMLStart(void *data, const char *el, const char **attr); - friend void ParseXMLEnd(void *data, const char *el); + const SETTINGS * m_settings; + ADMINS * m_admins; + TARIFFS * m_tariffs; + USERS * m_users; + + uint16_t m_port; + bool m_running; + bool m_stopped; + PLUGIN_LOGGER & m_logger; + int m_listenSocket; + + std::string m_errorStr; + + std::vector m_conns; + + int MaxFD() const; + void BuildFDSet(fd_set & fds) const; + void CleanupConns(); + void HandleEvents(const fd_set & fds); + void AcceptConnection(); + + //void WriteLogAccessFailed(uint32_t ip); }; -//----------------------------------------------------------------------------- + #endif //CONFIGPROTO_H diff --git a/projects/stargazer/plugins/configuration/sgconfig/conn.cpp b/projects/stargazer/plugins/configuration/sgconfig/conn.cpp index 718e7c56..ead809f0 100644 --- a/projects/stargazer/plugins/configuration/sgconfig/conn.cpp +++ b/projects/stargazer/plugins/configuration/sgconfig/conn.cpp @@ -128,6 +128,17 @@ bool Conn::Read() return HandleBuffer(res); } +bool Conn::WriteAnswer(const void* buffer, size_t size) +{ + ssize_t res = write(m_sock, buffer, size); + if (res < 0) + { + // TODO: log it + return false; + } + return true; +} + BASE_PARSER * Conn::GetParser(const std::string & tag) { if (strcasecmp(tag.c_str(), "getserverinfo") == 0) @@ -166,6 +177,7 @@ bool Conn::HandleHeader() { if (strncmp(m_header, STG_HEADER, sizeof(m_header)) != 0) { + WriteAnswer(ERR_HEADER, sizeof(ERR_HEADER)); // TODO: log it m_state = ERROR; return false; @@ -173,13 +185,14 @@ bool Conn::HandleHeader() m_state = LOGIN; m_buffer = m_login; m_bufferSize = sizeof(m_login); - return true; + return WriteAnswer(OK_HEADER, sizeof(OK_HEADER)); } bool Conn::HandleLogin() { if (m_admins.Find(m_login, &m_admin)) // ADMINS::Find returns true on error. { + WriteAnswer(ERR_LOGIN, sizeof(ERR_LOGIN)); // TODO: log it m_state = ERROR; return false; @@ -188,7 +201,7 @@ bool Conn::HandleLogin() m_state = CRYPTO_LOGIN; m_buffer = m_cryptoLogin; m_bufferSize = sizeof(m_cryptoLogin); - return true; + return WriteAnswer(OK_LOGIN, sizeof(OK_LOGIN)); } bool Conn::HandleCryptoLogin() @@ -200,6 +213,7 @@ bool Conn::HandleCryptoLogin() if (strncmp(m_login, login, sizeof(login)) != 0) { + WriteAnswer(ERR_LOGINS, sizeof(ERR_LOGINS)); // TODO: log it m_state = ERROR; return false; @@ -209,7 +223,7 @@ bool Conn::HandleCryptoLogin() m_buffer = m_data; m_bufferSize = sizeof(m_data); m_stream = new STG::DECRYPT_STREAM(m_admin->GetPassword(), DataCallback, &m_dataState); - return true; + return WriteAnswer(OK_LOGINS, sizeof(OK_LOGINS)); } bool Conn::HandleData(size_t size) @@ -235,6 +249,17 @@ bool Conn::DataCallback(const void * block, size_t size, void * data) return false; } + if (state.final) + { + if (!state.conn.WriteResponse()) + { + // TODO: log it + state.conn.m_state = ERROR; + return false; + } + state.conn.m_state = DONE; + } + return true; } @@ -270,3 +295,18 @@ void Conn::ParseXMLEnd(void * data, const char * el) conn.m_parser->End(data, el); } + +bool Conn::WriteResponse() +{ + STG::ENCRYPT_STREAM stream(m_admin->GetPassword(), WriteCallback, this); + const std::string & answer = m_parser->GetAnswer(); + stream.Put(answer.c_str(), answer.length() + 1 /* including \0 */, true /* final */); + return stream.IsOk(); +} + +bool Conn::WriteCallback(const void * block, size_t size, void * data) +{ + assert(data != NULL); + Conn & conn = *static_cast(data); + return WriteAll(conn.m_sock, block, size);; +} diff --git a/projects/stargazer/plugins/configuration/sgconfig/conn.h b/projects/stargazer/plugins/configuration/sgconfig/conn.h index 396e9753..902cd394 100644 --- a/projects/stargazer/plugins/configuration/sgconfig/conn.h +++ b/projects/stargazer/plugins/configuration/sgconfig/conn.h @@ -61,11 +61,12 @@ class Conn int Sock() const { return m_sock; } uint32_t IP() const { return *(uint32_t *)(&m_addr.sin_addr); } - uint16_t Port() const { return m_addr.sin_port; } + uint16_t Port() const { return ntohs(m_addr.sin_port); } bool Read(); bool IsOk() const { return m_state != ERROR; } + bool IsDone() const { return m_state == DONE; } bool IsKeepAlive() const { return m_keepAlive; } private: @@ -93,7 +94,7 @@ class Conn XML_Parser m_xmlParser; - enum { HEADER, LOGIN, CRYPTO_LOGIN, DATA, ERROR } m_state; + enum { HEADER, LOGIN, CRYPTO_LOGIN, DATA, DONE, ERROR } m_state; void * m_buffer; size_t m_bufferSize; @@ -112,6 +113,9 @@ class Conn bool HandleCryptoLogin(); bool HandleData(size_t size); + bool WriteAnswer(const void* buffer, size_t size); + bool WriteResponse(); + struct DataState { DataState(bool f, Conn & c) : final(f), conn(c) {} @@ -122,6 +126,7 @@ class Conn static bool DataCallback(const void * block, size_t size, void * data); static void ParseXMLStart(void * data, const char * el, const char ** attr); static void ParseXMLEnd(void * data, const char * el); + static bool WriteCallback(const void * block, size_t size, void * data); }; } -- 2.43.2