]> git.stg.codes - stg.git/commitdiff
Experimental implementation of SGCP using Boost.Asio.
authorMaxim Mamontov <faust.madf@gmail.com>
Wed, 22 Jul 2015 18:00:10 +0000 (21:00 +0300)
committerMaxim Mamontov <faust.madf@gmail.com>
Wed, 22 Jul 2015 18:00:10 +0000 (21:00 +0300)
16 files changed:
projects/rlm_stg/stg_client.cpp
projects/rlm_stg/stg_client.h
stglibs/sgcp.lib/include/stg/sgcp_conn.h [new file with mode: 0644]
stglibs/sgcp.lib/include/stg/sgcp_proto.h
stglibs/sgcp.lib/include/stg/sgcp_transport.h
stglibs/sgcp.lib/include/stg/sgcp_types.h
stglibs/sgcp.lib/include/stg/sgcp_utils.h
stglibs/sgcp.lib/packet.cpp [new file with mode: 0644]
stglibs/sgcp.lib/packet.h [new file with mode: 0644]
stglibs/sgcp.lib/proto.cpp
stglibs/sgcp.lib/stream_conn.h [new file with mode: 0644]
stglibs/sgcp.lib/tcp.cpp
stglibs/sgcp.lib/tcp.h
stglibs/sgcp.lib/transport.cpp
stglibs/sgcp.lib/unix.cpp
stglibs/sgcp.lib/unix.h

index 199aca905df76b3c5ba703fa21b35d7f42b907e1..6987976b7d44bf011ec4bb8d406fd538601dcb26 100644 (file)
@@ -22,6 +22,8 @@
 
 #include "stg/common.h"
 
+#include <boost/bind.hpp>
+
 #include <stdexcept>
 
 namespace {
@@ -80,17 +82,19 @@ ChannelConfig::ChannelConfig(std::string addr)
 
 STG_CLIENT::STG_CLIENT(const std::string& address)
     : m_config(address),
-      m_proto(m_config.transport, m_config.key)
+      m_proto(m_config.transport, m_config.key),
+      m_thread(boost::bind(&STG_CLIENT::m_run, this))
 {
-    try {
-        m_proto.connect(m_config.address, m_config.port);
-    } catch (const STG::SGCP::Proto::Error& ex) {
-        throw Error(ex.what());
-    }
 }
 
 STG_CLIENT::~STG_CLIENT()
 {
+    stop();
+}
+
+bool STG_CLIENT::stop()
+{
+    return m_proto.stop();
 }
 
 RESULT STG_CLIENT::request(TYPE type, const std::string& userName, const std::string& password, const PAIRS& pairs)
@@ -110,7 +114,7 @@ STG_CLIENT* STG_CLIENT::get()
 
 bool STG_CLIENT::configure(const std::string& address)
 {
-    if ( stgClient != NULL )
+    if ( stgClient != NULL && stgClient->stop() )
         delete stgClient;
     try {
         stgClient = new STG_CLIENT(address);
@@ -161,3 +165,9 @@ PAIRS STG_CLIENT::m_readPairBlock()
         throw Error(ex.what());
     }
 }
+
+void STG_CLIENT::m_run()
+{
+    m_proto.connect(m_config.address, m_config.port);
+    m_proto.run();
+}
index 6315a0a5dc3d18ee2dda1a7256cf3b104c3ffe6d..d57af7dacfc7c2201d0404750e0d31d89d213a8c 100644 (file)
@@ -25,6 +25,8 @@
 #include "stg/sgcp_types.h" // TransportType
 #include "stg/os_int.h"
 
+#include <boost/thread.hpp>
+
 #include <string>
 #include <vector>
 #include <utility>
@@ -67,6 +69,8 @@ public:
     STG_CLIENT(const std::string& address);
     ~STG_CLIENT();
 
+    bool stop();
+
     static STG_CLIENT* get();
     static bool configure(const std::string& address);
 
@@ -75,10 +79,13 @@ public:
 private:
     ChannelConfig m_config;
     STG::SGCP::Proto m_proto;
+    boost::thread m_thread;
 
     void m_writeHeader(TYPE type, const std::string& userName, const std::string& password);
     void m_writePairBlock(const PAIRS& source);
     PAIRS m_readPairBlock();
+
+    void m_run();
 };
 
 #endif
