user_impl.cpp
tariff_impl.cpp
eventloop.cpp
+ async_pool.cpp
pidfile.cpp
plugin_runner.cpp
plugin_mgr.cpp
--- /dev/null
+#include "async_pool.h"
+
+using STG::AsyncPool;
+
+// Accessor for the pool shared by the AsyncPoolST free functions.
+AsyncPool& STG::AsyncPoolST::instance()
+{
+    // Function-local static: the same object is returned on every call.
+    static AsyncPool sharedPool;
+    return sharedPool;
+}
+
+// Launches the worker thread unless m_thread already holds a joinable thread.
+void AsyncPool::start()
+{
+    if (!m_thread.joinable())
+    {
+        // The jthread passes its stop token to the callable, which forwards it to run().
+        m_thread = std::jthread([this](auto stopToken){ run(std::move(stopToken)); });
+    }
+}
+
+// Asks the worker thread to stop and wakes it if it is waiting for tasks.
+// Does nothing when there is no joinable worker. The worker is not joined
+// here; tasks still sitting in the queue are left there unexecuted.
+void AsyncPool::stop()
+{
+    if (!m_thread.joinable())
+        return;
+    {
+        // Publish the stop request while holding the same mutex the worker
+        // holds when it evaluates its wait predicate. Without the lock the
+        // request and the notification could both land between the worker's
+        // predicate check and its actual block, and the wakeup would be lost.
+        std::lock_guard lock(m_mutex);
+        m_thread.request_stop();
+    }
+    m_cond.notify_all();
+}
+
+// Worker loop: sleeps until tasks are queued or a stop is requested, then
+// runs the queued tasks in insertion order with the mutex released.
+// Returns as soon as a stop request is observed after waking up.
+void AsyncPool::run(std::stop_token token) noexcept
+{
+    for (;;)
+    {
+        Queue batch;
+        {
+            std::unique_lock guard(m_mutex);
+            const auto wakeUp = [this, &token](){ return token.stop_requested() || !m_tasks.empty(); };
+            m_cond.wait(guard, wakeUp);
+            if (token.stop_requested())
+                return;
+            // Take the whole pending queue at once so the tasks execute
+            // outside the lock and enqueue() is never blocked by a running task.
+            batch.swap(m_tasks);
+        }
+        for (const auto& task : batch)
+            task();
+    }
+}
--- /dev/null
+#pragma once
+
+#include <functional>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wshadow"
+#include <jthread.hpp>
+#pragma GCC diagnostic pop
+#include <jthread.hpp>
+
+namespace STG
+{
+
+/// Task queue served by a single background thread.
+/// Callables passed to enqueue() are stored in insertion order and invoked
+/// by the worker thread once start() has been called.
+class AsyncPool
+{
+    public:
+        AsyncPool() = default;
+
+        /// Wakes the worker before the members are destroyed.
+        /// The std::jthread destructor requests a stop and joins, but it does
+        /// not signal m_cond; without this call a worker blocked in
+        /// m_cond.wait() would never wake up and the join would hang.
+        ~AsyncPool() { stop(); }
+
+        // Not copyable (the mutex, condition variable and thread are not).
+        AsyncPool(const AsyncPool&) = delete;
+        AsyncPool& operator=(const AsyncPool&) = delete;
+
+        /// Starts the worker thread; no-op if a joinable worker already exists.
+        void start();
+        /// Requests the worker to stop and wakes it; no-op without a joinable worker.
+        void stop();
+
+        /// Appends a callable to the pending queue and wakes the worker.
+        /// @param f any callable convertible to std::function<void ()>.
+        /// Tasks may be queued before start(); they stay pending until a
+        /// worker picks them up.
+        template <typename F>
+        void enqueue(F&& f)
+        {
+            {
+                std::lock_guard lock(m_mutex);
+                m_tasks.emplace_back(std::forward<F>(f));
+            }
+            // Notify after releasing the lock so the worker can acquire it immediately.
+            m_cond.notify_all();
+        }
+
+    private:
+        using Task = std::function<void ()>;
+        using Queue = std::deque<Task>;
+
+        std::mutex m_mutex;             // guards m_tasks
+        std::condition_variable m_cond; // signalled on enqueue() and stop()
+        Queue m_tasks;                  // tasks not yet taken by the worker
+        std::jthread m_thread;          // the worker; declared last so it is destroyed first
+
+        /// Worker loop executed on m_thread.
+        void run(std::stop_token token) noexcept;
+};
+
+/// Free-function facade over one shared AsyncPool object.
+namespace AsyncPoolST
+{
+
+/// Returns the shared pool used by the functions below.
+AsyncPool& instance();
+
+/// Forwards to AsyncPool::start() on the shared pool.
+inline void start()
+{
+    instance().start();
+}
+
+/// Forwards to AsyncPool::stop() on the shared pool.
+inline void stop()
+{
+    instance().stop();
+}
+
+/// Forwards the callable to AsyncPool::enqueue() on the shared pool.
+template <typename F>
+inline void enqueue(F&& f)
+{
+    instance().enqueue(std::forward<F>(f));
+}
+
+} // namespace AsyncPoolST
+
+}
#include "traffcounter_impl.h"
#include "settings_impl.h"
#include "pidfile.h"
+#include "async_pool.h"
#include "eventloop.h"
#include "stg_timer.h"
}
auto& loop = EVENT_LOOP::instance();
+ STG::AsyncPoolST::start();
StoreLoader storeLoader(settings);
if (storeLoader.load())
manager.stop();
+ STG::AsyncPoolST::stop();
if (loop.Stop())
WriteServLog("Event loop not stopped.");
add_executable ( test_subscriptions test_subscriptions.cpp )
target_link_libraries ( test_subscriptions Boost::unit_test_framework )
add_test ( subscriptions test_subscriptions )
+
+add_executable ( test_async_pool test_async_pool.cpp ../projects/stargazer/async_pool.cpp )
+target_include_directories ( test_async_pool PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} ../projects/stargazer )
+target_link_libraries ( test_async_pool Boost::unit_test_framework Threads::Threads )
+add_test ( async_pool test_async_pool )
--- /dev/null
+// Module name reported by Boost.Test for this executable (async pool tests).
+#define BOOST_TEST_MODULE STGAsyncPool
+
+#include "async_pool.h"
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wold-style-cast"
+#pragma GCC diagnostic ignored "-Wunused-parameter"
+#pragma GCC diagnostic ignored "-Wsign-compare"
+#pragma GCC diagnostic ignored "-Wparentheses"
+#include <boost/test/unit_test.hpp>
+#pragma GCC diagnostic pop
+
+namespace AsyncPoolST = STG::AsyncPoolST;
+
+namespace
+{
+
+// Incremented by every task the test cases enqueue; the cases check its
+// value cumulatively, so they rely on running in declaration order.
+size_t counter{0};
+
+}
+
+BOOST_AUTO_TEST_SUITE()
+
+// A task queued before the pool is started must not be executed.
+BOOST_AUTO_TEST_CASE(BeforeStart)
+{
+    const auto settleTime = std::chrono::milliseconds(500);
+
+    BOOST_CHECK_EQUAL(counter, 0);
+    AsyncPoolST::enqueue([]{ counter += 1; });
+    // Give a (wrongly) running worker the chance to pick the task up.
+    std::this_thread::sleep_for(settleTime);
+    BOOST_CHECK_EQUAL(counter, 0);
+}
+
+// Starting the pool runs the task left pending by the previous case, and
+// tasks queued afterwards are executed as well.
+BOOST_AUTO_TEST_CASE(AfterStart)
+{
+    const auto settleTime = std::chrono::milliseconds(500);
+
+    BOOST_CHECK_EQUAL(counter, 0);
+    AsyncPoolST::start();
+    std::this_thread::sleep_for(settleTime);
+    // The task enqueued before start() has now been processed.
+    BOOST_CHECK_EQUAL(counter, 1);
+
+    AsyncPoolST::enqueue([]{ counter += 1; });
+    std::this_thread::sleep_for(settleTime);
+    BOOST_CHECK_EQUAL(counter, 2);
+}
+
+// After the pool is stopped, newly queued tasks are no longer executed.
+BOOST_AUTO_TEST_CASE(AfterStop)
+{
+    const auto settleTime = std::chrono::milliseconds(500);
+
+    BOOST_CHECK_EQUAL(counter, 2);
+    AsyncPoolST::stop();
+    std::this_thread::sleep_for(settleTime);
+    // Stopping by itself must not run anything.
+    BOOST_CHECK_EQUAL(counter, 2);
+
+    AsyncPoolST::enqueue([]{ counter += 1; });
+    std::this_thread::sleep_for(settleTime);
+    BOOST_CHECK_EQUAL(counter, 2);
+}
+
+BOOST_AUTO_TEST_SUITE_END()