]> git.stg.codes - stg.git/blobdiff - projects/stargazer/plugins/configuration/sgconfig/configproto.cpp
Ticket 37. ALTER TABLE tariffs query added for change_policy_timeout
[stg.git] / projects / stargazer / plugins / configuration / sgconfig / configproto.cpp
index dee6dbb914db53a44a492846dda03d36289e25fd..567c537ae7f6a5aaef04ef641ff1ff77da829db6 100644 (file)
  *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  */
 
-/*
- *    Date: 27.10.2002
- */
-
 /*
  *    Author : Boris Mikhailenko <stg34@stargazer.dp.ua>
+ *    Author : Maxim Mamontov <faust@stargazer.dp.ua>
  */
 
- /*
- $Revision: 1.22 $
- $Date: 2010/10/04 20:24:14 $
- $Author: faust $
- */
+#include "configproto.h"
 
+#include "conn.h"
 
-#include <unistd.h>
+#include "parser_server_info.h"
+#include "parser_admins.h"
+#include "parser_tariffs.h"
+#include "parser_users.h"
+#include "parser_message.h"
+#include "parser_auth_by.h"
 
-#include "configproto.h"
+#include "stg/common.h"
+#include "stg/logger.h"
 
-//-----------------------------------------------------------------------------
-void ParseXMLStart(void *data, const char *el, const char **attr)
+#include <algorithm>
+#include <functional>
+#include <vector>
+#include <csignal>
+#include <cstring>
+#include <cerrno>
+#include <cassert>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+
+namespace SP = STG::PARSER;
+
+CONFIGPROTO::CONFIGPROTO(PLUGIN_LOGGER & l)
+    : m_settings(NULL),
+      m_admins(NULL),
+      m_tariffs(NULL),
+      m_users(NULL),
+      m_store(NULL),
+      m_port(0),
+      m_bindAddress("0.0.0.0"),
+      m_running(false),
+      m_stopped(true),
+      m_logger(l),
+      m_listenSocket(-1)
 {
-CONFIGPROTO * cp = static_cast<CONFIGPROTO *>(data);
+}
 
-if (cp->currParser)
+CONFIGPROTO::~CONFIGPROTO()
+{
+    std::deque<STG::Conn *>::iterator it;
+    for (it = m_conns.begin(); it != m_conns.end(); ++it)
+        delete *it;
+}
+
+int CONFIGPROTO::Prepare()
+{
+    sigset_t sigmask, oldmask;
+    sigemptyset(&sigmask);
+    sigaddset(&sigmask, SIGINT);
+    sigaddset(&sigmask, SIGTERM);
+    sigaddset(&sigmask, SIGUSR1);
+    sigaddset(&sigmask, SIGHUP);
+    pthread_sigmask(SIG_BLOCK, &sigmask, &oldmask);
+
+    m_listenSocket = socket(PF_INET, SOCK_STREAM, 0);
+
+    if (m_listenSocket < 0)
     {
-    cp->currParser->SetAnswerList(&cp->answerList);
-    cp->currParser->SetCurrAdmin(*cp->currAdmin);
-    cp->currParser->ParseStart(data, el, attr);
+        m_errorStr = std::string("Cannot create listen socket: '") + strerror(errno) + "'.";
+        m_logger(m_errorStr);
+        return -1;
     }
-else
+
+    int dummy = 1;
+
+    if (setsockopt(m_listenSocket, SOL_SOCKET, SO_REUSEADDR, &dummy, 4) != 0)
     {
-    for (unsigned int i = 0; i < cp->dataParser.size(); i++)
-        {
-        cp->dataParser[i]->SetAnswerList(&cp->answerList);
-        cp->dataParser[i]->SetCurrAdmin(*cp->currAdmin);
-        cp->dataParser[i]->Reset();
-        if (cp->dataParser[i]->ParseStart(data, el, attr) == 0)
-            {
-            cp->currParser = cp->dataParser[i];
-            break;
-            }
-        else
-            {
-            cp->dataParser[i]->Reset();
-            }
-        }
+        m_errorStr = std::string("Failed to set SO_REUSEADDR to the listen socket: '") + strerror(errno) + "'.";
+        m_logger(m_errorStr);
+        return -1;
+    }
+
+    if (!Bind())
+        return -1;
+
+    if (listen(m_listenSocket, 64) == -1) // TODO: backlog length
+    {
+        m_errorStr = std::string("Failed to start listening for connections: '") + strerror(errno) + "'.";
+        m_logger(m_errorStr);
+        return -1;
     }
+
+    RegisterParsers();
+
+    m_running = true;
+    m_stopped = false;
+    return 0;
 }
