#include "stg/common.h"
+#include <boost/bind.hpp>
+
#include <stdexcept>
namespace {
STG_CLIENT::STG_CLIENT(const std::string& address)
: m_config(address),
- m_proto(m_config.transport, m_config.key)
+ m_proto(m_config.transport, m_config.key),
+ m_thread(boost::bind(&STG_CLIENT::m_run, this)) // launches a thread running m_run() on this object; connecting now happens there instead of in the constructor body
{
- try {
- m_proto.connect(m_config.address, m_config.port);
- } catch (const STG::SGCP::Proto::Error& ex) {
- throw Error(ex.what());
- }
}
STG_CLIENT::~STG_CLIENT()
{
+ stop(); // asks the protocol layer to stop; NOTE(review): m_thread is not joined here - confirm that is intended
+}
+
+bool STG_CLIENT::stop() // forwards to m_proto.stop() and returns its result
+{
+ return m_proto.stop();
}
RESULT STG_CLIENT::request(TYPE type, const std::string& userName, const std::string& password, const PAIRS& pairs)
bool STG_CLIENT::configure(const std::string& address)
{
- if ( stgClient != NULL )
+ if ( stgClient != NULL && stgClient->stop() )
delete stgClient;
try {
stgClient = new STG_CLIENT(address);
throw Error(ex.what());
}
}
+
+void STG_CLIENT::m_run() // function executed by m_thread: connect first, then run the protocol loop
+{
+ m_proto.connect(m_config.address, m_config.port); // NOTE(review): the removed constructor code translated Proto::Error into Error; an exception here would leave the thread function - confirm how it should be handled
+ m_proto.run();
+}
#include "stg/sgcp_types.h" // TransportType
#include "stg/os_int.h"
+#include <boost/thread.hpp>
+
#include <string>
#include <vector>
#include <utility>
STG_CLIENT(const std::string& address);
~STG_CLIENT();
+ bool stop();
+
static STG_CLIENT* get();
static bool configure(const std::string& address);
private:
ChannelConfig m_config;
STG::SGCP::Proto m_proto;
+ boost::thread m_thread;
void m_writeHeader(TYPE type, const std::string& userName, const std::string& password);
void m_writePairBlock(const PAIRS& source);
PAIRS m_readPairBlock();
+
+ void m_run();
};
#endif
--- /dev/null
+#ifndef __STG_SGCP_CONN_H__
+#define __STG_SGCP_CONN_H__
+
+/*
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+/*
+ * Author : Maxim Mamontov <faust@stargazer.dp.ua>
+ */
+
+#include "stg/os_int.h"
+
+#include <boost/asio/basic_stream_socket.hpp>
+#include <boost/function.hpp>
+
+#include <string>
+
+namespace STG
+{
+namespace SGCP
+{
+
+class Connection : public boost::enable_shared_from_this<Connection> // abstract interface of a single SGCP connection
+{
+ public:
+ struct Chunk;
+ typedef boost::function<Chunk (uint16_t /*type*/, uint32_t /*size*/)> Dispatcher; // given a packet type and size, returns the chunk to read into
+ typedef boost::function<Chunk (const std::string& /*error*/)> Continuation; // invoked after a chunk was processed; returns the next chunk
+ typedef boost::function<void (const std::string& /*error*/)> ErrorHandler; // receives an error description
+ struct Chunk
+ {
+ void* buffer; // destination memory for the chunk
+ size_t size; // number of bytes in buffer; StreamConn treats 0 as "no more data to read"
+ Continuation continuation; // callback that supplies the following chunk
+ };
+
+ Connection(Dispatcher dispatcher, ErrorHandler errorHandler) : m_dispatcher(dispatcher), m_errorHandler(errorHandler) {} // stores both callbacks for use by derived classes
+ virtual ~Connection() {}
+
+ virtual boost::asio::basic_stream_socket& socket() = 0; // NOTE(review): basic_stream_socket is a class template in Boost.Asio and is used here without arguments - confirm the intended type
+
+ virtual void send(const void* data, size_t size) = 0; // sends size bytes starting at data
+
+ virtual void start() = 0;
+ virtual void stop() = 0;
+
+ protected:
+ Dispatcher m_dispatcher;
+ ErrorHandler m_errorHandler;
+};
+
+} // namespace SGCP
+} // namespace STG
+
+#endif
#include "stg/os_int.h"
+#include <boost/asio/basic_stream_socket.hpp>
+#include <boost/function.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/system/error_code.hpp>
+
#include <string>
#include <vector>
#include <stdexcept>
class Proto
{
public:
+ enum { CONTEXT = 0 };
+ enum PacketType {
+ INFO = 0,
+ PING,
+ PONG,
+ DATA
+ };
+
struct Error : public std::runtime_error
{
- Error(const std::string& mesage);
- Error();
+ Error(const std::string& mesage) : runtime_error(message) {}
};
+ typedef boost::function<void (ConnectionPtr /*conn*/, const std::string& /*enpoint*/, const std::string& /*error*/)> AcceptHandler;
+
Proto(TransportType transport, const std::string& key);
~Proto();
- void connect(const std::string& address, uint16_t port);
-
- void writeAllBuf(const void* buf, size_t size);
- void readAllBuf(void* buf, size_t size);
-
- template <typename T>
- void writeAll(const T& value);
+ ConnectionPtr connect(const std::string& address, uint16_t port);
+ void bind(const std::string& address, uint16_t port, AcceptHandler handler);
- template <typename T>
- T readAll();
+ void run();
+ bool stop();
private:
- TransportProto* m_transport;
+ class Impl;
+ boost::scoped_ptr<Impl> m_impl;
};
-template <>
-inline
-void Proto::writeAll<uint64_t>(const uint64_t& value)
-{
- uint64_t temp = hton(value);
- writeAllBuf(&temp, sizeof(temp));
-}
-
-template <>
-inline
-void Proto::writeAll<std::string>(const std::string& value)
-{
- uint64_t size = hton(value.size());
- writeAllBuf(&size, sizeof(size));
- writeAllBuf(value.c_str(), value.size());
-}
-
-template <>
-inline
-uint64_t Proto::readAll<uint64_t>()
-{
- uint64_t temp = 0;
- readAllBuf(&temp, sizeof(temp));
- return ntoh(temp);
-}
-
-template <>
-inline
-std::string Proto::readAll<std::string>()
-{
- uint64_t size = 0;
- readAllBuf(&size, sizeof(size));
- size = ntoh(size);
- std::vector<char> res(size);
- readAllBuf(res.data(), res.size());
- return res.data();
-}
-
} // namespace SGCP
} // namespace STG
};
static TransportProto* create(TransportType transport, const std::string& key);
- static TransportProto* create(TransportType transport);
virtual ~TransportProto() {}
- virtual void connect(const std::string& address, uint16_t port) = 0;
- virtual ssize_t write(const void* buf, size_t size) = 0;
- virtual ssize_t read(void* buf, size_t size) = 0;
+ virtual ConnectionPtr connect(const std::string& address, uint16_t port) = 0;
+ virtual void bind(const std::string& address, uint16_t port, Proto::AcceptHandler handler) = 0;
};
} // namespace SGCP
enum TransportType
{
UNIX,
- UDP,
- TCP
+ TCP,
+ SSL
};
} // namespace SGCP
#include <vector>
#include <arpa/inet.h> // hton*
-#include <netinet/in.h> // in_addr
namespace STG
{
template <> inline uint64_t hton(uint64_t value) { return htonll(value); }
template <> inline int64_t hton(int64_t value) { return htonll(value); }
-std::vector<in_addr> resolve(const std::string& address);
-
} // namespace SGCP
} // namespace STG
--- /dev/null
+#include "packet.h"
+
+#include "stg/sgcp_utils.h"
+
+#include <ctime>
+
+using STG::SGCP::Packet;
+
+uint64_t Packet::MAGIC = 0x5f8edc0fdb6d3113; // Carefully picked random 64-bit number :)
+uint16_t Packet::VERSION = 1;
+
+// Builds a packet header for the given type and payload size.
+// MAGIC, the current time and the protocol VERSION are filled in automatically.
+// The parameter list matches the two-argument declaration in packet.h
+// (the previous three-argument definition had no matching declaration and
+// initialized size from an undeclared name).
+Packet::Packet(uint16_t t, uint16_t sz)
+ : magic(MAGIC),
+ senderTime(time(NULL)),
+ version(VERSION),
+ type(t),
+ size(sz)
+{
+}
+
+// Returns a copy of the packet with every header field converted to network
+// byte order. Defined inside STG::SGCP so that it is the function declared in
+// packet.h rather than an unrelated global overload.
+Packet STG::SGCP::hton(Packet value)
+{
+ value.magic = hton(value.magic);
+ value.senderTime = hton(value.senderTime);
+ value.version = hton(value.version);
+ value.type = hton(value.type);
+ value.size = hton(value.size);
+ return value; // the original fell off the end of a non-void function
+}
--- /dev/null
+#ifndef __STG_SGCP_PACKET_H__
+#define __STG_SGCP_PACKET_H__
+
+/*
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+/*
+ * Author : Maxim Mamontov <faust@stargazer.dp.ua>
+ */
+
+#include <boost/stdint.hpp>
+
+namespace STG
+{
+namespace SGCP
+{
+
+struct __attribute__ ((__packed__)) Packet // packet header; declared packed so the compiler inserts no padding between fields
+{
+ Packet(uint16_t type, uint16_t size); // NOTE(review): size is uint16_t here while the size field below is uint32_t - confirm
+
+ bool valid() const { return magic == MAGIC; } // true when the magic field equals MAGIC
+
+ static uint64_t MAGIC; // expected value of the magic field
+ static uint16_t VERSION;
+
+ enum Types { PING, PONG, DATA }; // NOTE(review): Proto::PacketType starts with INFO = 0, so its PING/PONG/DATA values differ from these - confirm which is authoritative
+
+ uint64_t magic;
+ uint64_t senderTime; // set from time(NULL) by the constructor
+ uint16_t version;
+ uint16_t type;
+ uint32_t size;
+};
+
+Packet hton(Packet value);
+
+} // namespace SGCP
+} // namespace STG
+
+#endif
using STG::SGCP::Proto;
-Proto::Error::Error(const std::string& message)
- : runtime_error(message)
-{}
-
-Proto::Error::Error()
- : runtime_error(strerror(errno))
-{}
-
Proto::Proto(TransportType transport, const std::string& key)
- : m_transport(TransportProto::create(transport, key))
+ : m_impl(new Impl(transport, key)) // NOTE(review): class Proto::Impl is defined further down in this file - confirm it is a complete type at this point
{
}
Proto::~Proto()
{
- delete m_transport;
}
-void Proto::connect(const std::string& address, uint16_t port)
+// Connects to address:port through the implementation object and returns the
+// connection it produced.
+ConnectionPtr Proto::connect(const std::string& address, uint16_t port)
{
- try {
- m_transport->connect(address, port);
- } catch (const TransportProto::Error& ex) {
- throw Error(ex.what());
- }
+ return m_impl->connect(address, port);
}
-void Proto::writeAllBuf(const void* buf, size_t size)
+void Proto::bind(const std::string& address, uint16_t port, AcceptHandler handler)
{
- const char* pos = static_cast<const char*>(buf);
- while (size > 0) {
- ssize_t res = m_transport->write(pos, size);
- if (res < 0)
- throw Error();
- if (res == 0)
- return;
- size -= res;
- pos += res;
- }
+ m_impl->bind(address, port, handler);
+}
+
+void Proto::run()
+{
+ m_impl->run();
+}
+
+bool Proto::stop()
+{
+ return m_impl->stop();
+}
+
+// Private implementation of Proto: owns the io_service, the transport and the
+// list of connections, and drives the event loop.
+class Proto::Impl
+{
+ public:
+ Impl(TransportType transport, const std::string& key);
+ ~Impl();
+
+ // Return type matches the out-of-class definition and Proto::connect().
+ ConnectionPtr connect(const std::string& address, uint16_t port);
+ void bind(const std::string& address, uint16_t port, AcceptHandler handler);
+
+ void run();
+ bool stop();
+
+ private:
+ ba::io_service m_ios;
+ boost::scoped_ptr<Transport> m_transport;
+ std::vector<ConnectionPtr> m_conns; // connections that stop() shuts down
+ bool m_running; // loop condition of run()
+ bool m_stopped; // set by run() once its loop has exited
+};
+
+Proto::Impl::Impl(TransportType transport, const std::string& key)
+ : m_transport(makeTransport(transport, key)), // transport object is obtained from makeTransport()
+ m_running(false),
+ m_stopped(true) // initial state: loop not running and reported as stopped
+{
+}
+
+Proto::Impl::~Impl()
+{
+ stop(); // the boolean result of stop() is ignored here
+}
+
+ConnectionPtr Proto::Impl::connect(const std::string& address, uint16_t port) // delegates to the transport and returns its connection
+{
+ return m_transport->connect(address, port);
+}
+
+void Proto::Impl::bind(const std::string& address, uint16_t port, AcceptHandler handler) // delegates to the transport, passing the accept handler through
+{
+ m_transport->bind(address, port, handler);
+}
+
+void Proto::Impl::run() // event loop: keeps calling io_service::run_once() while m_running is set
+{
+ m_stopped = false;
+ m_running = true;
+ while (m_running)
+ m_ios.run_once();
+ m_stopped = true; // records that the loop has been left
}
-void Proto::readAllBuf(void* buf, size_t size)
+// Stops every tracked connection, clears the run flag, stops the io_service
+// and then polls (at most 10 times, 10 msec apart) until the io_service
+// reports that it has stopped. Returns that final stopped state.
+//
+// NOTE(review): m_running is a plain bool that is written here and read by
+// run(), which STG_CLIENT executes on another thread - confirm whether it
+// needs synchronization.
+bool Proto::Impl::stop()
{
- char* pos = static_cast<char*>(buf);
- while (size > 0) {
- ssize_t res = m_transport->read(pos, size);
- if (res < 0)
- throw Error();
- if (res == 0)
- return;
- size -= res;
- pos += res;
+ for (size_t i = 0; i < m_conns.size(); ++i)
+ m_conns[i]->stop();
+ m_running = false; // otherwise run() keeps calling run_once() on a stopped io_service forever
+ m_ios.stop();
+ for (size_t i = 0; i < 10 && !m_ios.stopped(); ++i) {
+ timespec ts;
+ ts.tv_sec = 0;
+ ts.tv_nsec = 10000000; // 10 msec
+ nanosleep(&ts, NULL); // actually wait between polls; the interval was prepared but never used
 }
+ return m_ios.stopped();
}
--- /dev/null
+#ifndef __STG_SGCP_STREAM_CONN_H__
+#define __STG_SGCP_STREAM_CONN_H__
+
+/*
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+/*
+ * Author : Maxim Mamontov <faust@stargazer.dp.ua>
+ */
+
+namespace STG
+{
+namespace SGCP
+{
+namespace Impl
+{
+
+// Connection over a stream-oriented asio protocol; Stream supplies the socket
+// and endpoint types. Incoming traffic is processed as: read a Packet header,
+// ask the dispatcher for a destination chunk, read the payload chunk by chunk
+// (each chunk's continuation yields the next one), then wait for the next
+// header. A chunk of size 0 means "nothing more to read for this packet".
+//
+// All I/O is asynchronous and is driven by the io_service passed to the
+// constructor.
+//
+// NOTE(review): completion handlers capture a raw this pointer, so the object
+// has to outlive every pending operation - confirm ownership with the callers.
+template <typename Stream>
+class StreamConn : public Connection
+{
+ // Declared first (and not named "socket") so that the names are known
+ // before they are used below and do not clash with the socket() member.
+ typedef typename Stream::socket socket_type;
+ typedef typename Stream::endpoint endpoint_type;
+
+ public:
+ StreamConn(ba::io_service& ios, Dispatcher dispatcher, ErrorHandler errorHandler)
+ : Connection(dispatcher, errorHandler),
+ m_socket(ios),
+ m_packet(0, 0) // Packet has no default constructor; overwritten by every header read
+ {
+ }
+
+ // Storage for the peer endpoint; the transports pass it to async_accept().
+ endpoint_type& endpoint() { return m_endpoint; }
+
+ virtual ba::basic_stream_socket& socket() { return m_socket; }
+
+ // Begins the receive cycle by waiting for the first packet header.
+ virtual void start() { m_readHeader(); }
+
+ virtual void stop() { m_socket.shutdown(socket_type::shutdown_both); }
+
+ // Queues a DATA packet: a header in network byte order followed by size
+ // bytes from data. The header is heap-allocated and released in
+ // m_handleWrite(). The memory behind data is not copied, so it must stay
+ // valid until the write completes.
+ virtual void send(const void* data, size_t size)
+ {
+ Packet* packet = new Packet(Packet::DATA, size);
+ *packet = hton(*packet);
+ boost::array<ba::const_buffer, 2> buffers = {
+ ba::buffer(static_cast<const void*>(packet), sizeof(*packet)),
+ ba::buffer(data, size)
+ };
+ ba::async_write(m_socket, buffers,
+ boost::bind(&StreamConn::m_handleWrite, this, packet,
+ ba::placeholders::error, ba::placeholders::bytes_transferred));
+ }
+
+ private:
+ socket_type m_socket;
+ endpoint_type m_endpoint;
+ Packet m_packet; // receive buffer for the header currently being read
+
+ // Starts an asynchronous read of one packet header into m_packet.
+ void m_readHeader()
+ {
+ ba::async_read(m_socket, ba::buffer(&m_packet, sizeof(m_packet)),
+ boost::bind(&StreamConn::m_handleReadHeader, this,
+ ba::placeholders::error, ba::placeholders::bytes_transferred));
+ }
+
+ // Reads the given chunk, or goes back to waiting for a header when the
+ // chunk is empty.
+ void m_readChunk(const Packet& packet, const Chunk& chunk)
+ {
+ if (chunk.size == 0) {
+ // TODO: Discard current data.
+ m_readHeader();
+ return;
+ }
+ ba::async_read(m_socket, ba::buffer(chunk.buffer, chunk.size),
+ boost::bind(&StreamConn::m_handleReadData, this, packet, chunk,
+ ba::placeholders::error, ba::placeholders::bytes_transferred));
+ }
+
+ // Completion handler for a header read: converts the header to host byte
+ // order and asks the dispatcher where the payload should go.
+ void m_handleReadHeader(const boost::system::error_code& ec, size_t /*size*/)
+ {
+ if (ec) {
+ // TODO: Handle errors.
+ /*if (ec != ba::error::operation_aborted)
+ m_errorHandler(ec);*/
+ return;
+ }
+ const Packet packet = ntoh(m_packet);
+ m_readChunk(packet, m_dispatcher(packet.type, packet.size));
+ }
+
+ // Completion handler for a payload chunk: asks the chunk's continuation
+ // for the next chunk and keeps reading.
+ void m_handleReadData(Packet packet, Chunk chunk, const boost::system::error_code& ec, size_t /*size*/)
+ {
+ if (ec) {
+ // TODO: Handle errors.
+ /*if (ec != ba::error::operation_aborted)
+ m_errorHandler(ec);*/
+ return;
+ }
+ m_readChunk(packet, chunk.continuation("")); // TODO: Handle errors.
+ }
+
+ // Completion handler for send(): releases the header allocated there.
+ void m_handleWrite(Packet* packet, const boost::system::error_code& ec, size_t /*size*/)
+ {
+ delete packet;
+ if (ec) {
+ // TODO: Handle errors.
+ /*if (ec != ba::error::operation_aborted)
+ m_errorHandler(ec);*/
+ return;
+ }
+ }
+};
+
+typedef StreamConn<ba::ip::tcp> TCPConn;
+typedef StreamConn<ba::local::stream_protocol> UNIXConn;
+
+} // namespace Impl
+} // namespace SGCP
+} // namespace STG
+
+#endif
using STG::SGCP::TCPProto;
-TCPProto::TCPProto()
- : m_sock(socket(AF_INET, SOCK_STREAM, 0))
+TCPProto::TCPProto(ba::io_service& ios) // keeps a reference to the caller's io_service
+ : m_ios(ios),
+ m_acceptor(m_ios) // the acceptor is created on that io_service; it is not opened here
{
}
close(m_sock);
}
-void TCPProto::connect(const std::string& address, uint16_t port)
+// Creates a TCP connection object, connects its socket to address:port and
+// starts its receive cycle. Throws Error when the address cannot be parsed or
+// the connect fails.
+//
+// address is parsed as a numeric IP address; host names are not resolved here.
+// NOTE(review): StreamConn's constructor also takes a dispatcher and an error
+// handler, which this function has no access to - confirm how they are supplied.
+ConnectionPtr TCPProto::connect(const std::string& address, uint16_t port)
{
- std::vector<in_addr> addrs = resolve(address);
-
- for (size_t i = 0; i < addrs.size(); ++i) {
- sockaddr_in addr;
- addr.sin_family = AF_INET;
- addr.sin_port = hton(port);
- addr.sin_addr = addrs[i];
-
- if (::connect(m_sock, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) == 0)
- return;
-
- close(m_sock);
- m_sock = socket(AF_INET, SOCK_STREAM, 0);
- }
- throw Error("No more addresses to connect to.");
+ bs::error_code ec;
+ const ba::ip::address ip = ba::ip::address::from_string(address, ec);
+ if (ec)
+ throw Error(ec.message());
+ ConnectionPtr conn = boost::make_shared<TCPConn>(m_ios);
+ conn->socket().connect(ba::ip::tcp::endpoint(ip, port), ec); // TCP endpoint, not a local-socket one
+ if (ec)
+ throw Error(ec.message());
+ conn->start();
+ return conn;
}
-ssize_t TCPProto::write(const void* buf, size_t size)
+// Opens the acceptor on address:port, puts it into listening state and queues
+// one asynchronous accept; handler is invoked from m_handleAccept() with the
+// outcome. Throws Error when the address cannot be parsed or the acceptor
+// cannot be set up.
+//
+// NOTE(review): StreamConn's constructor also takes a dispatcher and an error
+// handler, which this function has no access to - confirm how they are supplied.
+void TCPProto::bind(const std::string& address, uint16_t port, Proto::AcceptHandler handler)
{
- return ::write(m_sock, buf, size);
+ bs::error_code ec;
+ const ba::ip::address ip = ba::ip::address::from_string(address, ec);
+ if (ec)
+ throw Error(ec.message());
+ const ba::ip::tcp::endpoint local(ip, port);
+ m_acceptor.open(local.protocol(), ec); // an acceptor has to be open before it can be bound
+ if (ec)
+ throw Error(ec.message());
+ m_acceptor.bind(local, ec); // asio binds to an endpoint object, not to a string
+ if (ec)
+ throw Error(ec.message());
+ m_acceptor.listen(ba::socket_base::max_connections, ec); // required before async_accept
+ if (ec)
+ throw Error(ec.message());
+
+ TCPConn* conn = new TCPConn(m_ios);
+ m_acceptor.async_accept(conn->socket(), conn->endpoint(), boost::bind(&TCPProto::m_handleAccept, this, conn, handler, ba::placeholders::error));
}
-ssize_t TCPProto::read(void* buf, size_t size)
+// Completion handler of async_accept(). On failure the pre-allocated
+// connection is destroyed and the handler receives an empty connection plus
+// the error text; on success it receives the connection and the peer address.
+// ConnectionPtr is constructed explicitly so that this works whether it is a
+// raw or a smart pointer type.
+void TCPProto::m_handleAccept(TCPConn* conn, Proto::AcceptHandler handler, const boost::system::error_code& ec)
{
- return ::read(m_sock, buf, size);
+ if (ec) {
+ delete conn;
+ handler(ConnectionPtr(), "", ec.message());
+ return;
+ }
+ const std::string peer = conn->endpoint().address().to_string(); // handler expects text, not an ip::address
+ handler(ConnectionPtr(conn), peer, "");
}
class TCPProto : public TransportProto
{
public:
- TCPProto();
+ // TCP transport working on the caller's io_service.
+ TCPProto(boost::asio::io_service& ios);
virtual ~TCPProto();
- virtual void connect(const std::string& address, uint16_t port);
- virtual ssize_t write(const void* buf, size_t size);
- virtual ssize_t read(void* buf, size_t size);
+ // Overrides are implemented in this class, so they are not pure virtual
+ // (a pure specifier here would make TCPProto impossible to instantiate).
+ virtual ConnectionPtr connect(const std::string& address, uint16_t port);
+ virtual void bind(const std::string& address, uint16_t port, Proto::AcceptHandler handler);
+ typedef boost::asio::ip::tcp protocol;
private:
- int m_sock;
+ ba::io_service& m_ios;
+ protocol::acceptor m_acceptor;
+
+ // Completion handler for the accept queued by bind().
+ void m_handleAccept(TCPConn* conn, Proto::AcceptHandler handler, const boost::system::error_code& ec);
};
} // namespace SGCP
#include "crypto.h"
#include "unix.h"
-#include "udp.h"
#include "tcp.h"
+#include "ssl.h"
using STG::SGCP::TransportProto;
TransportProto* TransportProto::create(TransportType transport, const std::string& key)
-{
- TransportProto* underlying = create(transport);
- if (key.empty())
- return underlying;
- else
- return new CryptoProto(key, underlying);
-}
-
-TransportProto* TransportProto::create(TransportType transport)
{
switch (transport) {
case UNIX: return new UnixProto;
- case UDP: return new UDPProto;
case TCP: return new TCPProto;
+ case SSL: return new SSLProto(key);
};
return NULL;
}
#include "unix.h"
-#include <cerrno>
-#include <cstring>
+using STG::SGCP::UNIXProto;
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-
-using STG::SGCP::UnixProto;
-
-UnixProto::UnixProto()
- : m_sock(socket(AF_UNIX, SOCK_STREAM, 0))
+UNIXProto::UNIXProto(ba::io_service& ios) // keeps a reference to the caller's io_service
+ : m_ios(ios),
+ m_acceptor(m_ios) // the acceptor is created on that io_service; it is not opened here
{
}
-UnixProto::~UnixProto()
+// Nothing to release by hand: the raw descriptor m_sock no longer exists in
+// this class and the acceptor cleans up in its own destructor.
+UNIXProto::~UNIXProto()
{
- close(m_sock);
}
-void UnixProto::connect(const std::string& address, uint16_t /*port*/)
+// Creates a UNIX-socket connection object, connects it to the socket path in
+// address and starts its receive cycle. The port argument is unused.
+// Throws Error when the connect fails.
+//
+// NOTE(review): StreamConn's constructor also takes a dispatcher and an error
+// handler, which this function has no access to - confirm how they are supplied.
+ConnectionPtr UNIXProto::connect(const std::string& address, uint16_t /*port*/)
{
- sockaddr_un addr;
- addr.sun_family = AF_UNIX;
- size_t max = sizeof(addr.sun_path);
- strncpy(addr.sun_path, address.c_str(), max - 1);
- addr.sun_path[max - 1] = 0; // Just in case.
- if (::connect(m_sock, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) < 0)
- throw Error(strerror(errno));
+ bs::error_code ec;
+ ConnectionPtr conn = boost::make_shared<UNIXConn>(m_ios);
+ conn->socket().connect(ba::local::stream_protocol::endpoint(address), ec);
+ if (ec)
+ throw Error(ec.message());
+ conn->start();
+ return conn;
}
-ssize_t UnixProto::write(const void* buf, size_t size)
+// Opens the acceptor on the socket path in address, puts it into listening
+// state and queues one asynchronous accept; handler is invoked from
+// m_handleAccept() with the outcome. The port argument is unused.
+// Throws Error when the acceptor cannot be set up.
+//
+// NOTE(review): StreamConn's constructor also takes a dispatcher and an error
+// handler, which this function has no access to - confirm how they are supplied.
+void UNIXProto::bind(const std::string& address, uint16_t /*port*/, Proto::AcceptHandler handler)
{
- return ::write(m_sock, buf, size);
+ bs::error_code ec;
+ const ba::local::stream_protocol::endpoint local(address);
+ m_acceptor.open(local.protocol(), ec); // an acceptor has to be open before it can be bound
+ if (ec)
+ throw Error(ec.message());
+ m_acceptor.bind(local, ec); // asio binds to an endpoint object, not to a string
+ if (ec)
+ throw Error(ec.message());
+ m_acceptor.listen(ba::socket_base::max_connections, ec); // required before async_accept
+ if (ec)
+ throw Error(ec.message());
+
+ UNIXConn* conn = new UNIXConn(m_ios);
+ m_acceptor.async_accept(conn->socket(), conn->endpoint(), boost::bind(&UNIXProto::m_handleAccept, this, conn, handler, ba::placeholders::error));
}
-ssize_t UnixProto::read(void* buf, size_t size)
+// Completion handler of async_accept(). On failure the pre-allocated
+// connection is destroyed and the handler receives an empty connection plus
+// the error text; on success it receives the connection and the peer's
+// socket path. ConnectionPtr is constructed explicitly so that this works
+// whether it is a raw or a smart pointer type.
+void UNIXProto::m_handleAccept(UNIXConn* conn, Proto::AcceptHandler handler, const boost::system::error_code& ec)
{
- return ::read(m_sock, buf, size);
+ if (ec) {
+ delete conn;
+ handler(ConnectionPtr(), "", ec.message());
+ return;
+ }
+ const std::string peer = conn->endpoint().path();
+ handler(ConnectionPtr(conn), peer, "");
}
namespace SGCP
{
-class UnixProto : public TransportProto
+// UNIX-domain stream socket transport working on the caller's io_service.
+class UNIXProto : public TransportProto
{
public:
- UnixProto();
- virtual ~UnixProto();
+ UNIXProto(boost::asio::io_service& ios);
+ virtual ~UNIXProto();
- virtual void connect(const std::string& address, uint16_t /*port*/);
- virtual ssize_t write(const void* buf, size_t size);
- virtual ssize_t read(void* buf, size_t size);
+ // Overrides are implemented in this class, so they are not pure virtual
+ // (a pure specifier here would make UNIXProto impossible to instantiate).
+ virtual ConnectionPtr connect(const std::string& address, uint16_t port);
+ virtual void bind(const std::string& address, uint16_t port, Proto::AcceptHandler handler);
+ typedef boost::asio::local::stream_protocol protocol;
private:
- int m_sock;
+ ba::io_service& m_ios;
+ protocol::acceptor m_acceptor;
+
+ // Completion handler for the accept queued by bind().
+ void m_handleAccept(UNIXConn* conn, Proto::AcceptHandler handler, const boost::system::error_code& ec);
};
} // namespace SGCP