diff --git a/stglibs/sgcp.lib/include/stg/sgcp_conn.h b/stglibs/sgcp.lib/include/stg/sgcp_conn.h
new file mode 100644 (file)
index 0000000..9deb7c7
--- /dev/null
@@ -0,0 +1,68 @@
+#ifndef __STG_SGCP_CONN_H__
+#define __STG_SGCP_CONN_H__
+
+/*
+ *    This program is free software; you can redistribute it and/or modify
+ *    it under the terms of the GNU General Public License as published by
+ *    the Free Software Foundation; either version 2 of the License, or
+ *    (at your option) any later version.
+ *
+ *    This program is distributed in the hope that it will be useful,
+ *    but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *    GNU General Public License for more details.
+ *
+ *    You should have received a copy of the GNU General Public License
+ *    along with this program; if not, write to the Free Software
+ *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+/*
+ *    Author : Maxim Mamontov <faust@stargazer.dp.ua>
+ */
+
+#include "stg/os_int.h"
+
+#include <boost/asio/basic_stream_socket.hpp>
+#include <boost/function.hpp>
+
+#include <string>
+
+namespace STG
+{
+namespace SGCP
+{
+
+class Connection : public boost::enable_shared_from_this<Connection>
+{
+    public:
+        struct Chunk;
+        typedef boost::function<Chunk (uint16_t /*type*/, uint32_t /*size*/)> Dispatcher;
+        typedef boost::function<Chunk (const std::string& /*error*/)> Continuation;
+        typedef boost::function<void (const std::string& /*error*/)> ErrorHandler;
+        struct Chunk
+        {
+            void* buffer;
+            size_t size;
+            Continuation continuation;
+        };
+
+        Connection(Dispatcher dispatcher, ErrorHandler errorHandler) : m_dispatcher(dispatcher), m_errorHandler(errorHandler) {}
+        virtual ~Connection() {}
+
+        virtual boost::asio::basic_stream_socket& socket() = 0;
+
+        virtual void send(const void* data, size_t size) = 0;
+
+        virtual void start() = 0;
+        virtual void stop() = 0;
+
+    protected:
+        Dispatcher m_dispatcher;
+        ErrorHandler m_errorHandler;
+};
+
+} // namespace SGCP
+} // namespace STG
+
+#endif
index 3cc35b19223e388036b18646d4a1bdb8d3bda1b5..04bc3c8835e73c81afc44ffd92102993cc63ff86 100644 (file)
 
 #include "stg/os_int.h"
 
+#include <boost/asio/basic_stream_socket.hpp>
+#include <boost/function.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/system/error_code.hpp>
+
 #include <string>
 #include <vector>
 #include <stdexcept>
@@ -40,68 +45,35 @@ class TransportProto;
 class Proto
 {
     public:
+        enum { CONTEXT = 0 };
+        enum PacketType {
+            INFO = 0,
+            PING,
+            PONG,
+            DATA
+        };
+
         struct Error : public std::runtime_error
         {
-            Error(const std::string& mesage);
-            Error();
+            Error(const std::string& mesage) : runtime_error(message) {}
         };
 
+        typedef boost::function<void (ConnectionPtr /*conn*/, const std::string& /*enpoint*/, const std::string& /*error*/)> AcceptHandler;
+
         Proto(TransportType transport, const std::string& key);
         ~Proto();
 
-        void connect(const std::string& address, uint16_t port);
-
-        void writeAllBuf(const void* buf, size_t size);
-        void readAllBuf(void* buf, size_t size);
-
-        template <typename T>
-        void writeAll(const T& value);
+        ConnectionPtr connect(const std::string& address, uint16_t port);
+        void bind(const std::string& address, uint16_t port, AcceptHandler handler);
 
-        template <typename T>
-        T readAll();
+        void run();
+        bool stop();
 
     private:
-        TransportProto* m_transport;
+        class Impl;
+        boost::scoped_ptr<Impl> m_impl;
 };
 
-template <>
-inline
-void Proto::writeAll<uint64_t>(const uint64_t& value)
-{
-    uint64_t temp = hton(value);
-    writeAllBuf(&temp, sizeof(temp));
-}
-
-template <>
-inline
-void Proto::writeAll<std::string>(const std::string& value)
-{
-    uint64_t size = hton(value.size());
-    writeAllBuf(&size, sizeof(size));
-    writeAllBuf(value.c_str(), value.size());
-}
-
-template <>
-inline
-uint64_t Proto::readAll<uint64_t>()
-{
-    uint64_t temp = 0;
-    readAllBuf(&temp, sizeof(temp));
-    return ntoh(temp);
-}
-
-template <>
-inline
-std::string Proto::readAll<std::string>()
-{
-    uint64_t size = 0;
-    readAllBuf(&size, sizeof(size));
-    size = ntoh(size);
-    std::vector<char> res(size);
-    readAllBuf(res.data(), res.size());
-    return res.data();
-}
-
 } // namespace SGCP
 } // namespace STG
 
