// Source: git.stg.codes - stg.git / stglibs / sgcp.lib / stream_conn.h
// Experimental implementation of SGCP using Boost.Asio.
//
1 #ifndef __STG_SGCP_STREAM_CONN_H__
2 #define __STG_SGCP_STREAM_CONN_H__
3
4 /*
5  *    This program is free software; you can redistribute it and/or modify
6  *    it under the terms of the GNU General Public License as published by
7  *    the Free Software Foundation; either version 2 of the License, or
8  *    (at your option) any later version.
9  *
10  *    This program is distributed in the hope that it will be useful,
11  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *    GNU General Public License for more details.
14  *
15  *    You should have received a copy of the GNU General Public License
16  *    along with this program; if not, write to the Free Software
17  *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
18  */
19
20 /*
21  *    Author : Maxim Mamontov <faust@stargazer.dp.ua>
22  */
23
24 namespace STG
25 {
26 namespace SGCP
27 {
28 namespace Impl
29 {
30
31 template <typename Stream>
32 class StreamConn : public Connection
33 {
34     public:
35         StreamConn(ba::io_service& ios, Dispatcher dispatcher, ErrorHandler errorHandler)
36             : Connection(dispatcher, errorHandler),
37               m_socket(ios)
38         {
39         }
40
41         socket::endpoint_type& endpoint() { return m_endpoint; }
42
43         virtual ba::basic_stream_socket& socket() { return m_socket; }
44
45         virtual void start()
46         {
47             ba::read(m_socket, ba::buffer(&m_packet, sizeof(m_packet)),
48                      boost::bind(&StreamConn::m_handleReadHeader, this));
49         }
50         virtual void stop() { m_socket.shutdown(socket::shutdown_both); }
51
52         virtual void send(const void* data, size_t size)
53         {
54             Packet* packet = new Packet(Packet::DATA, size));
55             *packet = hton(*packet);
56             boost::array<ba::const_buffer, 2> data = {
57                 ba::buffer(packet, sizeof(*packet)),
58                 ba::buffer(data, size)
59             };
60             ba::write(m_socket, data, boost::bind(&StreamConn::m_handleWrite, this, packet, boost::_1, boost::_2));
61         }
62
63     private:
64         typedef Stream::socket socket;
65         socket m_socket;
66         Packet m_packet;
67
68         void m_handleReadHeader(const boost::system::error_code& ec, size_t size)
69         {
70             if (ec) {
71                 // TODO: Handle errors.
72                 /*if (ec != ba::error::operation_aborted)
73                     m_errorHandler(ec);*/
74                 return;
75             }
76             Packet packet = ntoh(m_packet);
77             Chunk chunk = m_dispatcher(packet.type, packet.size);
78             if (chunk.size == 0) {
79                 // TODO: Discard current data.
80                 ba::read(m_socket, ba::buffer(&m_packet, sizeof(m_packet)),
81                          boost::bind(&StreamConn::m_handleReadHeader, this));
82                 return;
83             }
84             ba::read(m_socket, ba::buffer(chunk.buffer, chunk.size),
85                      boost::bind(&StreamConn::m_handleReadData, this, packet, chunk, boost::_1, boost::_2));
86         }
87
88         void m_handleReadData(Packet packet, Chunk chunk, const boost::system::error_code& ec, size_t size)
89         {
90             if (ec) {
91                 // TODO: Handle errors.
92                 /*if (ec != ba::error::operation_aborted)
93                     m_errorHandler(ec);*/
94                 return;
95             }
96             chunk = chunk.continuation(""); // TODO: Handle errors.
97             if (chunk.size == 0) {
98                 // TODO: Discard current data.
99                 ba::read(m_socket, ba::buffer(&m_packet, sizeof(m_packet)),
100                          boost::bind(&StreamConn::m_handleReadHeader, this));
101                 return;
102             }
103             ba::read(m_socket, ba::buffer(chunk.buffer, chunk.size),
104                      boost::bind(&StreamConn::m_handleReadData, this, packet, chunk, boost::_1, boost::_2));
105         }
106
107         void m_handleWrite(Packet* packet, const boost::system::error_code& ec, size_t size)
108         {
109             delete packet;
110             if (ec) {
111                 // TODO: Handle errors.
112                 /*if (ec != ba::error::operation_aborted)
113                     m_errorHandler(ec);*/
114                 return;
115             }
116         }
117 };
118
119 typedef StreamConn<ba::ip::tcp> TCPConn;
120 typedef StreamConn<ba::local::stream_protocol> UNIXConn;
121
122 } // namespace Impl
123 } // namespace SGCP
124 } // namespace STG
125
126 #endif