]> git.stg.codes - stg.git/commitdiff
Implemeted reconnect in rlm_stg when the connection is lost.
authorMaxim Mamontov <faust.madf@gmail.com>
Sun, 6 Sep 2015 09:26:10 +0000 (12:26 +0300)
committerMaxim Mamontov <faust.madf@gmail.com>
Sun, 6 Sep 2015 09:26:10 +0000 (12:26 +0300)
projects/rlm_stg/iface.cpp
projects/rlm_stg/stg_client.cpp
projects/rlm_stg/stg_client.h

index 0cb30b932090e7a0721e17212009455d81bdd539..7394e288bd9be3f79cd5d2d6be563a3a708ec8d2 100644 (file)
@@ -89,6 +89,12 @@ STG_RESULT stgRequest(STG_CLIENT::TYPE type, const char* userName, const char* p
         return emptyResult();
     }
     try {
         return emptyResult();
     }
     try {
+        if (!client->connected())
+        {
+            if (!STG_CLIENT::reconnect())
+                return emptyResult();
+            client = STG_CLIENT::get();
+        }
         response.done = false;
         client->request(type, toString(userName), toString(password), fromSTGPairs(pairs));
         pthread_mutex_lock(&response.mutex);
         response.done = false;
         client->request(type, toString(userName), toString(password), fromSTGPairs(pairs));
         pthread_mutex_lock(&response.mutex);
index 0e6bdc7b90858a4eede6443d366761d8f67018e5..137fb61a22a4a1039029f43904f35aaf658741d7 100644 (file)
@@ -209,9 +209,11 @@ class STG_CLIENT::Impl
 {
 public:
     Impl(const std::string& address, Callback callback, void* data);
 {
 public:
     Impl(const std::string& address, Callback callback, void* data);
+    Impl(const Impl& rhs);
     ~Impl();
 
     bool stop();
     ~Impl();
 
     bool stop();
+    bool connected() const { return m_connected; }
 
     bool request(TYPE type, const std::string& userName, const std::string& password, const PAIRS& pairs);
 
 
     bool request(TYPE type, const std::string& userName, const std::string& password, const PAIRS& pairs);
 
@@ -234,6 +236,8 @@ private:
 
     ProtoParser m_parser;
 
 
     ProtoParser m_parser;
 
+    bool m_connected;
+
     void m_writeHeader(TYPE type, const std::string& userName, const std::string& password);
     void m_writePairBlock(const PAIRS& source);
     PAIRS m_readPairBlock();
     void m_writeHeader(TYPE type, const std::string& userName, const std::string& password);
     void m_writePairBlock(const PAIRS& source);
     PAIRS m_readPairBlock();
@@ -298,6 +302,11 @@ STG_CLIENT::STG_CLIENT(const std::string& address, Callback callback, void* data
 {
 }
 
 {
 }
 
+STG_CLIENT::STG_CLIENT(const STG_CLIENT& rhs)
+    : m_impl(new Impl(*rhs.m_impl))
+{
+}
+
 STG_CLIENT::~STG_CLIENT()
 {
 }
 STG_CLIENT::~STG_CLIENT()
 {
 }
@@ -307,6 +316,11 @@ bool STG_CLIENT::stop()
     return m_impl->stop();
 }
 
     return m_impl->stop();
 }
 
+bool STG_CLIENT::connected() const
+{
+    return m_impl->connected();
+}
+
 bool STG_CLIENT::request(TYPE type, const std::string& userName, const std::string& password, const PAIRS& pairs)
 {
     return m_impl->request(type, userName, password, pairs);
 bool STG_CLIENT::request(TYPE type, const std::string& userName, const std::string& password, const PAIRS& pairs)
 {
     return m_impl->request(type, userName, password, pairs);
@@ -331,6 +345,30 @@ bool STG_CLIENT::configure(const std::string& address, Callback callback, void*
     return false;
 }
 
     return false;
 }
 
+bool STG_CLIENT::reconnect()
+{
+    if (stgClient == NULL)
+    {
+        RadLog("Connection is not configured.");
+        return false;
+    }
+    if (!stgClient->stop())
+    {
+        RadLog("Failed to drop previous connection.");
+        return false;
+    }
+    try {
+        STG_CLIENT* old = stgClient;
+        stgClient = new STG_CLIENT(*old);
+        delete old;
+        return true;
+    } catch (const ChannelConfig::Error& ex) {
+        // TODO: Log it
+        RadLog("Client configuration error: %s.", ex.what());
+    }
+    return false;
+}
+
 STG_CLIENT::Impl::Impl(const std::string& address, Callback callback, void* data)
     : m_config(address),
       m_sock(connect()),
 STG_CLIENT::Impl::Impl(const std::string& address, Callback callback, void* data)
     : m_config(address),
       m_sock(connect()),
@@ -340,7 +378,25 @@ STG_CLIENT::Impl::Impl(const std::string& address, Callback callback, void* data
       m_lastActivity(m_lastPing),
       m_callback(callback),
       m_data(data),
       m_lastActivity(m_lastPing),
       m_callback(callback),
       m_data(data),
-      m_parser(&STG_CLIENT::Impl::process, this)
+      m_parser(&STG_CLIENT::Impl::process, this),
+      m_connected(true)
+{
+    int res = pthread_create(&m_thread, NULL, &STG_CLIENT::Impl::run, this);
+    if (res != 0)
+        throw Error("Failed to create thread: " + std::string(strerror(errno)));
+}
+
+STG_CLIENT::Impl::Impl(const Impl& rhs)
+    : m_config(rhs.m_config),
+      m_sock(connect()),
+      m_running(false),
+      m_stopped(true),
+      m_lastPing(time(NULL)),
+      m_lastActivity(m_lastPing),
+      m_callback(rhs.m_callback),
+      m_data(rhs.m_data),
+      m_parser(&STG_CLIENT::Impl::process, this),
+      m_connected(true)
 {
     int res = pthread_create(&m_thread, NULL, &STG_CLIENT::Impl::run, this);
     if (res != 0)
 {
     int res = pthread_create(&m_thread, NULL, &STG_CLIENT::Impl::run, this);
     if (res != 0)
@@ -356,6 +412,8 @@ STG_CLIENT::Impl::~Impl()
 
 bool STG_CLIENT::Impl::stop()
 {
 
 bool STG_CLIENT::Impl::stop()
 {
+    m_connected = false;
+
     if (m_stopped)
         return true;
 
     if (m_stopped)
         return true;
 
@@ -428,6 +486,7 @@ void STG_CLIENT::Impl::runImpl()
             m_running = tick();
     }
 
             m_running = tick();
     }
 
+    m_connected = false;
     m_stopped = true;
 }
 
     m_stopped = true;
 }
 
@@ -612,6 +671,7 @@ bool STG_CLIENT::Impl::write(void* data, const char* buf, size_t size)
         ssize_t res = ::send(impl.m_sock, buf, size, MSG_NOSIGNAL);
         if (res < 0)
         {
         ssize_t res = ::send(impl.m_sock, buf, size, MSG_NOSIGNAL);
         if (res < 0)
         {
+            impl.m_connected = false;
             RadLog("Failed to write data: %s.", strerror(errno));
             //conn.m_logger("Failed to write pong to '" + conn.m_remote + "': " + strerror(errno));
             return false;
             RadLog("Failed to write data: %s.", strerror(errno));
             //conn.m_logger("Failed to write pong to '" + conn.m_remote + "': " + strerror(errno));
             return false;
index ee15774f1a8b720b7243aab7c8908cd73bac4da8..b90b0e605265d32e2d7e07b05effb5939b75f1a5 100644 (file)
@@ -68,12 +68,15 @@ public:
     typedef bool (*Callback)(void* /*data*/, const RESULT& /*result*/, bool /*status*/);
 
     STG_CLIENT(const std::string& address, Callback callback, void* data);
     typedef bool (*Callback)(void* /*data*/, const RESULT& /*result*/, bool /*status*/);
 
     STG_CLIENT(const std::string& address, Callback callback, void* data);
+    STG_CLIENT(const STG_CLIENT& rhs);
     ~STG_CLIENT();
 
     bool stop();
     ~STG_CLIENT();
 
     bool stop();
+    bool connected() const;
 
     static STG_CLIENT* get();
     static bool configure(const std::string& address, Callback callback, void* data);
 
     static STG_CLIENT* get();
     static bool configure(const std::string& address, Callback callback, void* data);
+    static bool reconnect();
 
     bool request(TYPE type, const std::string& userName, const std::string& password, const PAIRS& pairs);
 
 
     bool request(TYPE type, const std::string& userName, const std::string& password, const PAIRS& pairs);