From: Maxim Mamontov <faust.madf@gmail.com> Date: Wed, 22 Jul 2015 18:00:10 +0000 (+0300) Subject: Experimental implementation of SGCP using Boost.Asio. X-Git-Url: https://git.stg.codes/stg.git/commitdiff_plain/20072b367cf034ab9124560e4a06d8e32a388d93 Experimental implementation of SGCP using Boost.Asio. --- diff --git a/projects/rlm_stg/stg_client.cpp b/projects/rlm_stg/stg_client.cpp index 199aca90..6987976b 100644 --- a/projects/rlm_stg/stg_client.cpp +++ b/projects/rlm_stg/stg_client.cpp @@ -22,6 +22,8 @@ #include "stg/common.h" +#include <boost/bind.hpp> + #include <stdexcept> namespace { @@ -80,17 +82,19 @@ ChannelConfig::ChannelConfig(std::string addr) STG_CLIENT::STG_CLIENT(const std::string& address) : m_config(address), - m_proto(m_config.transport, m_config.key) + m_proto(m_config.transport, m_config.key), + m_thread(boost::bind(&STG_CLIENT::m_run, this)) { - try { - m_proto.connect(m_config.address, m_config.port); - } catch (const STG::SGCP::Proto::Error& ex) { - throw Error(ex.what()); - } } STG_CLIENT::~STG_CLIENT() { + stop(); +} + +bool STG_CLIENT::stop() +{ + return m_proto.stop(); } RESULT STG_CLIENT::request(TYPE type, const std::string& userName, const std::string& password, const PAIRS& pairs) @@ -110,7 +114,7 @@ STG_CLIENT* STG_CLIENT::get() bool STG_CLIENT::configure(const std::string& address) { - if ( stgClient != NULL ) + if ( stgClient != NULL && stgClient->stop() ) delete stgClient; try { stgClient = new STG_CLIENT(address); @@ -161,3 +165,9 @@ PAIRS STG_CLIENT::m_readPairBlock() throw Error(ex.what()); } } + +void STG_CLIENT::m_run() +{ + m_proto.connect(m_config.address, m_config.port); + m_proto.run(); +} diff --git a/projects/rlm_stg/stg_client.h b/projects/rlm_stg/stg_client.h index 6315a0a5..d57af7da 100644 --- a/projects/rlm_stg/stg_client.h +++ b/projects/rlm_stg/stg_client.h @@ -25,6 +25,8 @@ #include "stg/sgcp_types.h" // TransportType #include "stg/os_int.h" +#include <boost/thread.hpp> + #include <string> #include <vector> #include <utility> @@ -67,6 +69,8 @@ public: STG_CLIENT(const std::string& address); ~STG_CLIENT(); + bool stop(); + static STG_CLIENT* get(); static bool configure(const std::string& address); @@ -75,10 +79,13 @@ public: private: ChannelConfig m_config; STG::SGCP::Proto m_proto; + boost::thread m_thread; void m_writeHeader(TYPE type, const std::string& userName, const std::string& password); void m_writePairBlock(const PAIRS& source); PAIRS m_readPairBlock(); + + void m_run(); }; #endif diff --git a/stglibs/sgcp.lib/include/stg/sgcp_conn.h b/stglibs/sgcp.lib/include/stg/sgcp_conn.h new file mode 100644 index 00000000..9deb7c7a --- /dev/null +++ b/stglibs/sgcp.lib/include/stg/sgcp_conn.h @@ -0,0 +1,68 @@ +#ifndef __STG_SGCP_CONN_H__ +#define __STG_SGCP_CONN_H__ + +/* + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +/* + * Author : Maxim Mamontov <faust@stargazer.dp.ua> + */ + +#include "stg/os_int.h" + +#include <boost/asio/basic_stream_socket.hpp> +#include <boost/function.hpp> + +#include <string> + +namespace STG +{ +namespace SGCP +{ + +class Connection : public boost::enable_shared_from_this<Connection> +{ + public: + struct Chunk; + typedef boost::function<Chunk (uint16_t /*type*/, uint32_t /*size*/)> Dispatcher; + typedef boost::function<Chunk (const std::string& /*error*/)> Continuation; + typedef boost::function<void (const std::string& /*error*/)> ErrorHandler; + struct Chunk + { + void* buffer; + size_t size; + Continuation continuation; + }; + + Connection(Dispatcher dispatcher, ErrorHandler errorHandler) : m_dispatcher(dispatcher), m_errorHandler(errorHandler) {} + virtual ~Connection() {} + + virtual boost::asio::basic_stream_socket& socket() = 0; + + virtual void send(const void* data, size_t size) = 0; + + virtual void start() = 0; + virtual void stop() = 0; + + protected: + Dispatcher m_dispatcher; + ErrorHandler m_errorHandler; +}; + +} // namespace SGCP +} // namespace STG + +#endif diff --git a/stglibs/sgcp.lib/include/stg/sgcp_proto.h b/stglibs/sgcp.lib/include/stg/sgcp_proto.h index 3cc35b19..04bc3c88 100644 --- a/stglibs/sgcp.lib/include/stg/sgcp_proto.h +++ b/stglibs/sgcp.lib/include/stg/sgcp_proto.h @@ -26,6 +26,11 @@ #include "stg/os_int.h" +#include <boost/asio/basic_stream_socket.hpp> +#include <boost/function.hpp> +#include <boost/scoped_ptr.hpp> +#include <boost/system/error_code.hpp> + #include <string> #include <vector> #include <stdexcept> @@ -40,68 +45,35 @@ class TransportProto; class Proto { public: + enum { CONTEXT = 0 }; + enum PacketType { + INFO = 0, + PING, + PONG, + DATA + }; + struct Error : public std::runtime_error { - Error(const std::string& mesage); - Error(); + Error(const std::string& mesage) : runtime_error(message) {} }; + typedef boost::function<void (ConnectionPtr /*conn*/, const std::string& /*enpoint*/, const std::string& /*error*/)> AcceptHandler; + Proto(TransportType transport, const std::string& key); ~Proto(); - void connect(const std::string& address, uint16_t port); - - void writeAllBuf(const void* buf, size_t size); - void readAllBuf(void* buf, size_t size); - - template <typename T> - void writeAll(const T& value); + ConnectionPtr connect(const std::string& address, uint16_t port); + void bind(const std::string& address, uint16_t port, AcceptHandler handler); - template <typename T> - T readAll(); + void run(); + bool stop(); private: - TransportProto* m_transport; + class Impl; + boost::scoped_ptr<Impl> m_impl; }; -template <> -inline -void Proto::writeAll<uint64_t>(const uint64_t& value) -{ - uint64_t temp = hton(value); - writeAllBuf(&temp, sizeof(temp)); -} - -template <> -inline -void Proto::writeAll<std::string>(const std::string& value) -{ - uint64_t size = hton(value.size()); - writeAllBuf(&size, sizeof(size)); - writeAllBuf(value.c_str(), value.size()); -} - -template <> -inline -uint64_t Proto::readAll<uint64_t>() -{ - uint64_t temp = 0; - readAllBuf(&temp, sizeof(temp)); - return ntoh(temp); -} - -template <> -inline -std::string Proto::readAll<std::string>() -{ - uint64_t size = 0; - readAllBuf(&size, sizeof(size)); - size = ntoh(size); - std::vector<char> res(size); - readAllBuf(res.data(), res.size()); - return res.data(); -} - } // namespace SGCP } // namespace STG diff --git a/stglibs/sgcp.lib/include/stg/sgcp_transport.h b/stglibs/sgcp.lib/include/stg/sgcp_transport.h index 88a7d0c4..6e24756a 100644 --- a/stglibs/sgcp.lib/include/stg/sgcp_transport.h +++ b/stglibs/sgcp.lib/include/stg/sgcp_transport.h @@ -43,13 +43,11 @@ class TransportProto }; static TransportProto* create(TransportType transport, const std::string& key); - static TransportProto* create(TransportType transport); virtual ~TransportProto() {} - virtual void connect(const std::string& address, uint16_t port) = 0; - virtual ssize_t write(const void* buf, size_t size) = 0; - virtual ssize_t read(void* buf, size_t size) = 0; + virtual ConnectionPtr connect(const std::string& address, uint16_t port) = 0; + virtual void bind(const std::string& address, uint16_t port, Proto::AcceptHandler handler) = 0; }; } // namespace SGCP diff --git a/stglibs/sgcp.lib/include/stg/sgcp_types.h b/stglibs/sgcp.lib/include/stg/sgcp_types.h index 47920077..a33be7b2 100644 --- a/stglibs/sgcp.lib/include/stg/sgcp_types.h +++ b/stglibs/sgcp.lib/include/stg/sgcp_types.h @@ -29,8 +29,8 @@ namespace SGCP enum TransportType { UNIX, - UDP, - TCP + TCP, + SSL }; } // namespace SGCP diff --git a/stglibs/sgcp.lib/include/stg/sgcp_utils.h b/stglibs/sgcp.lib/include/stg/sgcp_utils.h index 7e5004c1..e5152671 100644 --- a/stglibs/sgcp.lib/include/stg/sgcp_utils.h +++ b/stglibs/sgcp.lib/include/stg/sgcp_utils.h @@ -27,7 +27,6 @@ #include <vector> #include <arpa/inet.h> // hton* -#include <netinet/in.h> // in_addr namespace STG { @@ -59,8 +58,6 @@ uint64_t htonll(uint64_t value) template <> inline uint64_t hton(uint64_t value) { return htonll(value); } template <> inline int64_t hton(int64_t value) { return htonll(value); } -std::vector<in_addr> resolve(const std::string& address); - } // namespace SGCP } // namespace STG diff --git a/stglibs/sgcp.lib/packet.cpp b/stglibs/sgcp.lib/packet.cpp new file mode 100644 index 00000000..12d8f1e9 --- /dev/null +++ b/stglibs/sgcp.lib/packet.cpp @@ -0,0 +1,28 @@ +#include "packet.h" + +#include "stg/sgcp_utils.h" + +#include <ctime> + +using STG::SGCP::Packet; + +uint64_t Packet::MAGIC = 0x5f8edc0fdb6d3113; // Carefully picked random 64-bit number :) +uint16_t Packet::VERSION = 1; + +Packet::Packet(uint16_t ver, uint16_t t, uint16_t sz) + : magic(MAGIC), + senderTime(time(NULL)), + version(ver), + type(t), + size(s) +{ +} + +Packet hton(Packet value) +{ + value.magic = hton(value.magic); + value.senderTime = hton(value.senderTime); + value.version = hton(value.version); + value.type = hton(value.type); + value.size = hton(value.size); +} diff --git a/stglibs/sgcp.lib/packet.h b/stglibs/sgcp.lib/packet.h new file mode 100644 index 00000000..f5a4374e --- /dev/null +++ b/stglibs/sgcp.lib/packet.h @@ -0,0 +1,54 @@ +#ifndef __STG_SGCP_PACKET_H__ +#define __STG_SGCP_PACKET_H__ + +/* + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +/* + * Author : Maxim Mamontov <faust@stargazer.dp.ua> + */ + +#include <boost/stdint.hpp> + +namespace STG +{ +namespace SGCP +{ + +struct __attribute__ ((__packed__)) Packet +{ + Packet(uint16_t type, uint16_t size); + + bool valid() const { return magic == MAGIC; } + + static uint64_t MAGIC; + static uint16_t VERSION; + + enum Types { PING, PONG, DATA }; + + uint64_t magic; + uint64_t senderTime; + uint16_t version; + uint16_t type; + uint32_t size; +}; + +Packet hton(Packet value); + +} // namespace SGCP +} // namespace STG + +#endif diff --git a/stglibs/sgcp.lib/proto.cpp b/stglibs/sgcp.lib/proto.cpp index 96b6285b..8e3b1eb6 100644 --- a/stglibs/sgcp.lib/proto.cpp +++ b/stglibs/sgcp.lib/proto.cpp @@ -7,57 +7,95 @@ using STG::SGCP::Proto; -Proto::Error::Error(const std::string& message) - : runtime_error(message) -{} - -Proto::Error::Error() - : runtime_error(strerror(errno)) -{} - Proto::Proto(TransportType transport, const std::string& key) - : m_transport(TransportProto::create(transport, key)) + : m_impl(new Impl(transport, key)) { } Proto::~Proto() { - delete m_transport; } -void Proto::connect(const std::string& address, uint16_t port) +ConnectionPtr Proto::connect(const std::string& address, uint16_t port) { - try { - m_transport->connect(address, port); - } catch (const TransportProto::Error& ex) { - throw Error(ex.what()); - } + m_impl->connect(adress, port); } -void Proto::writeAllBuf(const void* buf, size_t size) +void Proto::bind(const std::string& address, uint16_t port, AcceptHandler handler) { - const char* pos = static_cast<const char*>(buf); - while (size > 0) { - ssize_t res = m_transport->write(pos, size); - if (res < 0) - throw Error(); - if (res == 0) - return; - size -= res; - pos += res; - } + m_impl->bind(address, port, handler); +} + +void Proto::run() +{ + m_impl->run(); +} + +bool Proto::stop() +{ + return m_impl->stop(); +} + +class Proto::Impl +{ + public: + Impl(TransportType transport, const std::string& key); + ~Impl(); + + Connection& connect(const std::string& address, uint16_t port); + void bind(const std::string& address, uint16_t port, AcceptHandler handler); + + void run(); + bool stop(); + + private: + ba::io_service m_ios; + boost::scoped_ptr<Transport> m_transport; + std::vector<ConnectionPtr> m_conns; + bool m_running; + bool m_stopped; +}; + +Proto::Impl::Impl(TransportType transport, const std::string& key) + : m_transport(makeTransport(transport, key)), + m_running(false), + m_stopped(true) +{ +} + +Proto::Impl::~Impl() +{ + stop(); +} + +ConnectionPtr Proto::Impl::connect(const std::string& address, uint16_t port) +{ + return m_transport->connect(address, port); +} + +void Proto::Impl::bind(const std::string& address, uint16_t port, AcceptHandler handler) +{ + m_transport->bind(address, port, handler); +} + +void Proto::Impl::run() +{ + m_stopped = false; + m_running = true; + while (m_running) + m_ios.run_once(); + m_stopped = true; } -void Proto::readAllBuf(void* buf, size_t size) +bool Proto::Impl::stop() { - char* pos = static_cast<char*>(buf); - while (size > 0) { - ssize_t res = m_transport->read(pos, size); - if (res < 0) - throw Error(); - if (res == 0) - return; - size -= res; - pos += res; + for (size_t i = 0; i < m_conns.size(); ++i) + m_conns[i]->stop(); + m_ios.stop(); + for (size_t i = 0; i < 10 && !m_ios.stopped(); ++i) { + timspec ts; + ts.tv_sec = 0; + ts.tv_nsec = 10000000; // 10 msec } + return m_ios.stopped(); } diff --git a/stglibs/sgcp.lib/stream_conn.h b/stglibs/sgcp.lib/stream_conn.h new file mode 100644 index 00000000..70ca09bc --- /dev/null +++ b/stglibs/sgcp.lib/stream_conn.h @@ -0,0 +1,126 @@ +#ifndef __STG_SGCP_STREAM_CONN_H__ +#define __STG_SGCP_STREAM_CONN_H__ + +/* + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + */ + +/* + * Author : Maxim Mamontov <faust@stargazer.dp.ua> + */ + +namespace STG +{ +namespace SGCP +{ +namespace Impl +{ + +template <typename Stream> +class StreamConn : public Connection +{ + public: + StreamConn(ba::io_service& ios, Dispatcher dispatcher, ErrorHandler errorHandler) + : Connection(dispatcher, errorHandler), + m_socket(ios) + { + } + + socket::endpoint_type& endpoint() { return m_endpoint; } + + virtual ba::basic_stream_socket& socket() { return m_socket; } + + virtual void start() + { + ba::read(m_socket, ba::buffer(&m_packet, sizeof(m_packet)), + boost::bind(&StreamConn::m_handleReadHeader, this)); + } + virtual void stop() { m_socket.shutdown(socket::shutdown_both); } + + virtual void send(const void* data, size_t size) + { + Packet* packet = new Packet(Packet::DATA, size)); + *packet = hton(*packet); + boost::array<ba::const_buffer, 2> data = { + ba::buffer(packet, sizeof(*packet)), + ba::buffer(data, size) + }; + ba::write(m_socket, data, boost::bind(&StreamConn::m_handleWrite, this, packet, boost::_1, boost::_2)); + } + + private: + typedef Stream::socket socket; + socket m_socket; + Packet m_packet; + + void m_handleReadHeader(const boost::system::error_code& ec, size_t size) + { + if (ec) { + // TODO: Handle errors. + /*if (ec != ba::error::operation_aborted) + m_errorHandler(ec);*/ + return; + } + Packet packet = ntoh(m_packet); + Chunk chunk = m_dispatcher(packet.type, packet.size); + if (chunk.size == 0) { + // TODO: Discard current data. + ba::read(m_socket, ba::buffer(&m_packet, sizeof(m_packet)), + boost::bind(&StreamConn::m_handleReadHeader, this)); + return; + } + ba::read(m_socket, ba::buffer(chunk.buffer, chunk.size), + boost::bind(&StreamConn::m_handleReadData, this, packet, chunk, boost::_1, boost::_2)); + } + + void m_handleReadData(Packet packet, Chunk chunk, const boost::system::error_code& ec, size_t size) + { + if (ec) { + // TODO: Handle errors. + /*if (ec != ba::error::operation_aborted) + m_errorHandler(ec);*/ + return; + } + chunk = chunk.continuation(""); // TODO: Handle errors. + if (chunk.size == 0) { + // TODO: Discard current data. + ba::read(m_socket, ba::buffer(&m_packet, sizeof(m_packet)), + boost::bind(&StreamConn::m_handleReadHeader, this)); + return; + } + ba::read(m_socket, ba::buffer(chunk.buffer, chunk.size), + boost::bind(&StreamConn::m_handleReadData, this, packet, chunk, boost::_1, boost::_2)); + } + + void m_handleWrite(Packet* packet, const boost::system::error_code& ec, size_t size) + { + delete packet; + if (ec) { + // TODO: Handle errors. + /*if (ec != ba::error::operation_aborted) + m_errorHandler(ec);*/ + return; + } + } +}; + +typedef StreamConn<ba::ip::tcp> TCPConn; +typedef StreamConn<ba::local::stream_protocol> UNIXConn; + +} // namespace Impl +} // namespace SGCP +} // namespace STG + +#endif diff --git a/stglibs/sgcp.lib/tcp.cpp b/stglibs/sgcp.lib/tcp.cpp index 414d5ec8..5d6c6cbc 100644 --- a/stglibs/sgcp.lib/tcp.cpp +++ b/stglibs/sgcp.lib/tcp.cpp @@ -12,8 +12,9 @@ using STG::SGCP::TCPProto; -TCPProto::TCPProto() - : m_sock(socket(AF_INET, SOCK_STREAM, 0)) +TCPProto::TCPProto(ba::io_service& ios) + : m_ios(ios), + m_acceptor(m_ios) { } @@ -22,31 +23,34 @@ TCPProto::~TCPProto() close(m_sock); } -void TCPProto::connect(const std::string& address, uint16_t port) +ConnectionPtr TCPProto::connect(const std::string& address, uint16_t port) { - std::vector<in_addr> addrs = resolve(address); - - for (size_t i = 0; i < addrs.size(); ++i) { - sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = hton(port); - addr.sin_addr = addrs[i]; - - if (::connect(m_sock, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) == 0) - return; - - close(m_sock); - m_sock = socket(AF_INET, SOCK_STREAM, 0); - } - throw Error("No more addresses to connect to."); + bs::error_code ec; + ConnectionPtr conn = boost::make_shared(m_ios); + conn.socket().connect(ba::local::stream_protocol::enpoint(address, port), ec); + if (ec) + throw Error(ec.message()); + conn->start(); + return conn; } -ssize_t TCPProto::write(const void* buf, size_t size) +void TCPProto::bind(const std::string& address, uint16_t port, Proto::AcceptHandler handler) { - return ::write(m_sock, buf, size); + bs::error_code ec; + m_acceptor.bind(address, ec); + if (ec) + throw Error(ec.message()); + + TCPConn* conn = new TCPConn(m_ios); + m_acceptor.async_accept(conn->socket(), conn->endpoint(), boost::bind(&TCPProto::m_handleAccept, this, conn, handler, boost::_1); } -ssize_t TCPProto::read(void* buf, size_t size) +void TCPProto::m_handleAccept(TCPConn* conn, Proto::AcceptHandler handler, const boost::system::error_code& ec) { - return ::read(m_sock, buf, size); + if (ec) { + delete conn; + handler(NULL, "", ec.message()); + return; + } + handler(conn, conn->enpoint().address(), ""); } diff --git a/stglibs/sgcp.lib/tcp.h b/stglibs/sgcp.lib/tcp.h index 8afa0b5e..55de821b 100644 --- a/stglibs/sgcp.lib/tcp.h +++ b/stglibs/sgcp.lib/tcp.h @@ -37,15 +37,18 @@ namespace SGCP class TCPProto : public TransportProto { public: - TCPProto(); + TCPProto(boost::asio::io_service& ios); virtual ~TCPProto(); - virtual void connect(const std::string& address, uint16_t port); - virtual ssize_t write(const void* buf, size_t size); - virtual ssize_t read(void* buf, size_t size); + virtual ConnectionPtr connect(const std::string& address, uint16_t port) = 0; + virtual void bind(const std::string& address, uint16_t port, Proto::AcceptHandler handler) = 0; + typedef boost::asio::ip::tcp protocol; private: - int m_sock; + ba::io_service& m_ios; + protocol::acceptor m_acceptor; + + void m_handleAccept(TCPConn* conn, Proto::AcceptHandler handler, const boost::system::error_code& ec) }; } // namespace SGCP diff --git a/stglibs/sgcp.lib/transport.cpp b/stglibs/sgcp.lib/transport.cpp index 1e94fa58..b67cea6a 100644 --- a/stglibs/sgcp.lib/transport.cpp +++ b/stglibs/sgcp.lib/transport.cpp @@ -2,26 +2,17 @@ #include "crypto.h" #include "unix.h" -#include "udp.h" #include "tcp.h" +#include "ssl.h" using STG::SGCP::TransportProto; TransportProto* TransportProto::create(TransportType transport, const std::string& key) -{ - TransportProto* underlying = create(transport); - if (key.empty()) - return underlying; - else - return new CryptoProto(key, underlying); -} - -TransportProto* TransportProto::create(TransportType transport) { switch (transport) { case UNIX: return new UnixProto; - case UDP: return new UDPProto; case TCP: return new TCPProto; + case SSL: return new SSLProto(key); }; return NULL; } diff --git a/stglibs/sgcp.lib/unix.cpp b/stglibs/sgcp.lib/unix.cpp index 15002e5e..02f7f524 100644 --- a/stglibs/sgcp.lib/unix.cpp +++ b/stglibs/sgcp.lib/unix.cpp @@ -1,41 +1,46 @@ #include "unix.h" -#include <cerrno> -#include <cstring> +using STG::SGCP::UNIXProto; -#include <sys/types.h> -#include <sys/socket.h> -#include <sys/un.h> - -using STG::SGCP::UnixProto; - -UnixProto::UnixProto() - : m_sock(socket(AF_UNIX, SOCK_STREAM, 0)) +UNIXProto::UNIXProto(ba::io_service& ios) + : m_ios(ios), + m_acceptor(m_ios) { } -UnixProto::~UnixProto() +UNIXProto::~UNIXProto() { close(m_sock); } -void UnixProto::connect(const std::string& address, uint16_t /*port*/) +ConnectionPtr UNIXProto::connect(const std::string& address, uint16_t /*port*/) { - sockaddr_un addr; - addr.sun_family = AF_UNIX; - size_t max = sizeof(addr.sun_path); - strncpy(addr.sun_path, address.c_str(), max - 1); - addr.sun_path[max - 1] = 0; // Just in case. - if (::connect(m_sock, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) < 0) - throw Error(strerror(errno)); + bs::error_code ec; + ConnectionPtr conn = boost::make_shared(m_ios); + conn.socket().connect(ba::local::stream_protocol::enpoint(address), ec); + if (ec) + throw Error(ec.message()); + conn->start(); + return conn; } -ssize_t UnixProto::write(const void* buf, size_t size) +void UNIXProto::bind(const std::string& address, uint16_t /*port*/, Proto::AcceptHandler handler) { - return ::write(m_sock, buf, size); + bs::error_code ec; + m_acceptor.bind(address, ec); + if (ec) + throw Error(ec.message()); + + UNIXConn* conn = new UNIXConn(m_ios); + m_acceptor.async_accept(conn->socket(), conn->endpoint(), boost::bind(&UNIXProto::m_handleAccept, this, conn, handler, boost::_1); } -ssize_t UnixProto::read(void* buf, size_t size) +void UNIXProto::m_handleAccept(UNIXConn* conn, Proto::AcceptHandler handler, const boost::system::error_code& ec) { - return ::read(m_sock, buf, size); + if (ec) { + delete conn; + handler(NULL, "", ec.message()); + return; + } + handler(conn, conn->enpoint().path(), ""); } diff --git a/stglibs/sgcp.lib/unix.h b/stglibs/sgcp.lib/unix.h index e5b645f2..6735f6dc 100644 --- a/stglibs/sgcp.lib/unix.h +++ b/stglibs/sgcp.lib/unix.h @@ -34,18 +34,21 @@ namespace STG namespace SGCP { -class UnixProto : public TransportProto +class UNIXProto : public TransportProto { public: - UnixProto(); - virtual ~UnixProto(); + UNIXProto(boost::asio::io_service& ios); + virtual ~UNIXProto(); - virtual void connect(const std::string& address, uint16_t /*port*/); - virtual ssize_t write(const void* buf, size_t size); - virtual ssize_t read(void* buf, size_t size); + virtual ConnectionPtr connect(const std::string& address, uint16_t port) = 0; + virtual void bind(const std::string& address, uint16_t port, Proto::AcceptHandler handler) = 0; + typedef boost::asio::local::stream_protocol protocol; private: - int m_sock; + ba::io_service& m_ios; + protocol::acceptor m_acceptor; + + void m_handleAccept(UNIXConn* conn, Proto::AcceptHandler handler, const boost::system::error_code& ec) }; } // namespace SGCP