-//-----------------------------------------------------------------------------
-void ParseXMLEnd(void *data, const char *el)
+
+int CONFIGPROTO::Stop()
 {
-CONFIGPROTO * cp = static_cast<CONFIGPROTO *>(data);
-if (cp->currParser)
+    m_running = false;
+    for (int i = 0; i < 5 && !m_stopped; ++i)
     {
-    if (cp->currParser->ParseEnd(data, el) == 0)
-        {
-        cp->currParser = NULL;
-        }
+        struct timespec ts = {0, 200000000};
+        nanosleep(&ts, NULL);
     }
-else
+
+    if (!m_stopped)
     {
-    for (unsigned int i = 0; i < cp->dataParser.size(); i++)
-        {
-        if (cp->dataParser[i]->ParseEnd(data, el) == 0)
-            {
-            break;
-            }
-        }
+        m_errorStr = "Cannot stop listenign thread.";
+        m_logger(m_errorStr);
+        return -1;
     }
+
+    shutdown(m_listenSocket, SHUT_RDWR);
+    close(m_listenSocket);
+    return 0;
 }
-//-----------------------------------------------------------------------------
-CONFIGPROTO::CONFIGPROTO()
-    : adminIP(0),
-      port(0),
-      nonstop(1),
-      state(0),
-      currAdmin(),
-      WriteServLog(GetStgLogger()),
-      listenSocket(0),
-      admins(NULL),
-      currParser(NULL)
+
+void CONFIGPROTO::Run()
 {
-dataParser.push_back(&parserGetServInfo);
+    while (m_running)
+    {
+        fd_set fds;
 
-dataParser.push_back(&parserGetUsers);
-dataParser.push_back(&parserGetUser);
-dataParser.push_back(&parserChgUser);
-dataParser.push_back(&parserAddUser);
-dataParser.push_back(&parserDelUser);
-dataParser.push_back(&parserCheckUser);
-dataParser.push_back(&parserSendMessage);
+        BuildFDSet(fds);
 
-dataParser.push_back(&parserGetTariffs);
-dataParser.push_back(&parserAddTariff);
-dataParser.push_back(&parserDelTariff);
-dataParser.push_back(&parserChgTariff);
+        struct timeval tv;
+        tv.tv_sec = 0;
+        tv.tv_usec = 500000;
 
-dataParser.push_back(&parserGetAdmins);
-dataParser.push_back(&parserChgAdmin);
-dataParser.push_back(&parserDelAdmin);
-dataParser.push_back(&parserAddAdmin);
+        int res = select(MaxFD() + 1, &fds, NULL, NULL, &tv);
+        if (res < 0)
+        {
+            m_errorStr = std::string("'select' is failed: '") + strerror(errno) + "'.";
+            printfd(__FILE__, "%s\n", m_errorStr.c_str());
+            m_logger(m_errorStr);
+            break;
+        }
+        if (!m_running)
+            break;
+        if (res > 0)
+            HandleEvents(fds);
 
-xmlParser = XML_ParserCreate(NULL);
+        CleanupConns();
+    }
+    m_stopped = true;
+}
 