index 88a7d0c4be392db5febe2ad4c34b86cf2efdcfdd..6e24756ab3ffc10ca30ea24a86c783b77225b26d 100644 (file)
@@ -43,13 +43,11 @@ class TransportProto
         };
 
         static TransportProto* create(TransportType transport, const std::string& key);
-        static TransportProto* create(TransportType transport);
 
         virtual ~TransportProto() {}
 
-        virtual void connect(const std::string& address, uint16_t port) = 0;
-        virtual ssize_t write(const void* buf, size_t size) = 0;
-        virtual ssize_t read(void* buf, size_t size) = 0;
+        virtual ConnectionPtr connect(const std::string& address, uint16_t port) = 0;
+        virtual void bind(const std::string& address, uint16_t port, Proto::AcceptHandler handler) = 0;
 };
 
 } // namespace SGCP
index 47920077c42149735e71cf9a2d8a6e83ec0e4f5c..a33be7b2f7672cfe78909f9c0f3d04e49fa7ca87 100644 (file)
@@ -29,8 +29,8 @@ namespace SGCP
 enum TransportType
 {
     UNIX,
-    UDP,
-    TCP
+    TCP,
+    SSL
 };
 
 } // namespace SGCP
index 7e5004c17684424d0f747b831e692b98933a2ceb..e5152671c39b10c00dc89b3dbb3fbda76f4e2cc8 100644 (file)
@@ -27,7 +27,6 @@
 #include <vector>
 
 #include <arpa/inet.h> // hton*
-#include <netinet/in.h> // in_addr
 
 namespace STG
 {
@@ -59,8 +58,6 @@ uint64_t htonll(uint64_t value)
 template <> inline uint64_t hton(uint64_t value) { return htonll(value); }
 template <> inline int64_t hton(int64_t value) { return htonll(value); }
 
-std::vector<in_addr> resolve(const std::string& address);
-
 } // namespace SGCP
 } // namespace STG
 
diff --git a/stglibs/sgcp.lib/packet.cpp b/stglibs/sgcp.lib/packet.cpp
new file mode 100644 (file)
index 0000000..12d8f1e
--- /dev/null
@@ -0,0 +1,28 @@
+#include "packet.h"
+
+#include "stg/sgcp_utils.h"
+
+#include <ctime>
+
+using STG::SGCP::Packet;
+
+uint64_t Packet::MAGIC = 0x5f8edc0fdb6d3113; // Carefully picked random 64-bit number :)
+uint16_t Packet::VERSION = 1;
+
+Packet::Packet(uint16_t ver, uint16_t t, uint16_t sz)
+    : magic(MAGIC),
+      senderTime(time(NULL)),
+      version(ver),
+      type(t),
+      size(s)
+{
+}
+
+Packet hton(Packet value)
+{
+    value.magic = hton(value.magic);
+    value.senderTime = hton(value.senderTime);
+    value.version = hton(value.version);
+    value.type = hton(value.type);
+    value.size = hton(value.size);
+}
diff --git a/stglibs/sgcp.lib/packet.h b/stglibs/sgcp.lib/packet.h
new file mode 100644 (file)
index 0000000..f5a4374
--- /dev/null
@@ -0,0 +1,54 @@
+#ifndef __STG_SGCP_PACKET_H__
+#define __STG_SGCP_PACKET_H__
+
+/*
+ *    This program is free software; you can redistribute it and/or modify
+ *    it under the terms of the GNU General Public License as published by
+ *    the Free Software Foundation; either version 2 of the License, or
+ *    (at your option) any later version.
+ *
+ *    This program is distributed in the hope that it will be useful,
+ *    but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *    GNU General Public License for more details.
+ *
+ *    You should have received a copy of the GNU General Public License
+ *    along with this program; if not, write to the Free Software
+ *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+/*
+ *    Author : Maxim Mamontov <faust@stargazer.dp.ua>
+ */
+
+#include <boost/stdint.hpp>
+
+namespace STG
+{
+namespace SGCP
+{
+
+struct __attribute__ ((__packed__)) Packet
+{
+    Packet(uint16_t type, uint16_t size);
+
+    bool valid() const { return magic == MAGIC; }
+
+    static uint64_t MAGIC;
+    static uint16_t VERSION;
+
+    enum Types { PING, PONG, DATA };
+
+    uint64_t magic;
+    uint64_t senderTime;
+    uint16_t version;
+    uint16_t type;
+    uint32_t size;
+};
+
+Packet hton(Packet value);
+
+} // namespace SGCP
+} // namespace STG
+
+#endif
index 96b6285b05e2de64e76e84bc3cf77ef889e9b46e..8e3b1eb673ceee16abd27a4eba291ee0e6fc43de 100644 (file)
@@ -7,57 +7,95 @@
 
 using STG::SGCP::Proto;
 
