]> git.stg.codes - stg.git/commitdiff
Brand new connection handling.
authorMaxim Mamontov <faust.madf@gmail.com>
Sun, 21 Sep 2014 20:09:57 +0000 (23:09 +0300)
committerMaxim Mamontov <faust.madf@gmail.com>
Fri, 9 Jan 2015 21:24:18 +0000 (23:24 +0200)
Conflicts:
projects/stargazer/plugins/configuration/sgconfig/configproto.cpp
projects/stargazer/plugins/configuration/sgconfig/configproto.h

projects/stargazer/plugins/configuration/sgconfig/configproto.cpp
projects/stargazer/plugins/configuration/sgconfig/configproto.h
projects/stargazer/plugins/configuration/sgconfig/conn.cpp
projects/stargazer/plugins/configuration/sgconfig/conn.h

index 088bbbcc66915ec93c78f67d4d3e0c2716036ddd..d28ece173c4aba4ecd6d5d50b764d6c17528b068 100644 (file)
  *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  */
 
-/*
- *    Date: 27.10.2002
- */
-
 /*
  *    Author : Boris Mikhailenko <stg34@stargazer.dp.ua>
- */
-
- /*
- $Revision: 1.22 $
- $Date: 2010/10/04 20:24:14 $
- $Author: faust $
+ *    Author : Maxim Mamontov <faust@stargazer.dp.ua>
  */
 
 
-#include "parser.h"
-#include "parser_auth_by.h"
-#include "parser_user_info.h"
+#include "conn.h"
 
-#include "stg/users.h"
-#include "stg/admins.h"
-#include "stg/tariffs.h"
-#include "stg/logger.h"
 #include "stg/common.h"
+#include "stg/logger.h"
 
-#include <unistd.h>
+#include <algorithm>
+#include <functional>
+#include <csignal>
+#include <cstring>
+#include <cerrno>
+#include <cassert>
 
-#include "configproto.h"
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
 
-//-----------------------------------------------------------------------------
-void ParseXMLStart(void *data, const char *el, const char **attr)
+namespace
 {
-CONFIGPROTO * cp = static_cast<CONFIGPROTO *>(data);
 
-if (cp->currParser)
+struct IsFinished : public std::unary_function<STG::Conn *, bool>
+{
+    result_type operator()(const argument_type & arg)
     {
-    //cp->currParser->SetCurrAdmin(*cp->currAdmin);
-    cp->currParser->Start(data, el, attr);
+        return (arg->IsDone() && !arg->IsKeepAlive()) || !arg->IsOk();
     }
-else
+};
+
+struct RemoveConn : public std::unary_function<STG::Conn *, void>
+{
+    result_type operator()(const argument_type & arg)
     {
-    for (size_t i = 0; i < cp->dataParser.size(); i++)
-        {
-        //cp->dataParser[i]->SetCurrAdmin(*cp->currAdmin);
-        //cp->dataParser[i]->Reset();
-        if (cp->dataParser[i]->Start(data, el, attr) == 0)
-            {
-            cp->currParser = cp->dataParser[i];
-            break;
-            }
-        else
-            {
-            //cp->dataParser[i]->Reset();
-            }
-        }
+        delete arg;
     }
+};
+
+}
+
+CONFIGPROTO::CONFIGPROTO(PLUGIN_LOGGER & l)
+    : m_settings(NULL),
+      m_admins(NULL),
+      m_tariffs(NULL),
+      m_users(NULL),
+      m_port(0),
+      m_running(false),
+      m_stopped(true),
+      m_logger(l),
+      m_listenSocket(-1)
+{
+    std::for_each(m_conns.begin(), m_conns.end(), RemoveConn());
 }