-if (!xmlParser)
+bool CONFIGPROTO::Bind()
+{
+    const hostent * he = gethostbyname(m_bindAddress.c_str());
+    if (he == NULL)
     {
-    WriteServLog("Couldn't allocate memory for parser.");
-    exit(1);
+        m_errorStr = "Failed to resolve name '" + m_bindAddress + "': '" + hstrerror(h_errno) + "'.";
+        printfd(__FILE__, "%s\n", m_errorStr.c_str());
+        m_logger(m_errorStr);
+        return false;
     }
 
-}
-//-----------------------------------------------------------------------------
-CONFIGPROTO::~CONFIGPROTO()
-{
-XML_ParserFree(xmlParser);
-}
-//-----------------------------------------------------------------------------
-int CONFIGPROTO::ParseCommand()
-{
-list<string>::iterator n;
-int done = 0;
-char str[9];
-int len;
+    char ** ptr = he->h_addr_list;
+    while (*ptr != NULL)
+    {
+        struct sockaddr_in listenAddr;
+        listenAddr.sin_family = PF_INET;
+        listenAddr.sin_port = htons(m_port);
+        listenAddr.sin_addr.s_addr = *reinterpret_cast<in_addr_t *>(*ptr);
 
-if (requestList.empty())
-    return 0;
+        printfd(__FILE__, "Trying to bind to %s:%d\n", inet_ntostring(listenAddr.sin_addr.s_addr).c_str(), m_port);
 
-n = requestList.begin();
+        if (bind(m_listenSocket, reinterpret_cast<sockaddr *>(&listenAddr), sizeof(listenAddr)) == 0)
+            return true;
 
-strncpy(str, (*n).c_str(), 8);
-str[8] = 0;
+        m_errorStr = std::string("Cannot bind listen socket: '") + strerror(errno) + "'.";
+        printfd(__FILE__, "%s\n", m_errorStr.c_str());
+        m_logger(m_errorStr);
 
-XML_ParserReset(xmlParser, NULL);
-XML_SetElementHandler(xmlParser, ParseXMLStart, ParseXMLEnd);
-XML_SetUserData(xmlParser, this);
+        ++ptr;
+    }
 
-while(nonstop)
-    {
-    strncpy(str, (*n).c_str(), 8);
-    str[8] = 0;
-    len = strlen(str);
+    return false;
+}
 
-    n++;
-    if (n == requestList.end())
-        done = 1;
-    n--;
+void CONFIGPROTO::RegisterParsers()
+{
+    assert(m_settings != NULL);
+    assert(m_store != NULL);
+    assert(m_admins != NULL);
+    assert(m_users != NULL);
+    assert(m_tariffs != NULL);
 
-    if (XML_Parse(xmlParser, (*n).c_str(), len, done) == XML_STATUS_ERROR)
-        {
-        WriteServLog("Invalid configuration request");
-        printfd(__FILE__, "Parse error at line %d:\n%s\n",
-           XML_GetCurrentLineNumber(xmlParser),
-           XML_ErrorString(XML_GetErrorCode(xmlParser)));
-        if (currParser)
-            {
-            printfd(__FILE__, "Parser reset\n");
-            currParser->Reset();
-            currParser = NULL;
-            }
+    SP::GET_SERVER_INFO::FACTORY::Register(m_registry, *m_settings, *m_users, *m_tariffs);
 
-        return -1;
-        }
+    SP::GET_ADMINS::FACTORY::Register(m_registry, *m_admins);
+    SP::ADD_ADMIN::FACTORY::Register(m_registry, *m_admins);
+    SP::DEL_ADMIN::FACTORY::Register(m_registry, *m_admins);
+    SP::CHG_ADMIN::FACTORY::Register(m_registry, *m_admins);
 
-    if (done)
-        return 0;
+    SP::GET_TARIFFS::FACTORY::Register(m_registry, *m_tariffs);
+    SP::ADD_TARIFF::FACTORY::Register(m_registry, *m_tariffs);
+    SP::DEL_TARIFF::FACTORY::Register(m_registry, *m_tariffs, *m_users);
+    SP::CHG_TARIFF::FACTORY::Register(m_registry, *m_tariffs);
 
-    n++;
-    }
+    SP::GET_USERS::FACTORY::Register(m_registry, *m_users);
+    SP::GET_USER::FACTORY::Register(m_registry, *m_users);
+    SP::ADD_USER::FACTORY::Register(m_registry, *m_users);
+    SP::DEL_USER::FACTORY::Register(m_registry, *m_users);
+    SP::CHG_USER::FACTORY::Register(m_registry, *m_users, *m_store, *m_tariffs);
+    SP::CHECK_USER::FACTORY::Register(m_registry, *m_users);
 
-return 0;
+    SP::SEND_MESSAGE::FACTORY::Register(m_registry, *m_users);
+
+    SP::AUTH_BY::FACTORY::Register(m_registry, *m_users);
 }
-//-----------------------------------------------------------------------------
-void CONFIGPROTO::SetPort(uint16_t p)
+
+int CONFIGPROTO::MaxFD() const
 {
-port = p;
+    int maxFD = m_listenSocket;
+    std::deque<STG::Conn *>::const_iterator it;
+    for (it = m_conns.begin(); it != m_conns.end(); ++it)
+        if (maxFD < (*it)->Sock())
+            maxFD = (*it)->Sock();
+    return maxFD;
 }