-Proto::Error::Error(const std::string& message)
-    : runtime_error(message)
-{}
-
-Proto::Error::Error()
-    : runtime_error(strerror(errno))
-{}
-
 Proto::Proto(TransportType transport, const std::string& key)
-    : m_transport(TransportProto::create(transport, key))
+    : m_impl(new Impl(transport, key))
 {
 }
 
 Proto::~Proto()
 {
-    delete m_transport;
 }
 
-void Proto::connect(const std::string& address, uint16_t port)
+ConnectionPtr Proto::connect(const std::string& address, uint16_t port)
 {
-    try {
-        m_transport->connect(address, port);
-    } catch (const TransportProto::Error& ex) {
-        throw Error(ex.what());
-    }
+    m_impl->connect(adress, port);
 }
 
-void Proto::writeAllBuf(const void* buf, size_t size)
+void Proto::bind(const std::string& address, uint16_t port, AcceptHandler handler)
 {
-    const char* pos = static_cast<const char*>(buf);
-    while (size > 0) {
-        ssize_t res = m_transport->write(pos, size);
-        if (res < 0)
-            throw Error();
-        if (res == 0)
-            return;
-        size -= res;
-        pos += res;
-    }
+    m_impl->bind(address, port, handler);
+}
+
+void Proto::run()
+{
+    m_impl->run();
+}
+
+bool Proto::stop()
+{
+    return m_impl->stop();
+}
+
+class Proto::Impl
+{
+    public:
+        Impl(TransportType transport, const std::string& key);
+        ~Impl();
+
+        Connection& connect(const std::string& address, uint16_t port);
+        void bind(const std::string& address, uint16_t port, AcceptHandler handler);
+
+        void run();
+        bool stop();
+
+    private:
+        ba::io_service m_ios;
+        boost::scoped_ptr<Transport> m_transport;
+        std::vector<ConnectionPtr> m_conns;
+        bool m_running;
+        bool m_stopped;
+};
+
+Proto::Impl::Impl(TransportType transport, const std::string& key)
+    : m_transport(makeTransport(transport, key)),
+      m_running(false),
+      m_stopped(true)
+{
+}
+
+Proto::Impl::~Impl()
+{
+    stop();
+}
+
+ConnectionPtr Proto::Impl::connect(const std::string& address, uint16_t port)
+{
+    return m_transport->connect(address, port);
+}
+
+void Proto::Impl::bind(const std::string& address, uint16_t port, AcceptHandler handler)
+{
+    m_transport->bind(address, port, handler);
+}
+
+void Proto::Impl::run()
+{
+    m_stopped = false;
+    m_running = true;
+    while (m_running)
+        m_ios.run_once();
+    m_stopped = true;
 }
 