-//-----------------------------------------------------------------------------
-void ParseXMLEnd(void *data, const char *el)
+
+int CONFIGPROTO::Prepare()
 {
-CONFIGPROTO * cp = static_cast<CONFIGPROTO *>(data);
-if (cp->currParser)
+    sigset_t sigmask, oldmask;
+    sigemptyset(&sigmask);
+    sigaddset(&sigmask, SIGINT);
+    sigaddset(&sigmask, SIGTERM);
+    sigaddset(&sigmask, SIGUSR1);
+    sigaddset(&sigmask, SIGHUP);
+    pthread_sigmask(SIG_BLOCK, &sigmask, &oldmask);
+
+    m_listenSocket = socket(PF_INET, SOCK_STREAM, 0);
+
+    if (m_listenSocket < 0)
     {
-    if (cp->currParser->End(data, el) == 0)
-        {
-        cp->dataAnswer = cp->currParser->GetAnswer();
-        cp->currParser = NULL;
-        }
+        m_errorStr = std::string("Cannot create listen socket: '") + strerror(errno) + "'.";
+        m_logger(m_errorStr);
+        return -1;
     }
-else
+
+    struct sockaddr_in listenAddr;
+    listenAddr.sin_family = PF_INET;
+    listenAddr.sin_port = htons(m_port);
+    listenAddr.sin_addr.s_addr = inet_addr("0.0.0.0"); // TODO: arbitrary address
+
+    int dummy = 1;
+
+    if (setsockopt(m_listenSocket, SOL_SOCKET, SO_REUSEADDR, &dummy, 4) != 0)
     {
-    for (size_t i = 0; i < cp->dataParser.size(); i++)
-        {
-        if (cp->dataParser[i]->End(data, el) == 0)
-            {
-            cp->dataAnswer = cp->currParser->GetAnswer();
-            cp->currParser = NULL;
-            break;
-            }
-        }
+        m_errorStr = std::string("Failed to set SO_REUSEADDR to the listen socket: '") + strerror(errno) + "'.";
+        m_logger(m_errorStr);
+        return -1;
     }
-}
-//-----------------------------------------------------------------------------
-CONFIGPROTO::CONFIGPROTO(PLUGIN_LOGGER & l)
-    : answerList(),
-      requestList(),
-      adminIP(0),
-      adminLogin(),
-      adminPassword(),
-      port(0),
-      thrReciveSendConf(),
-      nonstop(true),
-      state(0),
-      currAdmin(NULL),
-      logger(l),
-      listenSocket(-1),
-      parserGetServInfo(),
-      parserGetUsers(),
-      parserGetUser(),
-      parserChgUser(),
-      parserAddUser(),
-      parserDelUser(),
-      parserCheckUser(),
-      parserSendMessage(),
-      parserAuthBy(),
-      parserGetAdmins(),
-      parserAddAdmin(),
-      parserDelAdmin(),
-      parserChgAdmin(),
-      parserGetTariffs(),
-      parserAddTariff(),
-      parserDelTariff(),
-      parserChgTariff(),
-      admins(NULL),
-      currParser(NULL),
-      dataParser(),
-      xmlParser(),
-      errorStr()
-{
-/*dataParser.push_back(new PARSER_GET_SERVER_INFO);
-
-dataParser.push_back(new PARSER_GET_USERS);
-dataParser.push_back(new PARSER_GET_USER);
-dataParser.push_back(new PARSER_CHG_USER);
-dataParser.push_back(new PARSER_ADD_USER);
-dataParser.push_back(new PARSER_DEL_USER);
-dataParser.push_back(new PARSER_CHECK_USER);
-dataParser.push_back(new PARSER_SEND_MESSAGE);
-dataParser.push_back(new PARSER_AUTH_BY);
-dataParser.push_back(new PARSER_USER_INFO);
-
-dataParser.push_back(new PARSER_GET_TARIFFS);
-dataParser.push_back(new PARSER_ADD_TARIFF);
-dataParser.push_back(new PARSER_DEL_TARIFF);
-dataParser.push_back(new PARSER_CHG_TARIFF);
-
-dataParser.push_back(new PARSER_GET_ADMINS);
-dataParser.push_back(new PARSER_CHG_ADMIN);
-dataParser.push_back(new PARSER_DEL_ADMIN);
-dataParser.push_back(new PARSER_ADD_ADMIN);*/
-
-xmlParser = XML_ParserCreate(NULL);
-
-if (!xmlParser)
+
+    if (bind(m_listenSocket, reinterpret_cast<sockaddr *>(&listenAddr), sizeof(listenAddr)) == -1)
     {
-    logger("Couldn't allocate memory for parser.");
-    exit(1);
+        m_errorStr = std::string("Cannot bind listen socket: '") + strerror(errno) + "'.";
+        m_logger(m_errorStr);
+        return -1;
     }
 
-}
-//-----------------------------------------------------------------------------
-CONFIGPROTO::~CONFIGPROTO()
-{
-for (size_t i = 0; i < dataParser.size(); ++i)
-    delete dataParser[i];
-XML_ParserFree(xmlParser);
-}
-//-----------------------------------------------------------------------------
-int CONFIGPROTO::ParseCommand()
-{
-std::list<std::string>::iterator n;
-int done = 0;
-char str[9];
+    if (listen(m_listenSocket, 64) == -1) // TODO: backlog length
+    {
+        m_errorStr = std::string("Failed to start listening for connections: '") + strerror(errno) + "'.";
+        m_logger(m_errorStr);
+        return -1;
+    }
 
-if (requestList.empty())
+    m_running = true;
+    m_stopped = false;
     return 0;
+}
 
