#include <cerrno>
#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
using STG::Conn;
using STG::Config;
namespace
{
// Timeouts in seconds; both values are compared against difftime() results.
// CONN_TIMEOUT: maximum time since m_lastActivity before the connection is
// logged as timed out and marked not ok.
// PING_TIMEOUT: time since m_lastPing after which sendPing() is called.
-double CONN_TIMEOUT = 5;
-double PING_TIMEOUT = 1;
+double CONN_TIMEOUT = 60;
+double PING_TIMEOUT = 10;
enum Packet
{
// Node parser that holds the packet, stage and pairs sub-parsers.
// In the revised version it also stores a callback plus an opaque data
// pointer, and calls that callback from parseEndMap().
class TopParser : public NodeParser
{
public:
- TopParser()
// Callback type: a plain function pointer that receives the opaque pointer
// supplied at construction.
+ typedef void (*Callback) (void* /*data*/);
// callback is invoked unconditionally in parseEndMap(), so it must not be null.
// data is stored as-is and passed back to callback; it is never dereferenced here.
+ TopParser(Callback callback, void* data)
: m_packetParser(this, m_packet, m_packetStr),
m_stageParser(this, m_stage, m_stageStr),
- m_pairsParser(this, m_data)
+ m_pairsParser(this, m_data),
+ m_callback(callback), m_callbackData(data)
{}
virtual NodeParser* parseStartMap() { return this; }
return this;
}
// Revised version: fire the callback with the stored data pointer, then keep
// this parser as the current node parser.
- virtual NodeParser* parseEndMap() { return this; }
+ virtual NodeParser* parseEndMap() { m_callback(m_callbackData); return this; }
// Accessors for the packet type as text and as enum value.
const std::string& packetStr() const { return m_packetStr; }
Packet packet() const { return m_packet; }
PacketParser m_packetParser;
StageParser m_stageParser;
PairsParser m_pairsParser;
+
// Function called from parseEndMap() and the opaque pointer handed to it.
+ Callback m_callback;
+ void* m_callbackData;
};
class ProtoParser : public Parser
{
public:
// Revised constructor: forwards the callback and its opaque data pointer to
// the embedded TopParser and registers that TopParser with the Parser base.
// NOTE(review): the base class is initialized before m_topParser is
// constructed, so Parser receives the address of a not-yet-constructed
// member; presumably Parser only stores the pointer in its constructor — confirm.
- ProtoParser() : Parser( &m_topParser ) {}
+ ProtoParser(TopParser::Callback callback, void* data)
+ : Parser( &m_topParser ),
+ m_topParser(callback, data)
+ {}
const std::string& packetStr() const { return m_topParser.packetStr(); }
Packet packet() const { return m_topParser.packet(); }
m_gen.add(key, map);
return *this;
}
// Adds the given map under key to the underlying generator (m_gen).
// Returns *this so that add() calls can be chained.
+ PacketGen& add(const std::string& key, MapGen& map)
+ {
+ m_gen.add(key, map);
+ return *this;
+ }
private:
MapGen m_gen;
StringGen m_type;
time_t m_lastActivity;
ProtoParser m_parser;
// Packet handlers. In the revised version process() is static and takes a
// void* so that it matches TopParser::Callback; it is registered with
// m_parser together with `this` in the constructor's initializer list.
// The per-packet handlers now return void instead of bool.
- bool process();
- bool processPing();
- bool processPong();
- bool processData();
+ static void process(void* data);
+ void processPing();
+ void processPong();
+ void processData();
bool answer(const USER& user);
bool answerNo();
bool sendPing();
m_remote(remote),
m_ok(true),
m_lastPing(time(NULL)),
- m_lastActivity(m_lastPing)
+ m_lastActivity(m_lastPing),
+ m_parser(&Conn::Impl::process, this)
{
}
m_ok = false;
return false;
}
+ printfd(__FILE__, "Read %d bytes.\n%s\n", res, std::string(buffer.data(), res).c_str());
m_lastActivity = time(NULL);
if (res == 0)
{
- if (!m_parser.done())
- {
- m_ok = false;
- m_logger("Failed to read data from '" + m_remote + "': " + strerror(errno));
- return false;
- }
- return process();
+ m_ok = false;
+ return true;
}
return m_parser.append(buffer.data(), res);
}
time_t now = time(NULL);
if (difftime(now, m_lastActivity) > CONN_TIMEOUT)
{
+ int delta = difftime(now, m_lastActivity);
+ printfd(__FILE__, "Connection to '%s' timed out: %d sec.\n", m_remote.c_str(), delta);
m_logger("Connection to " + m_remote + " timed out.");
m_ok = false;
return false;
}
if (difftime(now, m_lastPing) > PING_TIMEOUT)
+ {
+ int delta = difftime(now, m_lastPing);
+ printfd(__FILE__, "Ping timeout: %d sec. Sending ping...\n", delta);
sendPing();
+ }
return true;
}
// Dispatches a parsed packet to the handler that matches its type.
// Revised version: static callback; data is the Impl instance that was
// registered with the parser (passed as `this` when m_parser is constructed).
// NOTE(review): the old version returned false for an unknown packet type;
// the revised version only logs it and leaves m_ok untouched, so the caller
// is not told about the failure — confirm this is intended.
-bool Conn::Impl::process()
+void Conn::Impl::process(void* data)
{
- switch (m_parser.packet())
// Recover the connection object from the opaque callback pointer.
+ Impl& impl = *static_cast<Impl*>(data);
+ switch (impl.m_parser.packet())
{
case PING:
- return processPing();
+ impl.processPing();
+ return;
case PONG:
- return processPong();
+ impl.processPong();
+ return;
case DATA:
- return processData();
+ impl.processData();
+ return;
}
// Reached only when the packet type matched none of the cases above.
- m_logger("Received invalid packet type: " + m_parser.packetStr());
- return false;
+ printfd(__FILE__, "Received invalid packet type: '%s'.\n", impl.m_parser.packetStr().c_str());
+ impl.m_logger("Received invalid packet type: " + impl.m_parser.packetStr());
}
// Handles a PING packet by replying with a pong.
// NOTE(review): the old version returned sendPong()'s result; the revised
// version discards it, so a failed send is not reported from here.
-bool Conn::Impl::processPing()
+void Conn::Impl::processPing()
{
- return sendPong();
+ printfd(__FILE__, "Got ping. Sending pong...\n");
+ sendPong();
}
// Handles a PONG packet: refreshes m_lastActivity, the timestamp that is
// compared against CONN_TIMEOUT when the connection is checked for timeout.
-bool Conn::Impl::processPong()
+void Conn::Impl::processPong()
{
+ printfd(__FILE__, "Got pong.\n");
m_lastActivity = time(NULL);
- return true;
}
// Handles a DATA packet: opens a search over m_users, iterates the users,
// and closes the search handle before returning.
-bool Conn::Impl::processData()
+void Conn::Impl::processData()
{
+ printfd(__FILE__, "Got data.\n");
int handle = m_users.OpenSearch();
USER_PTR user = NULL;
// Revised version: match starts out false, and the loop runs while
// SearchNext() returns 0.
// NOTE(review): presumably 0 means "next user fetched" — confirm against
// the users container API.
- bool match = true;
- while (m_users.SearchNext(handle, &user))
+ bool match = false;
+ while (m_users.SearchNext(handle, &user) == 0)
{
// Skip entries for which no user pointer was produced.
if (user == NULL)
continue;
answerNo();
// Release the search handle obtained from OpenSearch().
m_users.CloseSearch(handle);
-
- return true;
}
bool Conn::Impl::answer(const USER& user)
{
- boost::scoped_ptr<MapGen> reply(new MapGen);
+ printfd(__FILE__, "Got match. Sending answer...\n");
+ MapGen reply;
for (Config::Pairs::const_iterator it = m_config.reply.begin(); it != m_config.reply.end(); ++it)
- reply->add(it->first, new StringGen(user.GetParamValue(it->second)));
+ reply.add(it->first, new StringGen(user.GetParamValue(it->second)));
- boost::scoped_ptr<MapGen> modify(new MapGen);
+ MapGen modify;
for (Config::Pairs::const_iterator it = m_config.modify.begin(); it != m_config.modify.end(); ++it)
- modify->add(it->first, new StringGen(user.GetParamValue(it->second)));
+ modify.add(it->first, new StringGen(user.GetParamValue(it->second)));
PacketGen gen("data");
gen.add("result", "ok")
- .add("reply", reply.get())
- .add("modify", modify.get());
+ .add("reply", reply)
+ .add("modify", modify);
m_lastPing = time(NULL);
bool Conn::Impl::answerNo()
{
+ printfd(__FILE__, "No match. Sending answer...\n");
PacketGen gen("data");
- gen.add("result", "ok");
+ gen.add("result", "no");
m_lastPing = time(NULL);
bool Conn::Impl::write(void* data, const char* buf, size_t size)
{
+ std::string json(buf, size);
+ printfd(__FILE__, "Writing JSON:\n%s\n", json.c_str());
Conn::Impl& conn = *static_cast<Conn::Impl*>(data);
while (size > 0)
{
- ssize_t res = ::write(conn.m_sock, buf, size);
+ ssize_t res = ::send(conn.m_sock, buf, size, MSG_NOSIGNAL);
if (res < 0)
{
conn.m_logger("Failed to write pong to '" + conn.m_remote + "': " + strerror(errno));