From 646c8fd6c0112573ba2aae7f165f5d48e849831e Mon Sep 17 00:00:00 2001 From: Maksym Mamontov Date: Tue, 23 Aug 2022 16:55:28 +0300 Subject: [PATCH] Add async pool (to replace EVENT_LOOP). --- projects/stargazer/CMakeLists.txt | 1 + projects/stargazer/async_pool.cpp | 42 +++++++++++++++++++++ projects/stargazer/async_pool.h | 62 +++++++++++++++++++++++++++++++ projects/stargazer/main.cpp | 3 ++ tests/CMakeLists.txt | 5 +++ tests/test_async_pool.cpp | 54 +++++++++++++++++++++++++++ 6 files changed, 167 insertions(+) create mode 100644 projects/stargazer/async_pool.cpp create mode 100644 projects/stargazer/async_pool.h create mode 100644 tests/test_async_pool.cpp diff --git a/projects/stargazer/CMakeLists.txt b/projects/stargazer/CMakeLists.txt index 005e72df..6fbfbbc4 100644 --- a/projects/stargazer/CMakeLists.txt +++ b/projects/stargazer/CMakeLists.txt @@ -7,6 +7,7 @@ set ( CPP_FILES main.cpp user_impl.cpp tariff_impl.cpp eventloop.cpp + async_pool.cpp pidfile.cpp plugin_runner.cpp plugin_mgr.cpp diff --git a/projects/stargazer/async_pool.cpp b/projects/stargazer/async_pool.cpp new file mode 100644 index 00000000..1d8fec0c --- /dev/null +++ b/projects/stargazer/async_pool.cpp @@ -0,0 +1,42 @@ +#include "async_pool.h" + +using STG::AsyncPool; + +AsyncPool& STG::AsyncPoolST::instance() +{ + static AsyncPool pool; + return pool; +} + +void AsyncPool::start() +{ + if (m_thread.joinable()) + return; + m_thread = std::jthread([this](auto token){ run(std::move(token)); }); +} + +void AsyncPool::stop() +{ + if (!m_thread.joinable()) + return; + m_thread.request_stop(); + m_cond.notify_all(); +} + +void AsyncPool::run(std::stop_token token) noexcept +{ + while (true) + { + Queue tasks; + { + std::unique_lock lock(m_mutex); + m_cond.wait(lock, [this, &token](){ return !m_tasks.empty() || token.stop_requested(); }); + if (token.stop_requested()) + return; + if (!m_tasks.empty()) + tasks.swap(m_tasks); + } + for (const auto& t : tasks) + t(); + } +} diff --git a/projects/stargazer/async_pool.h b/projects/stargazer/async_pool.h new file mode 100644 index 00000000..cf82db0d --- /dev/null +++ b/projects/stargazer/async_pool.h @@ -0,0 +1,62 @@ +#pragma once + +#include +#include +#include +#include +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wshadow" +#include +#pragma GCC diagnostic pop +#include + +namespace STG +{ + +class AsyncPool +{ + public: + AsyncPool() = default; + + void start(); + void stop(); + + template + void enqueue(F&& f) + { + { + std::lock_guard lock(m_mutex); + m_tasks.emplace_back(std::forward(f)); + } + m_cond.notify_all(); + } + + private: + using Task = std::function; + using Queue = std::deque; + + std::mutex m_mutex; + std::condition_variable m_cond; + Queue m_tasks; + std::jthread m_thread; + + void run(std::stop_token token) noexcept; +}; + +namespace AsyncPoolST +{ + +AsyncPool& instance(); + +inline +void start() { instance().start(); } +inline +void stop() { instance().stop(); } + +template +inline +void enqueue(F&& f) { instance().enqueue(std::forward(f)); } + +}; + +} diff --git a/projects/stargazer/main.cpp b/projects/stargazer/main.cpp index bcf7af77..3ee229c2 100644 --- a/projects/stargazer/main.cpp +++ b/projects/stargazer/main.cpp @@ -29,6 +29,7 @@ #include "traffcounter_impl.h" #include "settings_impl.h" #include "pidfile.h" +#include "async_pool.h" #include "eventloop.h" #include "stg_timer.h" @@ -283,6 +284,7 @@ int main(int argc, char* argv[]) } auto& loop = EVENT_LOOP::instance(); + STG::AsyncPoolST::start(); StoreLoader storeLoader(settings); if (storeLoader.load()) @@ -377,6 +379,7 @@ int main(int argc, char* argv[]) manager.stop(); + STG::AsyncPoolST::stop(); if (loop.Stop()) WriteServLog("Event loop not stopped."); diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index a552cfbe..8fc662c6 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -53,3 +53,8 @@ add_test ( filter_params_log test_filter_params_log ) add_executable ( test_subscriptions test_subscriptions.cpp ) target_link_libraries ( test_subscriptions Boost::unit_test_framework ) add_test ( subscriptions test_subscriptions ) + +add_executable ( test_async_pool test_async_pool.cpp ../projects/stargazer/async_pool.cpp ) +target_include_directories ( test_async_pool PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} ../projects/stargazer ) +target_link_libraries ( test_async_pool Boost::unit_test_framework Threads::Threads ) +add_test ( async_pool test_async_pool ) diff --git a/tests/test_async_pool.cpp b/tests/test_async_pool.cpp new file mode 100644 index 00000000..7e2e0cfe --- /dev/null +++ b/tests/test_async_pool.cpp @@ -0,0 +1,54 @@ +#define BOOST_TEST_MODULE STGSubscriptions + +#include "async_pool.h" + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wold-style-cast" +#pragma GCC diagnostic ignored "-Wunused-parameter" +#pragma GCC diagnostic ignored "-Wsign-compare" +#pragma GCC diagnostic ignored "-Wparentheses" +#include +#pragma GCC diagnostic pop + +namespace AsyncPoolST = STG::AsyncPoolST; + +namespace +{ + +size_t counter = 0; + +} + +BOOST_AUTO_TEST_SUITE() + +BOOST_AUTO_TEST_CASE(BeforeStart) +{ + BOOST_CHECK_EQUAL(counter, 0); + AsyncPoolST::enqueue([](){ ++counter; }); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + BOOST_CHECK_EQUAL(counter, 0); +} + +BOOST_AUTO_TEST_CASE(AfterStart) +{ + BOOST_CHECK_EQUAL(counter, 0); + AsyncPoolST::start(); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + BOOST_CHECK_EQUAL(counter, 1); + AsyncPoolST::enqueue([](){ ++counter; }); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + BOOST_CHECK_EQUAL(counter, 2); +} + +BOOST_AUTO_TEST_CASE(AfterStop) +{ + BOOST_CHECK_EQUAL(counter, 2); + AsyncPoolST::stop(); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + BOOST_CHECK_EQUAL(counter, 2); + AsyncPoolST::enqueue([](){ ++counter; }); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + BOOST_CHECK_EQUAL(counter, 2); +} + +BOOST_AUTO_TEST_SUITE_END() -- 2.44.2