-void Proto::readAllBuf(void* buf, size_t size)
+bool Proto::Impl::stop()
 {
-    char* pos = static_cast<char*>(buf);
-    while (size > 0) {
-        ssize_t res = m_transport->read(pos, size);
-        if (res < 0)
-            throw Error();
-        if (res == 0)
-            return;
-        size -= res;
-        pos += res;
+    for (size_t i = 0; i < m_conns.size(); ++i)
+        m_conns[i]->stop();
+    m_ios.stop();
+    for (size_t i = 0; i < 10 && !m_ios.stopped(); ++i) {
+        timspec ts;
+        ts.tv_sec = 0;
+        ts.tv_nsec = 10000000; // 10 msec
     }
+    return m_ios.stopped();
 }
diff --git a/stglibs/sgcp.lib/stream_conn.h b/stglibs/sgcp.lib/stream_conn.h
new file mode 100644 (file)
index 0000000..70ca09b
--- /dev/null
@@ -0,0 +1,126 @@
+#ifndef __STG_SGCP_STREAM_CONN_H__
+#define __STG_SGCP_STREAM_CONN_H__
+
+/*
+ *    This program is free software; you can redistribute it and/or modify
+ *    it under the terms of the GNU General Public License as published by
+ *    the Free Software Foundation; either version 2 of the License, or
+ *    (at your option) any later version.
+ *
+ *    This program is distributed in the hope that it will be useful,
+ *    but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *    GNU General Public License for more details.
+ *
+ *    You should have received a copy of the GNU General Public License
+ *    along with this program; if not, write to the Free Software
+ *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ */
+
+/*
+ *    Author : Maxim Mamontov <faust@stargazer.dp.ua>
+ */
+
+namespace STG
+{
+namespace SGCP
+{
+namespace Impl
+{
+
+template <typename Stream>
+class StreamConn : public Connection
+{
+    public:
+        StreamConn(ba::io_service& ios, Dispatcher dispatcher, ErrorHandler errorHandler)
+            : Connection(dispatcher, errorHandler),
+              m_socket(ios)
+        {
+        }
+
+        socket::endpoint_type& endpoint() { return m_endpoint; }
+
+        virtual ba::basic_stream_socket& socket() { return m_socket; }
+
+        virtual void start()
+        {
+            ba::read(m_socket, ba::buffer(&m_packet, sizeof(m_packet)),
+                     boost::bind(&StreamConn::m_handleReadHeader, this));
+        }
+        virtual void stop() { m_socket.shutdown(socket::shutdown_both); }
+
+        virtual void send(const void* data, size_t size)
+        {
+            Packet* packet = new Packet(Packet::DATA, size));
+            *packet = hton(*packet);
+            boost::array<ba::const_buffer, 2> data = {
+                ba::buffer(packet, sizeof(*packet)),
+                ba::buffer(data, size)
+            };
+            ba::write(m_socket, data, boost::bind(&StreamConn::m_handleWrite, this, packet, boost::_1, boost::_2));
+        }
+
+    private:
+        typedef Stream::socket socket;
+        socket m_socket;
+        Packet m_packet;
+
+        void m_handleReadHeader(const boost::system::error_code& ec, size_t size)
+        {
+            if (ec) {
+                // TODO: Handle errors.
+                /*if (ec != ba::error::operation_aborted)
+                    m_errorHandler(ec);*/
+                return;
+            }
+            Packet packet = ntoh(m_packet);
+            Chunk chunk = m_dispatcher(packet.type, packet.size);
+            if (chunk.size == 0) {
+                // TODO: Discard current data.
+                ba::read(m_socket, ba::buffer(&m_packet, sizeof(m_packet)),
+                         boost::bind(&StreamConn::m_handleReadHeader, this));
+                return;
+            }
+            ba::read(m_socket, ba::buffer(chunk.buffer, chunk.size),
+                     boost::bind(&StreamConn::m_handleReadData, this, packet, chunk, boost::_1, boost::_2));
+        }
+
+        void m_handleReadData(Packet packet, Chunk chunk, const boost::system::error_code& ec, size_t size)
+        {
+            if (ec) {
+                // TODO: Handle errors.
+                /*if (ec != ba::error::operation_aborted)
+                    m_errorHandler(ec);*/
+                return;
+            }
+            chunk = chunk.continuation(""); // TODO: Handle errors.
+            if (chunk.size == 0) {
+                // TODO: Discard current data.
+                ba::read(m_socket, ba::buffer(&m_packet, sizeof(m_packet)),
+                         boost::bind(&StreamConn::m_handleReadHeader, this));
+                return;
+            }
+            ba::read(m_socket, ba::buffer(chunk.buffer, chunk.size),
+                     boost::bind(&StreamConn::m_handleReadData, this, packet, chunk, boost::_1, boost::_2));
+        }
+
+        void m_handleWrite(Packet* packet, const boost::system::error_code& ec, size_t size)
+        {
+            delete packet;
+            if (ec) {
+                // TODO: Handle errors.
+                /*if (ec != ba::error::operation_aborted)
+                    m_errorHandler(ec);*/
+                return;
+            }
+        }
+};
+
+typedef StreamConn<ba::ip::tcp> TCPConn;
+typedef StreamConn<ba::local::stream_protocol> UNIXConn;
+
+} // namespace Impl
+} // namespace SGCP
+} // namespace STG
+
+#endif
index 414d5ec84322c37153f0a29ad86087f10b53ef22..5d6c6cbc2b6b21be33f5df3e1ea330a653cb58cb 100644 (file)
@@ -12,8 +12,9 @@
 
 using STG::SGCP::TCPProto;
 