-n = requestList.begin();
+int CONFIGPROTO::Stop()
+{
+    m_running = false;
+    for (int i = 0; i < 5 && !m_stopped; ++i)
+    {
+        struct timespec ts = {0, 200000000};
+        nanosleep(&ts, NULL);
+    }
 
-strncpy(str, (*n).c_str(), 8);
-str[8] = 0;
+    if (!m_stopped)
+    {
+        m_errorStr = "Cannot stop listenign thread.";
+        m_logger(m_errorStr);
+        return -1;
+    }
 
-XML_ParserReset(xmlParser, NULL);
-XML_SetElementHandler(xmlParser, ParseXMLStart, ParseXMLEnd);
-XML_SetUserData(xmlParser, this);
+    shutdown(m_listenSocket, SHUT_RDWR);
+    close(m_listenSocket);
+    return 0;
+}
 
-while(nonstop)
+void CONFIGPROTO::Run()
+{
+    while (m_running)
     {
-    strncpy(str, (*n).c_str(), 8);
-    str[8] = 0;
-    size_t len = strlen(str);
+        fd_set fds;
 
-    ++n;
-    if (n == requestList.end())
-        done = 1;
-    --n;
+        BuildFDSet(fds);
 
-    if (XML_Parse(xmlParser, (*n).c_str(), static_cast<int>(len), done) == XML_STATUS_ERROR)
-        {
-        logger("Invalid configuration request");
-        printfd(__FILE__, "Parse error at line %d:\n%s\n",
-           XML_GetCurrentLineNumber(xmlParser),
-           XML_ErrorString(XML_GetErrorCode(xmlParser)));
-        if (currParser)
-            {
-            printfd(__FILE__, "Parser reset\n");
-            //currParser->Reset();
-            currParser = NULL;
-            }
+        struct timeval tv;
+        tv.tv_sec = 0;
+        tv.tv_usec = 500000;
 
-        return -1;
+        int res = select(MaxFD() + 1, &fds, NULL, NULL, &tv);
+        if (res < 0)
+        {
+            m_errorStr = std::string("'select' is failed: '") + strerror(errno) + "'.";
+            m_logger(m_errorStr);
+            break;
         }
+        if (!m_running)
+            break;
+        if (res > 0)
+            HandleEvents(fds);
 
-    if (done)
-        return 0;
-
-    ++n;
+        CleanupConns();
     }
+    m_stopped = true;
+}
 
-return 0;
+int CONFIGPROTO::MaxFD() const
+{
+    int maxFD = m_listenSocket;
+    for (size_t i = 0; i < m_conns.size(); ++i)
+        if (maxFD < m_conns[i]->Sock())
+            maxFD = m_conns[i]->Sock();
+    return maxFD;
+}
+
+void CONFIGPROTO::BuildFDSet(fd_set & fds) const
+{
+    for (size_t i = 0; i < m_conns.size(); ++i)
+        FD_SET(m_conns[i]->Sock(), &fds);
+}
+
+void CONFIGPROTO::CleanupConns()
+{
+    std::vector<STG::Conn *>::iterator pos;
+    pos = std::remove_if(m_conns.begin(), m_conns.end(), IsFinished());
+    if (pos == m_conns.end())
+        return;
+    std::for_each(pos, m_conns.end(), RemoveConn());
+    m_conns.erase(pos, m_conns.end());
+}
+
+void CONFIGPROTO::HandleEvents(const fd_set & fds)
+{
+    if (FD_ISSET(m_listenSocket, &fds))
+        AcceptConnection();
+    else
+    {
+        for (size_t i = 0; i < m_conns.size(); ++i)
+            if (FD_ISSET(m_conns[i]->Sock(), &fds))
+                m_conns[i]->Read();
+    }
 }
-//-----------------------------------------------------------------------------
-void CONFIGPROTO::SetPort(uint16_t p)
+
+void CONFIGPROTO::AcceptConnection()
 {
-port = p;
+    struct sockaddr_in outerAddr;
+    socklen_t outerAddrLen(sizeof(outerAddr));
+    int sock = accept(m_listenSocket, reinterpret_cast<sockaddr *>(&outerAddr), &outerAddrLen);
+
+    if (sock < 0)
+    {
+        m_errorStr = std::string("Failed to accept connection: '") + strerror(errno) + "'.";
+        printfd(__FILE__, "%s", m_errorStr.c_str());
+        m_logger(m_errorStr);
+        return;
+    }
+
+    assert(m_settings != NULL);
+    assert(m_admins != NULL);
+    assert(m_users != NULL);
+    assert(m_tariffs != NULL);
+
+    m_conns.push_back(new STG::Conn(*m_settings, *m_admins, *m_users, *m_tariffs, sock, outerAddr));
+
+    printfd(__FILE__, "New connection from %s:%d\n", inet_ntostring(m_conns.back()->IP()).c_str(), m_conns.back()->Port());
 }
-//-----------------------------------------------------------------------------
-void CONFIGPROTO::SetAdmins(ADMINS * a)
+/*
+void CONFIGPROTO::WriteLogAccessFailed(uint32_t ip)
 {
-admins = a;
+    m_logger("Admin's connection failed. IP %s", inet_ntostring(ip).c_str());
 }
-//-----------------------------------------------------------------------------
+*/
index af924928a9b1490df86ae5c4072caffe02bd344c..b10cc938f6800ae15b840093614852f334347a2a 100644 (file)
 
 /*
  *    Author : Boris Mikhailenko <stg34@stargazer.dp.ua>
+ *    Author : Maxim Mamontov <faust@stargazer.dp.ua>
  */
 
- /*
- $Revision: 1.14 $
- $Date: 2010/10/04 20:24:14 $
- $Author: faust $
- */
-
-
 #ifndef CONFIGPROTO_H
 #define CONFIGPROTO_H
 
-#include <string>
-#include <list>
-#include <vector>
-
 #include "stg/module_settings.h"
 #include "stg/os_int.h"
 
-#include <expat.h>
-#include <pthread.h>
+#include <string>
+#include <vector>
 
-#define  STG_HEADER     "SG04"
-#define  OK_HEADER      "OKHD"
-#define  ERR_HEADER     "ERHD"
-#define  OK_LOGIN       "OKLG"
-#define  ERR_LOGIN      "ERLG"
-#define  OK_LOGINS      "OKLS"
-#define  ERR_LOGINS     "ERLS"
+#include <sys/select.h>
+#include <sys/types.h>
+#include <unistd.h>
 
-class BASE_PARSER;
-class USERS;
+class SETTINGS;
 class ADMINS;
-class ADMIN;
 class TARIFFS;
+class USERS;
 class PLUGIN_LOGGER;
-class STORE;
-class SETTINGS;
 
-//-----------------------------------------------------------------------------
+namespace STG
+{
+
+class Conn;
+
+}
+
 class CONFIGPROTO {
 public:
     CONFIGPROTO(PLUGIN_LOGGER & l);
-    ~CONFIGPROTO();
 
-    void            SetPort(uint16_t port);
-    void            SetAdmins(ADMINS * a);
-    uint32_t        GetAdminIP() const { return adminIP; }
+    void            SetPort(uint16_t port) { m_port = port; }
+    void            SetSettings(const SETTINGS * settings) { m_settings = settings; }
+    void            SetAdmins(ADMINS * admins) { m_admins = admins; }
+    void            SetTariffs(TARIFFS * tariffs) { m_tariffs = tariffs; }
+    void            SetUsers(USERS * users) { m_users = users; }
+
     int             Prepare();
     int             Stop();
-    const std::string & GetStrError() const { return errorStr; }
+    const std::string & GetStrError() const { return m_errorStr; }
     void            Run();
 
 private:
     CONFIGPROTO(const CONFIGPROTO & rvalue);
     CONFIGPROTO & operator=(const CONFIGPROTO & rvalue);
 
-    int             RecvHdr(int sock);
-    int             RecvLogin(int sock);
-    int             SendLoginAnswer(int sock);
-    int             SendHdrAnswer(int sock, int err);
-    int             RecvLoginS(int sock);
-    int             SendLoginSAnswer(int sock, int err);
-    int             RecvData(int sock);
-    int             SendDataAnswer(int sock, const std::string & answer);
-    int             SendError(int sock, const std::string & text);
-    void            WriteLogAccessFailed(uint32_t ip);
-    const std::string & GetDataAnswer() const { return dataAnswer; }
-
-    int             ParseCommand();
-
-    std::list<std::string>      requestList;
-    uint32_t                    adminIP;
-    std::string                 adminLogin;
-    std::string                 adminPassword;
-    uint16_t                    port;
-    pthread_t                   thrReciveSendConf;
-    bool                        nonstop;
-    int                         state;
-    ADMIN *                     currAdmin;
-    PLUGIN_LOGGER &             logger;
-    std::string                 dataAnswer;
-
-    int                         listenSocket;
-
-    ADMINS *                    admins;
-
-    BASE_PARSER *               currParser;
-    std::vector<BASE_PARSER *>  dataParser;
-
-    XML_Parser                  xmlParser;
-
-    std::string                 errorStr;
-
-    friend void ParseXMLStart(void *data, const char *el, const char **attr);
-    friend void ParseXMLEnd(void *data, const char *el);
+    const SETTINGS * m_settings;
+    ADMINS *         m_admins;
+    TARIFFS *        m_tariffs;
+    USERS *          m_users;
+
+    uint16_t         m_port;
+    bool             m_running;
+    bool             m_stopped;
+    PLUGIN_LOGGER &  m_logger;
+    int              m_listenSocket;
+
+    std::string      m_errorStr;
+
+    std::vector<STG::Conn *> m_conns;
+
+    int MaxFD() const;
+    void BuildFDSet(fd_set & fds) const;
+    void CleanupConns();
+    void HandleEvents(const fd_set & fds);
+    void AcceptConnection();
+
+    //void WriteLogAccessFailed(uint32_t ip);
 };
-//-----------------------------------------------------------------------------
+
 #endif //CONFIGPROTO_H
index 718e7c565af929dfd9359d65ee56d263fc9e7b7b..ead809f0b9ac8b88793cb54099dee5ba496312a4 100644 (file)
@@ -128,6 +128,17 @@ bool Conn::Read()
     return HandleBuffer(res);
 }
 
