X-Git-Url: https://git.stg.codes/stg.git/blobdiff_plain/a8690e044055da20e4cf7d7d7e51d182b5e09154..21fb8d8eb93ddba5eb08976d013d3f956bdd19d6:/stglibs/sgcp.lib/proto.cpp

Review notes (free-form preamble; patch tools ignore text before the "diff --git" line):
  * Line structure was rebuilt from a flattened copy of this patch. The 4-space
    indentation is an assumption -- TODO confirm against the repository.
  * Template arguments had been stripped from the text. static_cast<const char*> and
    static_cast<char*> follow from the declared type of `pos`. The arguments of
    boost::scoped_ptr<TransportProto> and std::vector<ConnectionPtr> are assumptions
    -- TODO confirm against proto.h.
  * Fixes applied to the added lines:
      - class Proto::Impl is now defined before the Proto members that construct and
        dereference it (it was an incomplete type at those points);
      - Impl::connect is declared with the same return type as its definition
        (ConnectionPtr instead of Connection&);
      - Proto::connect passes `address` (was the undeclared `adress`) and returns the
        connection;
      - Impl::run calls io_service::run_one() (run_once() is not an io_service member;
        this assumes `ba` aliases boost::asio -- TODO confirm);
      - Impl::stop clears m_running so that run() can leave its loop, declares
        `timespec` (was `timspec`) and actually sleeps via nanosleep().
  * timespec/nanosleep need <time.h>; the include block lies outside this hunk, so
    check that it is present.
  * The post-image blob id on the index line predates these fixes.

diff --git a/stglibs/sgcp.lib/proto.cpp b/stglibs/sgcp.lib/proto.cpp
index 96b6285b..8e3b1eb6 100644
--- a/stglibs/sgcp.lib/proto.cpp
+++ b/stglibs/sgcp.lib/proto.cpp
@@ -7,57 +7,102 @@
 
 using STG::SGCP::Proto;
 
-Proto::Error::Error(const std::string& message)
-    : runtime_error(message)
-{}
-
-Proto::Error::Error()
-    : runtime_error(strerror(errno))
-{}
-
+// Private implementation behind Proto. It is defined before the Proto members
+// below because they construct and dereference it, which needs a complete type.
+class Proto::Impl
+{
+    public:
+        Impl(TransportType transport, const std::string& key);
+        ~Impl();
+
+        ConnectionPtr connect(const std::string& address, uint16_t port);
+        void bind(const std::string& address, uint16_t port, AcceptHandler handler);
+
+        void run();
+        bool stop();
+
+    private:
+        ba::io_service m_ios;
+        boost::scoped_ptr<TransportProto> m_transport;
+        std::vector<ConnectionPtr> m_conns;
+        bool m_running;
+        bool m_stopped;
+};
+
 Proto::Proto(TransportType transport, const std::string& key)
-    : m_transport(TransportProto::create(transport, key))
+    : m_impl(new Impl(transport, key))
 {
 }
 
 Proto::~Proto()
 {
-    delete m_transport;
 }
 
-void Proto::connect(const std::string& address, uint16_t port)
+ConnectionPtr Proto::connect(const std::string& address, uint16_t port)
 {
-    try {
-        m_transport->connect(address, port);
-    } catch (const TransportProto::Error& ex) {
-        throw Error(ex.what());
-    }
+    return m_impl->connect(address, port);
 }
 
-void Proto::writeAllBuf(const void* buf, size_t size)
+void Proto::bind(const std::string& address, uint16_t port, AcceptHandler handler)
 {
-    const char* pos = static_cast<const char*>(buf);
-    while (size > 0) {
-        ssize_t res = m_transport->write(pos, size);
-        if (res < 0)
-            throw Error();
-        if (res == 0)
-            return;
-        size -= res;
-        pos += res;
-    }
+    m_impl->bind(address, port, handler);
+}
+
+void Proto::run()
+{
+    m_impl->run();
+}
+
+bool Proto::stop()
+{
+    return m_impl->stop();
+}
+
+Proto::Impl::Impl(TransportType transport, const std::string& key)
+    : m_transport(makeTransport(transport, key)),
+      m_running(false),
+      m_stopped(true)
+{
+}
+
+Proto::Impl::~Impl()
+{
+    stop();
+}
+
+ConnectionPtr Proto::Impl::connect(const std::string& address, uint16_t port)
+{
+    return m_transport->connect(address, port);
+}
+
+void Proto::Impl::bind(const std::string& address, uint16_t port, AcceptHandler handler)
+{
+    m_transport->bind(address, port, handler);
+}
+
+// Drives the io_service with run_one() until stop() clears m_running.
+void Proto::Impl::run()
+{
+    m_stopped = false;
+    m_running = true;
+    while (m_running)
+        m_ios.run_one();
+    m_stopped = true;
 }
 
-void Proto::readAllBuf(void* buf, size_t size)
+// Stops every tracked connection and the io_service, then polls up to ten
+// times, 10 msec apart, while the io_service does not yet report stopped.
+bool Proto::Impl::stop()
 {
-    char* pos = static_cast<char*>(buf);
-    while (size > 0) {
-        ssize_t res = m_transport->read(pos, size);
-        if (res < 0)
-            throw Error();
-        if (res == 0)
-            return;
-        size -= res;
-        pos += res;
+    m_running = false; // lets the loop in run() terminate
+    for (size_t i = 0; i < m_conns.size(); ++i)
+        m_conns[i]->stop();
+    m_ios.stop();
+    for (size_t i = 0; i < 10 && !m_ios.stopped(); ++i) {
+        timespec ts;
+        ts.tv_sec = 0;
+        ts.tv_nsec = 10000000; // 10 msec
+        nanosleep(&ts, NULL); // pause between polls instead of spinning
     }
+    return m_ios.stopped();
 }