From: Maksym Mamontov Date: Tue, 23 Aug 2022 14:11:56 +0000 (+0300) Subject: Use async pool instead of EVENT_LOOP. X-Git-Url: https://git.stg.codes/stg.git/commitdiff_plain/c59911ca3cd38cf4ab36d2cc62686f97395899f9?ds=sidebyside;hp=646c8fd6c0112573ba2aae7f165f5d48e849831e Use async pool instead of EVENT_LOOP. --- diff --git a/projects/stargazer/CMakeLists.txt b/projects/stargazer/CMakeLists.txt index 6fbfbbc4..751bfc03 100644 --- a/projects/stargazer/CMakeLists.txt +++ b/projects/stargazer/CMakeLists.txt @@ -6,7 +6,6 @@ set ( CPP_FILES main.cpp services_impl.cpp user_impl.cpp tariff_impl.cpp - eventloop.cpp async_pool.cpp pidfile.cpp plugin_runner.cpp diff --git a/projects/stargazer/actions.h b/projects/stargazer/actions.h deleted file mode 100644 index 360a56e1..00000000 --- a/projects/stargazer/actions.h +++ /dev/null @@ -1,96 +0,0 @@ -#pragma once - -// Usage: -// -// ACTIONS_LIST actionsList; -// CLASS myClass; -// DATA1 myData1; -// DATA2 myData2; -// -// actionsList.Enqueue(myClass, &CLASS::myMethod1, myData1); -// actionsList.Enqueue(myClass, &CLASS::myMethod2, myData2); -// -// actionsList.InvokeAll(); - -#include -#include -#include - -// Generalized actor type - a method of some class with one argument -template -struct ACTOR -{ - using TYPE = void (ACTIVE_CLASS::*)(DATA_TYPE); -}; - -// Abstract base action class for polymorphic action invocation -class BASE_ACTION -{ -public: - virtual ~BASE_ACTION() {} - virtual void Invoke() = 0; -}; - -// Concrete generalized action type - an actor with it's data and owner -template -class ACTION : public BASE_ACTION -{ -public: - ACTION(ACTIVE_CLASS & ac, - typename ACTOR::TYPE a, - DATA_TYPE d) - : activeClass(ac), actor(a), data(d) {} - void Invoke() override - { - (activeClass.*actor)(data); - } -private: - ACTION(const ACTION & rvalue); - ACTION & operator=(const ACTION & rvalue); - - ACTIVE_CLASS & activeClass; - typename ACTOR::TYPE actor; - DATA_TYPE data; -}; - -// A list of an actions -// All methods are thread-safe -class ACTIONS_LIST -{ -public: - ~ACTIONS_LIST() - { - std::lock_guard lock(m_mutex); - for (auto action : m_list) - delete action; - } - - auto begin() { std::lock_guard lock(m_mutex); return m_list.begin(); } - auto end() { std::lock_guard lock(m_mutex); return m_list.end(); } - auto begin() const { std::lock_guard lock(m_mutex); return m_list.begin(); } - auto end() const { std::lock_guard lock(m_mutex); return m_list.end(); } - - bool empty() const { std::lock_guard lock(m_mutex); return m_list.empty(); } - size_t size() const { std::lock_guard lock(m_mutex); return m_list.size(); } - void swap(ACTIONS_LIST & rhs) { std::lock_guard lock(m_mutex); m_list.swap(rhs.m_list); } - - // Add an action to list - template - void Enqueue(ACTIVE_CLASS & ac, - typename ACTOR::TYPE a, - DATA_TYPE d) - { - std::lock_guard lock(m_mutex); - m_list.push_back(new ACTION(ac, a, d)); - } - // Invoke all actions in the list - void InvokeAll() - { - std::lock_guard lock(m_mutex); - for (auto action : m_list) - action->Invoke(); - } -private: - mutable std::mutex m_mutex; - std::vector m_list; -}; diff --git a/projects/stargazer/eventloop.cpp b/projects/stargazer/eventloop.cpp deleted file mode 100644 index ea175c41..00000000 --- a/projects/stargazer/eventloop.cpp +++ /dev/null @@ -1,59 +0,0 @@ -#include "stg/common.h" -#include "eventloop.h" - -#include -#include -#include - -EVENT_LOOP& EVENT_LOOP::instance() -{ - static EVENT_LOOP el; - return el; -} - -bool EVENT_LOOP::Start() -{ -m_thread = std::jthread([this](auto token){ Run(std::move(token)); }); -return false; -} - -bool EVENT_LOOP::Stop() -{ -m_thread.request_stop(); -// Wake up thread -m_cond.notify_all(); -m_thread.join(); -return false; -} - -void EVENT_LOOP::Run(std::stop_token token) -{ -sigset_t signalSet; -sigfillset(&signalSet); -pthread_sigmask(SIG_BLOCK, &signalSet, NULL); - -printfd(__FILE__, "EVENT_LOOP::Runner - Before start\n"); -while (!token.stop_requested()) - { - // Create new empty actions list - ACTIONS_LIST local; - { - std::unique_lock lock(m_mutex); - // Check for any actions... - // ... and sleep until new actions added - printfd(__FILE__, "EVENT_LOOP::Runner - Sleeping until new actions arrived\n"); - m_cond.wait(lock); - // Check for running after wake up - if (token.stop_requested()) - break; // Don't process any actions if stopping - if (!m_list.empty()) - local.swap(m_list); - } - // Fast swap with current - m_list.swap(local); - // Invoke all current actions - printfd(__FILE__, "EVENT_LOOP::Runner - Invoke %d actions\n", local.size()); - local.InvokeAll(); - } -printfd(__FILE__, "EVENT_LOOP::Runner - Before stop\n"); -} diff --git a/projects/stargazer/eventloop.h b/projects/stargazer/eventloop.h deleted file mode 100644 index 0b2b8397..00000000 --- a/projects/stargazer/eventloop.h +++ /dev/null @@ -1,45 +0,0 @@ -#ifndef __EVENT_LOOP_H__ -#define __EVENT_LOOP_H__ - -#include "actions.h" - -#include -#include -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wshadow" -#include -#pragma GCC diagnostic pop - -class EVENT_LOOP -{ - public: - static EVENT_LOOP& instance(); - - bool Start(); - bool Stop(); - - template - void Enqueue(ACTIVE_CLASS & ac, - typename ACTOR::TYPE a, - DATA_TYPE d) - { - std::lock_guard lock(m_mutex); - // Add new action - m_list.Enqueue(ac, a, d); - // Signal about new action - m_cond.notify_all(); - } - - private: - std::jthread m_thread; - std::mutex m_mutex; - std::condition_variable m_cond; - - ACTIONS_LIST m_list; - - EVENT_LOOP() = default; - - void Run(std::stop_token token); -}; - -#endif diff --git a/projects/stargazer/main.cpp b/projects/stargazer/main.cpp index 3ee229c2..ee14c3a8 100644 --- a/projects/stargazer/main.cpp +++ b/projects/stargazer/main.cpp @@ -30,7 +30,6 @@ #include "settings_impl.h" #include "pidfile.h" #include "async_pool.h" -#include "eventloop.h" #include "stg_timer.h" #include "stg/user.h" @@ -283,7 +282,6 @@ int main(int argc, char* argv[]) return -1; } - auto& loop = EVENT_LOOP::instance(); STG::AsyncPoolST::start(); StoreLoader storeLoader(settings); @@ -294,13 +292,6 @@ int main(int argc, char* argv[]) return -1; } - if (loop.Start()) - { - printfd(__FILE__, "Event loop not started.\n"); - WriteServLog("Event loop not started."); - return -1; - } - auto& store = storeLoader.get(); WriteServLog("Storage plugin: %s. Loading successfull.", store.GetVersion().c_str()); @@ -380,8 +371,6 @@ int main(int argc, char* argv[]) manager.stop(); STG::AsyncPoolST::stop(); - if (loop.Stop()) - WriteServLog("Event loop not stopped."); if (!traffCnt.Stop()) WriteServLog("Traffcounter: Stop successfull."); diff --git a/projects/stargazer/plugins/authorization/ao/ao.cpp b/projects/stargazer/plugins/authorization/ao/ao.cpp index 78e1c955..1145d679 100644 --- a/projects/stargazer/plugins/authorization/ao/ao.cpp +++ b/projects/stargazer/plugins/authorization/ao/ao.cpp @@ -213,7 +213,6 @@ return -1; template void CHG_BEFORE_NOTIFIER::notify(const varParamType &, const varParamType &) { -//EVENT_LOOP_SINGLETON::GetInstance().Enqueue(auth, &AUTH_AO::Unauthorize, user); if (user->IsAuthorizedBy(&auth)) auth.users->Unauthorize(user->GetLogin(), &auth); } @@ -221,7 +220,6 @@ if (user->IsAuthorizedBy(&auth)) template void CHG_AFTER_NOTIFIER::notify(const varParamType &, const varParamType &) { -//EVENT_LOOP_SINGLETON::GetInstance().Enqueue(auth, &AUTH_AO::UpdateUserAuthorization, user); auth.UpdateUserAuthorization(user); } //----------------------------------------------------------------------------- diff --git a/projects/stargazer/traffcounter_impl.cpp b/projects/stargazer/traffcounter_impl.cpp index 377e3889..ed0f93d5 100644 --- a/projects/stargazer/traffcounter_impl.cpp +++ b/projects/stargazer/traffcounter_impl.cpp @@ -45,11 +45,18 @@ #include "traffcounter_impl.h" #include "stg_timer.h" #include "users_impl.h" +#include "async_pool.h" #define FLUSH_TIME (10) #define REMOVE_TIME (31) using STG::TraffCounterImpl; +using STG::TRF_IP_BEFORE; +using STG::TRF_IP_AFTER; +using STG::ADD_USER_NONIFIER; +using STG::DEL_USER_NONIFIER; + +namespace AsyncPoolST = STG::AsyncPoolST; const char protoName[PROTOMAX][8] = {"TCP", "UDP", "ICMP", "TCP_UDP", "ALL"}; @@ -843,3 +850,32 @@ monitorDir = dir; monitoring = !monitorDir.empty(); } //----------------------------------------------------------------------------- +void TRF_IP_BEFORE::notify(const uint32_t & oldValue, const uint32_t &) +{ +// User changes his address. Remove old IP +if (!oldValue) + return; + +AsyncPoolST::enqueue([this, oldValue](){ traffCnt.DelUser(oldValue); }); +} +//----------------------------------------------------------------------------- +void TRF_IP_AFTER::notify(const uint32_t &, const uint32_t & newValue) +{ +// User changes his address. Add new IP +if (!newValue) + return; + +AsyncPoolST::enqueue([this](){ traffCnt.AddUser(user); }); +} +//----------------------------------------------------------------------------- +void ADD_USER_NONIFIER::notify(const UserImplPtr & user) +{ +AsyncPoolST::enqueue([this, user](){ traffCnt.SetUserNotifiers(user); }); +} +//----------------------------------------------------------------------------- +void DEL_USER_NONIFIER::notify(const UserImplPtr & user) +{ +AsyncPoolST::enqueue([this, user](){ traffCnt.UnSetUserNotifiers(user); }); +AsyncPoolST::enqueue([this, user](){ traffCnt.DelUser(user->GetCurrIP()); }); +} +//----------------------------------------------------------------------------- diff --git a/projects/stargazer/traffcounter_impl.h b/projects/stargazer/traffcounter_impl.h index d23f6306..7fd06309 100644 --- a/projects/stargazer/traffcounter_impl.h +++ b/projects/stargazer/traffcounter_impl.h @@ -25,8 +25,6 @@ #include "stg/raw_ip_packet.h" #include "stg/noncopyable.h" #include "stg/notifer.h" -#include "actions.h" -#include "eventloop.h" #include "user_impl.h" #include @@ -236,38 +234,6 @@ class TraffCounterImpl : public TraffCounter { ADD_USER_NONIFIER addUserNotifier; DEL_USER_NONIFIER delUserNotifier; }; -//----------------------------------------------------------------------------- -inline -void TRF_IP_BEFORE::notify(const uint32_t & oldValue, const uint32_t &) -{ -// User changes his address. Remove old IP -if (!oldValue) - return; -EVENT_LOOP::instance().Enqueue(traffCnt, &TraffCounterImpl::DelUser, oldValue); } //----------------------------------------------------------------------------- -inline -void TRF_IP_AFTER::notify(const uint32_t &, const uint32_t & newValue) -{ -// User changes his address. Add new IP -if (!newValue) - return; - -EVENT_LOOP::instance().Enqueue(traffCnt, &TraffCounterImpl::AddUser, user); -} -//----------------------------------------------------------------------------- -inline -void ADD_USER_NONIFIER::notify(const UserImplPtr & user) -{ -EVENT_LOOP::instance().Enqueue(traffCnt, &TraffCounterImpl::SetUserNotifiers, user); -} -//----------------------------------------------------------------------------- -inline -void DEL_USER_NONIFIER::notify(const UserImplPtr & user) -{ -EVENT_LOOP::instance().Enqueue(traffCnt, &TraffCounterImpl::UnSetUserNotifiers, user); -EVENT_LOOP::instance().Enqueue(traffCnt, &TraffCounterImpl::DelUser, user->GetCurrIP()); -} -//----------------------------------------------------------------------------- -} diff --git a/projects/stargazer/users_impl.h b/projects/stargazer/users_impl.h index bfb52fa1..cd7d2932 100644 --- a/projects/stargazer/users_impl.h +++ b/projects/stargazer/users_impl.h @@ -41,8 +41,6 @@ #include "stg/logger.h" #include "stg/notifer.h" #include "stg/noncopyable.h" -#include "actions.h" -#include "eventloop.h" #include "settings_impl.h" #include "user_impl.h"