From fd41f8dfd1133e89f6b5ac675fe2b5068b6bb652 Mon Sep 17 00:00:00 2001 From: Maxim Mamontov Date: Mon, 31 Aug 2015 21:56:45 +0300 Subject: [PATCH] Implemented async parser in rlm_stg. --- projects/rlm_stg/stg_client.cpp | 113 ++++++++++++++++++++------------ projects/rlm_stg/stg_client.h | 2 +- 2 files changed, 72 insertions(+), 43 deletions(-) diff --git a/projects/rlm_stg/stg_client.cpp b/projects/rlm_stg/stg_client.cpp index 399b971d..0e6bdc7b 100644 --- a/projects/rlm_stg/stg_client.cpp +++ b/projects/rlm_stg/stg_client.cpp @@ -20,6 +20,8 @@ #include "stg_client.h" +#include "radlog.h" + #include "stg/json_parser.h" #include "stg/json_generator.h" #include "stg/common.h" @@ -46,8 +48,8 @@ using STG::JSON::StringGen; namespace { -double CONN_TIMEOUT = 5; -double PING_TIMEOUT = 1; +double CONN_TIMEOUT = 60; +double PING_TIMEOUT = 10; STG_CLIENT* stgClient = NULL; @@ -104,11 +106,13 @@ class ResultParser : public EnumParser class TopParser : public NodeParser { public: - TopParser() + typedef void (*Callback) (void* /*data*/); + TopParser(Callback callback, void* data) : m_packetParser(this, m_packet, m_packetStr), m_resultParser(this, m_result, m_resultStr), m_replyParser(this, m_reply), - m_modifyParser(this, m_modify) + m_modifyParser(this, m_modify), + m_callback(callback), m_data(data) {} virtual NodeParser* parseStartMap() { return this; } @@ -127,7 +131,7 @@ class TopParser : public NodeParser return this; } - virtual NodeParser* parseEndMap() { return this; } + virtual NodeParser* parseEndMap() { m_callback(m_data); return this; } const std::string& packetStr() const { return m_packetStr; } Packet packet() const { return m_packet; } @@ -148,12 +152,18 @@ class TopParser : public NodeParser ResultParser m_resultParser; PairsParser m_replyParser; PairsParser m_modifyParser; + + Callback m_callback; + void* m_data; }; class ProtoParser : public Parser { public: - ProtoParser() : Parser( &m_topParser ) {} + ProtoParser(TopParser::Callback callback, void* data) + : Parser( &m_topParser ), + m_topParser(callback, data) + {} const std::string& packetStr() const { return m_topParser.packetStr(); } Packet packet() const { return m_topParser.packet(); } @@ -183,7 +193,7 @@ class PacketGen : public Gen m_gen.add(key, new StringGen(value)); return *this; } - PacketGen& add(const std::string& key, MapGen* map) + PacketGen& add(const std::string& key, MapGen& map) { m_gen.add(key, map); return *this; @@ -239,10 +249,10 @@ private: bool read(); bool tick(); - bool process(); - bool processPing(); - bool processPong(); - bool processData(); + static void process(void* data); + void processPing(); + void processPong(); + void processData(); bool sendPing(); bool sendPong(); @@ -316,6 +326,7 @@ bool STG_CLIENT::configure(const std::string& address, Callback callback, void* return true; } catch (const ChannelConfig::Error& ex) { // TODO: Log it + RadLog("Client configuration error: %s.", ex.what()); } return false; } @@ -328,9 +339,10 @@ STG_CLIENT::Impl::Impl(const std::string& address, Callback callback, void* data m_lastPing(time(NULL)), m_lastActivity(m_lastPing), m_callback(callback), - m_data(data) + m_data(data), + m_parser(&STG_CLIENT::Impl::process, this) { - int res = pthread_create(&m_thread, NULL, run, this); + int res = pthread_create(&m_thread, NULL, &STG_CLIENT::Impl::run, this); if (res != 0) throw Error("Failed to create thread: " + std::string(strerror(errno))); } @@ -364,15 +376,15 @@ bool STG_CLIENT::Impl::stop() bool STG_CLIENT::Impl::request(TYPE type, const std::string& userName, const std::string& password, const PAIRS& pairs) { - boost::scoped_ptr map(new MapGen); + MapGen map; for (PAIRS::const_iterator it = pairs.begin(); it != pairs.end(); ++it) - map->add(it->first, new StringGen(it->second)); - map->add("Radius-Username", new StringGen(userName)); - map->add("Radius-Userpass", new StringGen(password)); + map.add(it->first, new StringGen(it->second)); + map.add("Radius-Username", new StringGen(userName)); + map.add("Radius-Userpass", new StringGen(password)); PacketGen gen("data"); gen.add("stage", toStage(type)) - .add("pairs", map.get()); + .add("pairs", map); m_lastPing = time(NULL); @@ -396,6 +408,9 @@ void STG_CLIENT::Impl::runImpl() int res = select(m_sock + 1, &fds, NULL, NULL, &tv); if (res < 0) { + if (errno == EINTR) + continue; + RadLog("'select' is failed: %s", strerror(errno)); //m_error = std::string("'select' is failed: '") + strerror(errno) + "'."; //m_logger(m_error); break; @@ -456,6 +471,7 @@ int STG_CLIENT::Impl::connectTCP() { shutdown(fd, SHUT_RDWR); close(fd); + RadLog("'connect' is failed: %s", strerror(errno)); // TODO: log it. continue; } @@ -479,7 +495,7 @@ int STG_CLIENT::Impl::connectUNIX() strncpy(addr.sun_path, m_config.address.c_str(), m_config.address.length()); if (::connect(fd, reinterpret_cast(&addr), sizeof(addr)) == -1) { - Error error(std::string("Error binding UNIX socket: ") + strerror(errno)); + Error error(std::string("Error connecting UNIX socket: ") + strerror(errno)); shutdown(fd, SHUT_RDWR); close(fd); throw error; @@ -493,18 +509,16 @@ bool STG_CLIENT::Impl::read() ssize_t res = ::read(m_sock, buffer.data(), buffer.size()); if (res < 0) { + RadLog("Failed to read data: ", strerror(errno)); //m_logger("Failed to read data from '" + m_remote + "': " + strerror(errno)); return false; } m_lastActivity = time(NULL); + RadLog("Read %d bytes.\n%s\n", res, std::string(buffer.data(), res).c_str()); if (res == 0) { - if (!m_parser.done()) - { - //m_logger("Failed to read data from '" + m_remote + "': " + strerror(errno)); - return false; - } - return process(); + m_parser.last(); + return false; } return m_parser.append(buffer.data(), res); } @@ -514,48 +528,59 @@ bool STG_CLIENT::Impl::tick() time_t now = time(NULL); if (difftime(now, m_lastActivity) > CONN_TIMEOUT) { + int delta = difftime(now, m_lastActivity); + RadLog("Connection timeout: %d sec.", delta); //m_logger("Connection to " + m_remote + " timed out."); return false; } if (difftime(now, m_lastPing) > PING_TIMEOUT) + { + int delta = difftime(now, m_lastPing); + RadLog("Ping timeout: %d sec. Sending ping...", delta); sendPing(); + } return true; } -bool STG_CLIENT::Impl::process() +void STG_CLIENT::Impl::process(void* data) { - switch (m_parser.packet()) + Impl& impl = *static_cast(data); + switch (impl.m_parser.packet()) { case PING: - return processPing(); + impl.processPing(); + return; case PONG: - return processPong(); + impl.processPong(); + return; case DATA: - return processData(); + impl.processData(); + return; } - //m_logger("Received invalid packet type: " + m_parser.packetStr()); - return false; + RadLog("Received invalid packet type: '%s'.", impl.m_parser.packetStr().c_str()); } -bool STG_CLIENT::Impl::processPing() +void STG_CLIENT::Impl::processPing() { - return sendPong(); + RadLog("Got ping, sending pong."); + sendPong(); } -bool STG_CLIENT::Impl::processPong() +void STG_CLIENT::Impl::processPong() { + RadLog("Got pong."); m_lastActivity = time(NULL); - return true; } -bool STG_CLIENT::Impl::processData() +void STG_CLIENT::Impl::processData() { - RESULT result; + RESULT data; + RadLog("Got data."); for (PairsParser::Pairs::const_iterator it = m_parser.reply().begin(); it != m_parser.reply().end(); ++it) - result.reply.push_back(std::make_pair(it->first, it->second)); + data.reply.push_back(std::make_pair(it->first, it->second)); for (PairsParser::Pairs::const_iterator it = m_parser.modify().begin(); it != m_parser.modify().end(); ++it) - result.modify.push_back(std::make_pair(it->first, it->second)); - return m_callback(m_data, result); + data.modify.push_back(std::make_pair(it->first, it->second)); + m_callback(m_data, data, m_parser.result()); } bool STG_CLIENT::Impl::sendPing() @@ -578,12 +603,16 @@ bool STG_CLIENT::Impl::sendPong() bool STG_CLIENT::Impl::write(void* data, const char* buf, size_t size) { + RadLog("Sending JSON:"); + std::string json(buf, size); + RadLog("%s", json.c_str()); STG_CLIENT::Impl& impl = *static_cast(data); while (size > 0) { - ssize_t res = ::write(impl.m_sock, buf, size); + ssize_t res = ::send(impl.m_sock, buf, size, MSG_NOSIGNAL); if (res < 0) { + RadLog("Failed to write data: %s.", strerror(errno)); //conn.m_logger("Failed to write pong to '" + conn.m_remote + "': " + strerror(errno)); return false; } diff --git a/projects/rlm_stg/stg_client.h b/projects/rlm_stg/stg_client.h index 82d2287b..ee15774f 100644 --- a/projects/rlm_stg/stg_client.h +++ b/projects/rlm_stg/stg_client.h @@ -65,7 +65,7 @@ public: Error(const std::string& message) : runtime_error(message) {} }; - typedef bool (*Callback)(void* data, const RESULT& result); + typedef bool (*Callback)(void* /*data*/, const RESULT& /*result*/, bool /*status*/); STG_CLIENT(const std::string& address, Callback callback, void* data); ~STG_CLIENT(); -- 2.44.2