From 6559a5e4376e705b8052162a67f79d6dff31a469 Mon Sep 17 00:00:00 2001 From: Maxim Mamontov Date: Sun, 6 Sep 2015 12:26:10 +0300 Subject: [PATCH] Implemeted reconnect in rlm_stg when the connection is lost. --- projects/rlm_stg/iface.cpp | 6 ++++ projects/rlm_stg/stg_client.cpp | 62 ++++++++++++++++++++++++++++++++- projects/rlm_stg/stg_client.h | 3 ++ 3 files changed, 70 insertions(+), 1 deletion(-) diff --git a/projects/rlm_stg/iface.cpp b/projects/rlm_stg/iface.cpp index 0cb30b93..7394e288 100644 --- a/projects/rlm_stg/iface.cpp +++ b/projects/rlm_stg/iface.cpp @@ -89,6 +89,12 @@ STG_RESULT stgRequest(STG_CLIENT::TYPE type, const char* userName, const char* p return emptyResult(); } try { + if (!client->connected()) + { + if (!STG_CLIENT::reconnect()) + return emptyResult(); + client = STG_CLIENT::get(); + } response.done = false; client->request(type, toString(userName), toString(password), fromSTGPairs(pairs)); pthread_mutex_lock(&response.mutex); diff --git a/projects/rlm_stg/stg_client.cpp b/projects/rlm_stg/stg_client.cpp index 0e6bdc7b..137fb61a 100644 --- a/projects/rlm_stg/stg_client.cpp +++ b/projects/rlm_stg/stg_client.cpp @@ -209,9 +209,11 @@ class STG_CLIENT::Impl { public: Impl(const std::string& address, Callback callback, void* data); + Impl(const Impl& rhs); ~Impl(); bool stop(); + bool connected() const { return m_connected; } bool request(TYPE type, const std::string& userName, const std::string& password, const PAIRS& pairs); @@ -234,6 +236,8 @@ private: ProtoParser m_parser; + bool m_connected; + void m_writeHeader(TYPE type, const std::string& userName, const std::string& password); void m_writePairBlock(const PAIRS& source); PAIRS m_readPairBlock(); @@ -298,6 +302,11 @@ STG_CLIENT::STG_CLIENT(const std::string& address, Callback callback, void* data { } +STG_CLIENT::STG_CLIENT(const STG_CLIENT& rhs) + : m_impl(new Impl(*rhs.m_impl)) +{ +} + STG_CLIENT::~STG_CLIENT() { } @@ -307,6 +316,11 @@ bool STG_CLIENT::stop() return m_impl->stop(); } +bool STG_CLIENT::connected() const +{ + return m_impl->connected(); +} + bool STG_CLIENT::request(TYPE type, const std::string& userName, const std::string& password, const PAIRS& pairs) { return m_impl->request(type, userName, password, pairs); @@ -331,6 +345,30 @@ bool STG_CLIENT::configure(const std::string& address, Callback callback, void* return false; } +bool STG_CLIENT::reconnect() +{ + if (stgClient == NULL) + { + RadLog("Connection is not configured."); + return false; + } + if (!stgClient->stop()) + { + RadLog("Failed to drop previous connection."); + return false; + } + try { + STG_CLIENT* old = stgClient; + stgClient = new STG_CLIENT(*old); + delete old; + return true; + } catch (const ChannelConfig::Error& ex) { + // TODO: Log it + RadLog("Client configuration error: %s.", ex.what()); + } + return false; +} + STG_CLIENT::Impl::Impl(const std::string& address, Callback callback, void* data) : m_config(address), m_sock(connect()), @@ -340,7 +378,25 @@ STG_CLIENT::Impl::Impl(const std::string& address, Callback callback, void* data m_lastActivity(m_lastPing), m_callback(callback), m_data(data), - m_parser(&STG_CLIENT::Impl::process, this) + m_parser(&STG_CLIENT::Impl::process, this), + m_connected(true) +{ + int res = pthread_create(&m_thread, NULL, &STG_CLIENT::Impl::run, this); + if (res != 0) + throw Error("Failed to create thread: " + std::string(strerror(errno))); +} + +STG_CLIENT::Impl::Impl(const Impl& rhs) + : m_config(rhs.m_config), + m_sock(connect()), + m_running(false), + m_stopped(true), + m_lastPing(time(NULL)), + m_lastActivity(m_lastPing), + m_callback(rhs.m_callback), + m_data(rhs.m_data), + m_parser(&STG_CLIENT::Impl::process, this), + m_connected(true) { int res = pthread_create(&m_thread, NULL, &STG_CLIENT::Impl::run, this); if (res != 0) @@ -356,6 +412,8 @@ STG_CLIENT::Impl::~Impl() bool STG_CLIENT::Impl::stop() { + m_connected = false; + if (m_stopped) return true; @@ -428,6 +486,7 @@ void STG_CLIENT::Impl::runImpl() m_running = tick(); } + m_connected = false; m_stopped = true; } @@ -612,6 +671,7 @@ bool STG_CLIENT::Impl::write(void* data, const char* buf, size_t size) ssize_t res = ::send(impl.m_sock, buf, size, MSG_NOSIGNAL); if (res < 0) { + impl.m_connected = false; RadLog("Failed to write data: %s.", strerror(errno)); //conn.m_logger("Failed to write pong to '" + conn.m_remote + "': " + strerror(errno)); return false; diff --git a/projects/rlm_stg/stg_client.h b/projects/rlm_stg/stg_client.h index ee15774f..b90b0e60 100644 --- a/projects/rlm_stg/stg_client.h +++ b/projects/rlm_stg/stg_client.h @@ -68,12 +68,15 @@ public: typedef bool (*Callback)(void* /*data*/, const RESULT& /*result*/, bool /*status*/); STG_CLIENT(const std::string& address, Callback callback, void* data); + STG_CLIENT(const STG_CLIENT& rhs); ~STG_CLIENT(); bool stop(); + bool connected() const; static STG_CLIENT* get(); static bool configure(const std::string& address, Callback callback, void* data); + static bool reconnect(); bool request(TYPE type, const std::string& userName, const std::string& password, const PAIRS& pairs); -- 2.44.2