git.stg.codes - stg.git/commitdiff
Add async pool (to replace EVENT_LOOP).
author: Maksym Mamontov <madf@madf.info>
Tue, 23 Aug 2022 13:55:28 +0000 (16:55 +0300)
committer: Maksym Mamontov <madf@madf.info>
Tue, 23 Aug 2022 13:55:28 +0000 (16:55 +0300)
projects/stargazer/CMakeLists.txt
projects/stargazer/async_pool.cpp [new file with mode: 0644]
projects/stargazer/async_pool.h [new file with mode: 0644]
projects/stargazer/main.cpp
tests/CMakeLists.txt
tests/test_async_pool.cpp [new file with mode: 0644]

index 005e72df724cadf5094ab039de42a55c5375e7fa..6fbfbbc41e666bfebd120fa4d73b93495ec04967 100644 (file)
@@ -7,6 +7,7 @@ set ( CPP_FILES main.cpp
                 user_impl.cpp
                 tariff_impl.cpp
                 eventloop.cpp
                 user_impl.cpp
                 tariff_impl.cpp
                 eventloop.cpp
+                async_pool.cpp
                 pidfile.cpp
                 plugin_runner.cpp
                 plugin_mgr.cpp
                 pidfile.cpp
                 plugin_runner.cpp
                 plugin_mgr.cpp
diff --git a/projects/stargazer/async_pool.cpp b/projects/stargazer/async_pool.cpp
new file mode 100644 (file)
index 0000000..1d8fec0
--- /dev/null
@@ -0,0 +1,42 @@
+#include "async_pool.h"
+
+using STG::AsyncPool;
+
+// Returns the shared pool object. It is a function-local static, so it is
+// constructed the first time this function is called.
+AsyncPool& STG::AsyncPoolST::instance()
+{
+    static AsyncPool sharedPool;
+    return sharedPool;
+}
+
+// Launches the worker thread running run(), unless m_thread is already
+// joinable, in which case the call does nothing.
+// Note: stop() does not join, so m_thread stays joinable after stop() and a
+// later start() is a no-op.
+void AsyncPool::start()
+{
+    if (!m_thread.joinable())
+        m_thread = std::jthread([this](auto stopToken){ run(std::move(stopToken)); });
+}
+
+// Asks the worker thread to finish and wakes it up; does nothing when no
+// thread is attached. The call does not join the thread.
+void AsyncPool::stop()
+{
+    if (!m_thread.joinable())
+        return;
+    {
+        // Set the stop flag while holding m_mutex. The worker tests the flag
+        // in its wait predicate under the same mutex, so the request can no
+        // longer slip in between that test and the worker going to sleep,
+        // which would lose the notification below and leave it blocked.
+        std::lock_guard lock(m_mutex);
+        m_thread.request_stop();
+    }
+    m_cond.notify_all();
+}
+
+// Worker loop. Sleeps until there is at least one queued task or a stop has
+// been requested, takes the whole queue under the lock and runs the tasks with
+// the lock released. Returns as soon as a stop request is seen; tasks that are
+// still queued at that moment are not executed.
+// The function is noexcept, so an exception escaping a task terminates the
+// program.
+void AsyncPool::run(std::stop_token token) noexcept
+{
+    const auto wakeUp = [this, &token](){ return token.stop_requested() || !m_tasks.empty(); };
+    for (;;)
+    {
+        Queue batch;
+        {
+            std::unique_lock guard(m_mutex);
+            m_cond.wait(guard, wakeUp);
+            if (token.stop_requested())
+                return;
+            if (!m_tasks.empty())
+                m_tasks.swap(batch);
+        }
+        for (const auto& task : batch)
+            task();
+    }
+}
diff --git a/projects/stargazer/async_pool.h b/projects/stargazer/async_pool.h
new file mode 100644 (file)
index 0000000..cf82db0
--- /dev/null
@@ -0,0 +1,62 @@
+#pragma once
+
+#include <functional>
+#include <deque>
+#include <mutex>
+#include <condition_variable>
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wshadow"
+#include <jthread.hpp>
+#pragma GCC diagnostic pop
+#include <jthread.hpp>
+
+namespace STG
+{
+
+// Executes queued callables on one background thread.
+// Tasks may be enqueued at any time; they are only executed while the worker
+// started by start() is running.
+class AsyncPool
+{
+    public:
+        AsyncPool() = default;
+        // std::jthread's destructor requests a stop and joins, but it cannot
+        // notify m_cond. Without an explicit stop() the worker could remain
+        // blocked in wait() and the join would never return.
+        ~AsyncPool() { stop(); }
+
+        // Not copyable (the mutex and the thread already forbid it; this
+        // makes the intent explicit).
+        AsyncPool(const AsyncPool&) = delete;
+        AsyncPool& operator=(const AsyncPool&) = delete;
+
+        // Starts the worker thread if none is attached yet.
+        void start();
+        // Requests the worker thread to finish and wakes it up.
+        void stop();
+
+        // Appends a callable to the task queue and wakes the worker.
+        // f must be usable to construct a std::function<void ()>.
+        template <typename F>
+        void enqueue(F&& f)
+        {
+            {
+                // Keep the lock only for the queue update; notify after
+                // releasing it.
+                std::lock_guard lock(m_mutex);
+                m_tasks.emplace_back(std::forward<F>(f));
+            }
+            m_cond.notify_all();
+        }
+
+    private:
+        using Task = std::function<void ()>;
+        using Queue = std::deque<Task>;
+
+        std::mutex m_mutex;             // guards m_tasks
+        std::condition_variable m_cond; // signalled on enqueue() and stop()
+        Queue m_tasks;                  // tasks waiting to be executed
+        // Declared last so it is destroyed (and therefore joined) first, while
+        // the mutex, the condition variable and the queue still exist.
+        std::jthread m_thread;
+
+        // Worker loop executed on m_thread.
+        void run(std::stop_token token) noexcept;
+};
+
+// Free-function facade over one shared AsyncPool object.
+namespace AsyncPoolST
+{
+
+// Returns the shared pool.
+AsyncPool& instance();
+
+// Forwards to AsyncPool::start() on the shared pool.
+inline
+void start() { instance().start(); }
+// Forwards to AsyncPool::stop() on the shared pool.
+inline
+void stop() { instance().stop(); }
+
+// Forwards the callable to AsyncPool::enqueue() on the shared pool.
+template <typename F>
+inline
+void enqueue(F&& f) { instance().enqueue(std::forward<F>(f)); }
+
+}
+
+}
index bcf7af777f2bec80e9b8c89a11a54bcf63a05159..3ee229c2aff35a5c15e79f0dcd527b801e4db416 100644 (file)
@@ -29,6 +29,7 @@
 #include "traffcounter_impl.h"
 #include "settings_impl.h"
 #include "pidfile.h"
 #include "traffcounter_impl.h"
 #include "settings_impl.h"
 #include "pidfile.h"
+#include "async_pool.h"
 #include "eventloop.h"
 #include "stg_timer.h"
 
 #include "eventloop.h"
 #include "stg_timer.h"
 
@@ -283,6 +284,7 @@ int main(int argc, char* argv[])
     }
 
     auto& loop = EVENT_LOOP::instance();
     }
 
     auto& loop = EVENT_LOOP::instance();