-TCPProto::TCPProto()
-    : m_sock(socket(AF_INET, SOCK_STREAM, 0))
+TCPProto::TCPProto(ba::io_service& ios)
+    : m_ios(ios),
+      m_acceptor(m_ios)
 {
 }
 
@@ -22,31 +23,34 @@ TCPProto::~TCPProto()
     close(m_sock);
 }
 
-void TCPProto::connect(const std::string& address, uint16_t port)
+ConnectionPtr TCPProto::connect(const std::string& address, uint16_t port)
 {
-    std::vector<in_addr> addrs = resolve(address);
-
-    for (size_t i = 0; i < addrs.size(); ++i) {
-        sockaddr_in addr;
-        addr.sin_family = AF_INET;
-        addr.sin_port = hton(port);
-        addr.sin_addr = addrs[i];
-
-        if (::connect(m_sock, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) == 0)
-            return;
-
-        close(m_sock);
-        m_sock = socket(AF_INET, SOCK_STREAM, 0);
-    }
-    throw Error("No more addresses to connect to.");
+    bs::error_code ec;
+    ConnectionPtr conn = boost::make_shared(m_ios);
+    conn.socket().connect(ba::local::stream_protocol::enpoint(address, port), ec);
+    if (ec)
+        throw Error(ec.message());
+    conn->start();
+    return conn;
 }
 
-ssize_t TCPProto::write(const void* buf, size_t size)
+void TCPProto::bind(const std::string& address, uint16_t port, Proto::AcceptHandler handler)
 {
-    return ::write(m_sock, buf, size);
+    bs::error_code ec;
+    m_acceptor.bind(address, ec);
+    if (ec)
+        throw Error(ec.message());
+
+    TCPConn* conn = new TCPConn(m_ios);
+    m_acceptor.async_accept(conn->socket(), conn->endpoint(), boost::bind(&TCPProto::m_handleAccept, this, conn, handler, boost::_1);
 }
 
-ssize_t TCPProto::read(void* buf, size_t size)
+void TCPProto::m_handleAccept(TCPConn* conn, Proto::AcceptHandler handler, const boost::system::error_code& ec)
 {
-    return ::read(m_sock, buf, size);
+    if (ec) {
+        delete conn;
+        handler(NULL, "", ec.message());
+        return;
+    }
+    handler(conn, conn->enpoint().address(), "");
 }
index 8afa0b5e95b947247605dd1c1bcf90d2596e07b2..55de821b2177dc40721647339f7fa70eba3d6ba4 100644 (file)
@@ -37,15 +37,18 @@ namespace SGCP
 class TCPProto : public TransportProto
 {
     public:
-        TCPProto();
+        TCPProto(boost::asio::io_service& ios);
         virtual ~TCPProto();
 
-        virtual void connect(const std::string& address, uint16_t port);
-        virtual ssize_t write(const void* buf, size_t size);
-        virtual ssize_t read(void* buf, size_t size);
+        virtual ConnectionPtr connect(const std::string& address, uint16_t port) = 0;
+        virtual void bind(const std::string& address, uint16_t port, Proto::AcceptHandler handler) = 0;
 
+        typedef boost::asio::ip::tcp protocol;
     private:
-        int m_sock;
+        ba::io_service& m_ios;
+        protocol::acceptor m_acceptor;
+
+        void m_handleAccept(TCPConn* conn, Proto::AcceptHandler handler, const boost::system::error_code& ec)
 };
 
 } // namespace SGCP
index 1e94fa58da5ac5b91a6a3769931c12a3bca336dd..b67cea6a1bf8f347c9dcd72a1db8ed0e65ebf907 100644 (file)
@@ -2,26 +2,17 @@
 
 #include "crypto.h"
 #include "unix.h"
-#include "udp.h"
 #include "tcp.h"
+#include "ssl.h"
 
 using STG::SGCP::TransportProto;
 
 TransportProto* TransportProto::create(TransportType transport, const std::string& key)