-//-----------------------------------------------------------------------------
-void CONFIGPROTO::SetAdmins(ADMINS * a)
-{
-admins = a;
-for (unsigned int i = 0; i < dataParser.size(); i++)
-    {
-    dataParser[i]->SetAdmins(a);
-    }
 
+void CONFIGPROTO::BuildFDSet(fd_set & fds) const
+{
+    FD_ZERO(&fds);
+    FD_SET(m_listenSocket, &fds);
+    std::deque<STG::Conn *>::const_iterator it;
+    for (it = m_conns.begin(); it != m_conns.end(); ++it)
+        FD_SET((*it)->Sock(), &fds);
 }
-//-----------------------------------------------------------------------------
-void CONFIGPROTO::SetUsers(USERS * u)
+
+void CONFIGPROTO::CleanupConns()
 {
-for (unsigned int i = 0; i < dataParser.size(); i++)
-    {
-    dataParser[i]->SetUsers(u);
-    }
+    std::deque<STG::Conn *>::iterator pos;
+    for (pos = m_conns.begin(); pos != m_conns.end(); ++pos)
+        if (((*pos)->IsDone() && !(*pos)->IsKeepAlive()) || !(*pos)->IsOk())
+        {
+            delete *pos;
+            *pos = NULL;
+        }
 
+    pos = std::remove(m_conns.begin(), m_conns.end(), static_cast<STG::Conn *>(NULL));
+    m_conns.erase(pos, m_conns.end());
 }
-//-----------------------------------------------------------------------------
-void CONFIGPROTO::SetTariffs(TARIFFS * t)
+
+void CONFIGPROTO::HandleEvents(const fd_set & fds)
 {
-for (unsigned int i = 0; i < dataParser.size(); i++)
+    if (FD_ISSET(m_listenSocket, &fds))
+        AcceptConnection();
+    else
     {
-    dataParser[i]->SetTariffs(t);
+        std::deque<STG::Conn *>::iterator it;
+        for (it = m_conns.begin(); it != m_conns.end(); ++it)
+            if (FD_ISSET((*it)->Sock(), &fds))
+                (*it)->Read();
     }
 }
-//-----------------------------------------------------------------------------
-void CONFIGPROTO::SetStore(BASE_STORE * s)
+
+void CONFIGPROTO::AcceptConnection()
 {
-for (unsigned int i = 0; i < dataParser.size(); i++)
+    struct sockaddr_in outerAddr;
+    socklen_t outerAddrLen(sizeof(outerAddr));
+    int sock = accept(m_listenSocket, reinterpret_cast<sockaddr *>(&outerAddr), &outerAddrLen);
+
+    if (sock < 0)
     {
-    dataParser[i]->SetStore(s);
+        m_errorStr = std::string("Failed to accept connection: '") + strerror(errno) + "'.";
+        printfd(__FILE__, "%s\n", m_errorStr.c_str());
+        m_logger(m_errorStr);
+        return;
     }
-}
-//-----------------------------------------------------------------------------
-void CONFIGPROTO::SetStgSettings(const SETTINGS * s)
-{
-for (unsigned int i = 0; i < dataParser.size(); i++)
+
+    assert(m_admins != NULL);
+
+    try
     {
-    dataParser[i]->SetStgSettings(s);
+        m_conns.push_back(new STG::Conn(m_registry, *m_admins, sock, outerAddr, m_logger));
+        printfd(__FILE__, "New connection from %s:%d. Total connections: %d\n", inet_ntostring(m_conns.back()->IP()).c_str(), m_conns.back()->Port(), m_conns.size());
+    }
+    catch (const STG::Conn::Error & error)
+    {
+        // Unlikely.
+        m_logger(std::string("Failed to create new client connection: '") + error.what() + "'.");
     }
 }
-//-----------------------------------------------------------------------------
-const string & CONFIGPROTO::GetStrError() const
-{
-return errorStr;
-}
-//-----------------------------------------------------------------------------
-uint32_t CONFIGPROTO::GetAdminIP() const
-{
-return adminIP;
-}
-//-----------------------------------------------------------------------------