+bool Conn::WriteAnswer(const void* buffer, size_t size)
+{
+    ssize_t res = write(m_sock, buffer, size);
+    if (res < 0)
+    {
+        // TODO: log it
+        return false;
+    }
+    return true;
+}
+
 BASE_PARSER * Conn::GetParser(const std::string & tag)
 {
     if (strcasecmp(tag.c_str(), "getserverinfo") == 0)
@@ -166,6 +177,7 @@ bool Conn::HandleHeader()
 {
     if (strncmp(m_header, STG_HEADER, sizeof(m_header)) != 0)
     {
+        WriteAnswer(ERR_HEADER, sizeof(ERR_HEADER));
         // TODO: log it
         m_state = ERROR;
         return false;
@@ -173,13 +185,14 @@ bool Conn::HandleHeader()
     m_state = LOGIN;
     m_buffer = m_login;
     m_bufferSize = sizeof(m_login);
-    return true;
+    return WriteAnswer(OK_HEADER, sizeof(OK_HEADER));
 }
 
 bool Conn::HandleLogin()
 {
     if (m_admins.Find(m_login, &m_admin)) // ADMINS::Find returns true on error.
     {
+        WriteAnswer(ERR_LOGIN, sizeof(ERR_LOGIN));
         // TODO: log it
         m_state = ERROR;
         return false;
@@ -188,7 +201,7 @@ bool Conn::HandleLogin()
     m_state = CRYPTO_LOGIN;
     m_buffer = m_cryptoLogin;
     m_bufferSize = sizeof(m_cryptoLogin);
-    return true;
+    return WriteAnswer(OK_LOGIN, sizeof(OK_LOGIN));
 }
 
 bool Conn::HandleCryptoLogin()
@@ -200,6 +213,7 @@ bool Conn::HandleCryptoLogin()
 
     if (strncmp(m_login, login, sizeof(login)) != 0)
     {
+        WriteAnswer(ERR_LOGINS, sizeof(ERR_LOGINS));
         // TODO: log it
         m_state = ERROR;
         return false;
@@ -209,7 +223,7 @@ bool Conn::HandleCryptoLogin()
     m_buffer = m_data;
     m_bufferSize = sizeof(m_data);
     m_stream = new STG::DECRYPT_STREAM(m_admin->GetPassword(), DataCallback, &m_dataState);
-    return true;
+    return WriteAnswer(OK_LOGINS, sizeof(OK_LOGINS));
 }
 
 bool Conn::HandleData(size_t size)
@@ -235,6 +249,17 @@ bool Conn::DataCallback(const void * block, size_t size, void * data)
         return false;
     }
 
+    if (state.final)
+    {
+        if (!state.conn.WriteResponse())
+        {
+            // TODO: log it
+            state.conn.m_state = ERROR;
+            return false;
+        }
+        state.conn.m_state = DONE;
+    }
+
     return true;
 }
 
@@ -270,3 +295,18 @@ void Conn::ParseXMLEnd(void * data, const char * el)
 
     conn.m_parser->End(data, el);
 }