-{
-    TransportProto* underlying = create(transport);
-    if (key.empty())
-        return underlying;
-    else
-        return new CryptoProto(key, underlying);
-}
-
-TransportProto* TransportProto::create(TransportType transport)
 {
     switch (transport) {
         case UNIX: return new UnixProto;
-        case UDP: return new UDPProto;
         case TCP: return new TCPProto;
+        case SSL: return new SSLProto(key);
     };
     return NULL;
 }
index 15002e5ef2361f79ff0d914b96ce9573c26ee79f..02f7f5244ee001718f698a20eac3320a635c786f 100644 (file)
@@ -1,41 +1,46 @@
 #include "unix.h"
 
-#include <cerrno>
-#include <cstring>
+using STG::SGCP::UNIXProto;
 
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-
-using STG::SGCP::UnixProto;
-
-UnixProto::UnixProto()
-    : m_sock(socket(AF_UNIX, SOCK_STREAM, 0))
+UNIXProto::UNIXProto(ba::io_service& ios)
+    : m_ios(ios),
+      m_acceptor(m_ios)
 {
 }
 
-UnixProto::~UnixProto()
+UNIXProto::~UNIXProto()
 {
     close(m_sock);
 }
 
-void UnixProto::connect(const std::string& address, uint16_t /*port*/)
+ConnectionPtr UNIXProto::connect(const std::string& address, uint16_t /*port*/)
 {
-    sockaddr_un addr;
-    addr.sun_family = AF_UNIX;
-    size_t max = sizeof(addr.sun_path);
-    strncpy(addr.sun_path, address.c_str(), max - 1);
-    addr.sun_path[max - 1] = 0; // Just in case.
-    if (::connect(m_sock, reinterpret_cast<const sockaddr*>(&addr), sizeof(addr)) < 0)
-        throw Error(strerror(errno));
+    bs::error_code ec;
+    ConnectionPtr conn = boost::make_shared(m_ios);
+    conn.socket().connect(ba::local::stream_protocol::enpoint(address), ec);
+    if (ec)
+        throw Error(ec.message());
+    conn->start();
+    return conn;
 }
 
-ssize_t UnixProto::write(const void* buf, size_t size)
+void UNIXProto::bind(const std::string& address, uint16_t /*port*/, Proto::AcceptHandler handler)
 {
-    return ::write(m_sock, buf, size);
+    bs::error_code ec;
+    m_acceptor.bind(address, ec);
+    if (ec)
+        throw Error(ec.message());
+
+    UNIXConn* conn = new UNIXConn(m_ios);
+    m_acceptor.async_accept(conn->socket(), conn->endpoint(), boost::bind(&UNIXProto::m_handleAccept, this, conn, handler, boost::_1);
 }
 
-ssize_t UnixProto::read(void* buf, size_t size)
+void UNIXProto::m_handleAccept(UNIXConn* conn, Proto::AcceptHandler handler, const boost::system::error_code& ec)
 {
-    return ::read(m_sock, buf, size);
+    if (ec) {
+        delete conn;
+        handler(NULL, "", ec.message());
+        return;
+    }
+    handler(conn, conn->enpoint().path(), "");
 }
index e5b645f2d78ab06b26ecaf17de2ad63653b7a907..6735f6dc4469dcffbda8098b83269777937218fd 100644 (file)
@@ -34,18 +34,21 @@ namespace STG
 namespace SGCP
 {
 
-class UnixProto : public TransportProto
+class UNIXProto : public TransportProto
 {
     public:
-        UnixProto();
-        virtual ~UnixProto();
+        UNIXProto(boost::asio::io_service& ios);
+        virtual ~UNIXProto();
 
-        virtual void connect(const std::string& address, uint16_t /*port*/);
-        virtual ssize_t write(const void* buf, size_t size);
-        virtual ssize_t read(void* buf, size_t size);
+        virtual ConnectionPtr connect(const std::string& address, uint16_t port) = 0;
+        virtual void bind(const std::string& address, uint16_t port, Proto::AcceptHandler handler) = 0;
 
+        typedef boost::asio::local::stream_protocol protocol;
     private:
-        int m_sock;
+        ba::io_service& m_ios;
+        protocol::acceptor m_acceptor;
+
+        void m_handleAccept(UNIXConn* conn, Proto::AcceptHandler handler, const boost::system::error_code& ec)
 };
 
 } // namespace SGCP