+    STG::AsyncPoolST::start();
 
     StoreLoader storeLoader(settings);
     if (storeLoader.load())
 
     StoreLoader storeLoader(settings);
     if (storeLoader.load())
@@ -377,6 +379,7 @@ int main(int argc, char* argv[])
 
     manager.stop();
 
 
     manager.stop();
 
+    STG::AsyncPoolST::stop();
     if (loop.Stop())
         WriteServLog("Event loop not stopped.");
 
     if (loop.Stop())
         WriteServLog("Event loop not stopped.");
 
index a552cfbe2732df1a6ef7150515d6deb8316326f6..8fc662c6cb1dd405cf2c7565696cf70e61412263 100644 (file)
@@ -53,3 +53,8 @@ add_test ( filter_params_log test_filter_params_log )
 add_executable ( test_subscriptions test_subscriptions.cpp )
 target_link_libraries ( test_subscriptions Boost::unit_test_framework )
 add_test ( subscriptions test_subscriptions )
 add_executable ( test_subscriptions test_subscriptions.cpp )
 target_link_libraries ( test_subscriptions Boost::unit_test_framework )
 add_test ( subscriptions test_subscriptions )
+
+add_executable ( test_async_pool test_async_pool.cpp ../projects/stargazer/async_pool.cpp )
+target_include_directories ( test_async_pool PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} ../projects/stargazer )
+target_link_libraries ( test_async_pool Boost::unit_test_framework Threads::Threads )
+add_test ( async_pool test_async_pool )
diff --git a/tests/test_async_pool.cpp b/tests/test_async_pool.cpp
new file mode 100644 (file)
index 0000000..7e2e0cf
--- /dev/null
@@ -0,0 +1,54 @@
+// Name of the Boost.Test module for this test binary.
+#define BOOST_TEST_MODULE STGAsyncPool
+
+#include "async_pool.h"
+
+#include <atomic>
+#include <chrono>
+#include <cstddef>
+#include <thread>
+
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wold-style-cast"
+#pragma GCC diagnostic ignored "-Wunused-parameter"
+#pragma GCC diagnostic ignored "-Wsign-compare"
+#pragma GCC diagnostic ignored "-Wparentheses"
+#include <boost/test/unit_test.hpp>
+#pragma GCC diagnostic pop
+
+namespace AsyncPoolST = STG::AsyncPoolST;
+
+namespace
+{
+
+// Incremented by tasks on the pool thread and read on the test thread, so it
+// must be atomic: the sleeps below do not synchronize the two threads.
+std::atomic<size_t> counter{0};
+
+}
+
+// The three cases share the counter and the shared pool, and each one relies
+// on the state left by the previous one.
+BOOST_AUTO_TEST_SUITE()
+
+// A task enqueued before start() must not run.
+BOOST_AUTO_TEST_CASE(BeforeStart)
+{
+    BOOST_CHECK_EQUAL(counter.load(), 0);
+    AsyncPoolST::enqueue([](){ ++counter; });
+    std::this_thread::sleep_for(std::chrono::milliseconds(500));
+    BOOST_CHECK_EQUAL(counter.load(), 0);
+}
+
+// After start() the previously queued task runs, and so do new ones.
+BOOST_AUTO_TEST_CASE(AfterStart)
+{
+    BOOST_CHECK_EQUAL(counter.load(), 0);
+    AsyncPoolST::start();
+    std::this_thread::sleep_for(std::chrono::milliseconds(500));
+    BOOST_CHECK_EQUAL(counter.load(), 1);
+    AsyncPoolST::enqueue([](){ ++counter; });
+    std::this_thread::sleep_for(std::chrono::milliseconds(500));
+    BOOST_CHECK_EQUAL(counter.load(), 2);
+}
+
+// After stop() nothing more is executed, including newly enqueued tasks.
+BOOST_AUTO_TEST_CASE(AfterStop)
+{
+    BOOST_CHECK_EQUAL(counter.load(), 2);
+    AsyncPoolST::stop();
+    std::this_thread::sleep_for(std::chrono::milliseconds(500));
+    BOOST_CHECK_EQUAL(counter.load(), 2);
+    AsyncPoolST::enqueue([](){ ++counter; });
+    std::this_thread::sleep_for(std::chrono::milliseconds(500));
+    BOOST_CHECK_EQUAL(counter.load(), 2);
+}
+
+BOOST_AUTO_TEST_SUITE_END()