+
+bool Conn::WriteResponse()
+{
+    STG::ENCRYPT_STREAM stream(m_admin->GetPassword(), WriteCallback, this);
+    const std::string & answer = m_parser->GetAnswer();
+    stream.Put(answer.c_str(), answer.length() + 1 /* including \0 */, true /* final */);
+    return stream.IsOk();
+}
+
+bool Conn::WriteCallback(const void * block, size_t size, void * data)
+{
+    assert(data != NULL);
+    Conn & conn = *static_cast<Conn *>(data);
+    return WriteAll(conn.m_sock, block, size);;
+}
index 396e9753602afc04e0098b27425214a004cde6e9..902cd39461cbafea74a2c4cc398bc298933adf06 100644 (file)
@@ -61,11 +61,12 @@ class Conn
 
         int Sock() const { return m_sock; }
         uint32_t IP() const { return *(uint32_t *)(&m_addr.sin_addr); }
-        uint16_t Port() const { return m_addr.sin_port; }
+        uint16_t Port() const { return ntohs(m_addr.sin_port); }
 
         bool Read();
 
         bool IsOk() const { return m_state != ERROR; }
+        bool IsDone() const { return m_state == DONE; }
         bool IsKeepAlive() const { return m_keepAlive; }
 
     private:
@@ -93,7 +94,7 @@ class Conn
 
         XML_Parser m_xmlParser;
 
-        enum { HEADER, LOGIN, CRYPTO_LOGIN, DATA, ERROR } m_state;
+        enum { HEADER, LOGIN, CRYPTO_LOGIN, DATA, DONE, ERROR } m_state;
 
         void * m_buffer;
         size_t m_bufferSize;
@@ -112,6 +113,9 @@ class Conn
         bool HandleCryptoLogin();
         bool HandleData(size_t size);
 
+        bool WriteAnswer(const void* buffer, size_t size);
+        bool WriteResponse();
+
         struct DataState
         {
             DataState(bool f, Conn & c) : final(f), conn(c) {}
@@ -122,6 +126,7 @@ class Conn
         static bool DataCallback(const void * block, size_t size, void * data);
         static void ParseXMLStart(void * data, const char * el, const char ** attr);
         static void ParseXMLEnd(void * data, const char * el);
+        static bool WriteCallback(const void * block, size_t size, void * data);
 };
 
 }