services_impl.cpp
user_impl.cpp
tariff_impl.cpp
- eventloop.cpp
async_pool.cpp
pidfile.cpp
plugin_runner.cpp
+++ /dev/null
-#pragma once
-
-// Usage:
-//
-// ACTIONS_LIST actionsList;
-// CLASS myClass;
-// DATA1 myData1;
-// DATA2 myData2;
-//
-// actionsList.Enqueue(myClass, &CLASS::myMethod1, myData1);
-// actionsList.Enqueue(myClass, &CLASS::myMethod2, myData2);
-//
-// actionsList.InvokeAll();
-
-#include <vector>
-#include <functional>
-#include <mutex>
-
-// Generalized actor type - a method of some class with one argument
-template <class ACTIVE_CLASS, typename DATA_TYPE>
-struct ACTOR
-{
- using TYPE = void (ACTIVE_CLASS::*)(DATA_TYPE);
-};
-
-// Abstract base action class for polymorphic action invocation
-class BASE_ACTION
-{
-public:
- virtual ~BASE_ACTION() {}
- virtual void Invoke() = 0;
-};
-
-// Concrete generalized action type - an actor with it's data and owner
-template <class ACTIVE_CLASS, typename DATA_TYPE>
-class ACTION : public BASE_ACTION
-{
-public:
- ACTION(ACTIVE_CLASS & ac,
- typename ACTOR<ACTIVE_CLASS, DATA_TYPE>::TYPE a,
- DATA_TYPE d)
- : activeClass(ac), actor(a), data(d) {}
- void Invoke() override
- {
- (activeClass.*actor)(data);
- }
-private:
- ACTION(const ACTION<ACTIVE_CLASS, DATA_TYPE> & rvalue);
- ACTION<ACTIVE_CLASS, DATA_TYPE> & operator=(const ACTION<ACTIVE_CLASS, DATA_TYPE> & rvalue);
-
- ACTIVE_CLASS & activeClass;
- typename ACTOR<ACTIVE_CLASS, DATA_TYPE>::TYPE actor;
- DATA_TYPE data;
-};
-
-// A list of an actions
-// All methods are thread-safe
-class ACTIONS_LIST
-{
-public:
- ~ACTIONS_LIST()
- {
- std::lock_guard lock(m_mutex);
- for (auto action : m_list)
- delete action;
- }
-
- auto begin() { std::lock_guard lock(m_mutex); return m_list.begin(); }
- auto end() { std::lock_guard lock(m_mutex); return m_list.end(); }
- auto begin() const { std::lock_guard lock(m_mutex); return m_list.begin(); }
- auto end() const { std::lock_guard lock(m_mutex); return m_list.end(); }
-
- bool empty() const { std::lock_guard lock(m_mutex); return m_list.empty(); }
- size_t size() const { std::lock_guard lock(m_mutex); return m_list.size(); }
- void swap(ACTIONS_LIST & rhs) { std::lock_guard lock(m_mutex); m_list.swap(rhs.m_list); }
-
- // Add an action to list
- template <class ACTIVE_CLASS, typename DATA_TYPE>
- void Enqueue(ACTIVE_CLASS & ac,
- typename ACTOR<ACTIVE_CLASS, DATA_TYPE>::TYPE a,
- DATA_TYPE d)
- {
- std::lock_guard lock(m_mutex);
- m_list.push_back(new ACTION<ACTIVE_CLASS, DATA_TYPE>(ac, a, d));
- }
- // Invoke all actions in the list
- void InvokeAll()
- {
- std::lock_guard lock(m_mutex);
- for (auto action : m_list)
- action->Invoke();
- }
-private:
- mutable std::mutex m_mutex;
- std::vector<BASE_ACTION*> m_list;
-};
+++ /dev/null
-#include "stg/common.h"
-#include "eventloop.h"
-
-#include <csignal>
-#include <cerrno>
-#include <cstring>
-
-EVENT_LOOP& EVENT_LOOP::instance()
-{
- static EVENT_LOOP el;
- return el;
-}
-
-bool EVENT_LOOP::Start()
-{
-m_thread = std::jthread([this](auto token){ Run(std::move(token)); });
-return false;
-}
-
-bool EVENT_LOOP::Stop()
-{
-m_thread.request_stop();
-// Wake up thread
-m_cond.notify_all();
-m_thread.join();
-return false;
-}
-
-void EVENT_LOOP::Run(std::stop_token token)
-{
-sigset_t signalSet;
-sigfillset(&signalSet);
-pthread_sigmask(SIG_BLOCK, &signalSet, NULL);
-
-printfd(__FILE__, "EVENT_LOOP::Runner - Before start\n");
-while (!token.stop_requested())
- {
- // Create new empty actions list
- ACTIONS_LIST local;
- {
- std::unique_lock lock(m_mutex);
- // Check for any actions...
- // ... and sleep until new actions added
- printfd(__FILE__, "EVENT_LOOP::Runner - Sleeping until new actions arrived\n");
- m_cond.wait(lock);
- // Check for running after wake up
- if (token.stop_requested())
- break; // Don't process any actions if stopping
- if (!m_list.empty())
- local.swap(m_list);
- }
- // Fast swap with current
- m_list.swap(local);
- // Invoke all current actions
- printfd(__FILE__, "EVENT_LOOP::Runner - Invoke %d actions\n", local.size());
- local.InvokeAll();
- }
-printfd(__FILE__, "EVENT_LOOP::Runner - Before stop\n");
-}
+++ /dev/null
-#ifndef __EVENT_LOOP_H__
-#define __EVENT_LOOP_H__
-
-#include "actions.h"
-
-#include <mutex>
-#include <condition_variable>
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wshadow"
-#include <jthread.hpp>
-#pragma GCC diagnostic pop
-
-class EVENT_LOOP
-{
- public:
- static EVENT_LOOP& instance();
-
- bool Start();
- bool Stop();
-
- template <class ACTIVE_CLASS, typename DATA_TYPE>
- void Enqueue(ACTIVE_CLASS & ac,
- typename ACTOR<ACTIVE_CLASS, DATA_TYPE>::TYPE a,
- DATA_TYPE d)
- {
- std::lock_guard lock(m_mutex);
- // Add new action
- m_list.Enqueue(ac, a, d);
- // Signal about new action
- m_cond.notify_all();
- }
-
- private:
- std::jthread m_thread;
- std::mutex m_mutex;
- std::condition_variable m_cond;
-
- ACTIONS_LIST m_list;
-
- EVENT_LOOP() = default;
-
- void Run(std::stop_token token);
-};
-
-#endif
#include "settings_impl.h"
#include "pidfile.h"
#include "async_pool.h"
-#include "eventloop.h"
#include "stg_timer.h"
#include "stg/user.h"
return -1;
}
- auto& loop = EVENT_LOOP::instance();
STG::AsyncPoolST::start();
StoreLoader storeLoader(settings);
return -1;
}
- if (loop.Start())
- {
- printfd(__FILE__, "Event loop not started.\n");
- WriteServLog("Event loop not started.");
- return -1;
- }
-
auto& store = storeLoader.get();
WriteServLog("Storage plugin: %s. Loading successfull.", store.GetVersion().c_str());
manager.stop();
STG::AsyncPoolST::stop();
- if (loop.Stop())
- WriteServLog("Event loop not stopped.");
if (!traffCnt.Stop())
WriteServLog("Traffcounter: Stop successfull.");
template <typename varParamType>
void CHG_BEFORE_NOTIFIER<varParamType>::notify(const varParamType &, const varParamType &)
{
-//EVENT_LOOP_SINGLETON::GetInstance().Enqueue(auth, &AUTH_AO::Unauthorize, user);
if (user->IsAuthorizedBy(&auth))
auth.users->Unauthorize(user->GetLogin(), &auth);
}
template <typename varParamType>
void CHG_AFTER_NOTIFIER<varParamType>::notify(const varParamType &, const varParamType &)
{
-//EVENT_LOOP_SINGLETON::GetInstance().Enqueue(auth, &AUTH_AO::UpdateUserAuthorization, user);
auth.UpdateUserAuthorization(user);
}
//-----------------------------------------------------------------------------
#include "traffcounter_impl.h"
#include "stg_timer.h"
#include "users_impl.h"
+#include "async_pool.h"
#define FLUSH_TIME (10)
#define REMOVE_TIME (31)
using STG::TraffCounterImpl;
+using STG::TRF_IP_BEFORE;
+using STG::TRF_IP_AFTER;
+using STG::ADD_USER_NONIFIER;
+using STG::DEL_USER_NONIFIER;
+
+namespace AsyncPoolST = STG::AsyncPoolST;
const char protoName[PROTOMAX][8] =
{"TCP", "UDP", "ICMP", "TCP_UDP", "ALL"};
monitoring = !monitorDir.empty();
}
//-----------------------------------------------------------------------------
+void TRF_IP_BEFORE::notify(const uint32_t & oldValue, const uint32_t &)
+{
+// User changes his address. Remove old IP
+if (!oldValue)
+ return;
+
+AsyncPoolST::enqueue([this, oldValue](){ traffCnt.DelUser(oldValue); });
+}
+//-----------------------------------------------------------------------------
+void TRF_IP_AFTER::notify(const uint32_t &, const uint32_t & newValue)
+{
+// User changes his address. Add new IP
+if (!newValue)
+ return;
+
+AsyncPoolST::enqueue([this](){ traffCnt.AddUser(user); });
+}
+//-----------------------------------------------------------------------------
+void ADD_USER_NONIFIER::notify(const UserImplPtr & user)
+{
+AsyncPoolST::enqueue([this, user](){ traffCnt.SetUserNotifiers(user); });
+}
+//-----------------------------------------------------------------------------
+void DEL_USER_NONIFIER::notify(const UserImplPtr & user)
+{
+AsyncPoolST::enqueue([this, user](){ traffCnt.UnSetUserNotifiers(user); });
+AsyncPoolST::enqueue([this, user](){ traffCnt.DelUser(user->GetCurrIP()); });
+}
+//-----------------------------------------------------------------------------
#include "stg/raw_ip_packet.h"
#include "stg/noncopyable.h"
#include "stg/notifer.h"
-#include "actions.h"
-#include "eventloop.h"
#include "user_impl.h"
#include <ctime>
ADD_USER_NONIFIER addUserNotifier;
DEL_USER_NONIFIER delUserNotifier;
};
-//-----------------------------------------------------------------------------
-inline
-void TRF_IP_BEFORE::notify(const uint32_t & oldValue, const uint32_t &)
-{
-// User changes his address. Remove old IP
-if (!oldValue)
- return;
-EVENT_LOOP::instance().Enqueue(traffCnt, &TraffCounterImpl::DelUser, oldValue);
}
//-----------------------------------------------------------------------------
-inline
-void TRF_IP_AFTER::notify(const uint32_t &, const uint32_t & newValue)
-{
-// User changes his address. Add new IP
-if (!newValue)
- return;
-
-EVENT_LOOP::instance().Enqueue(traffCnt, &TraffCounterImpl::AddUser, user);
-}
-//-----------------------------------------------------------------------------
-inline
-void ADD_USER_NONIFIER::notify(const UserImplPtr & user)
-{
-EVENT_LOOP::instance().Enqueue(traffCnt, &TraffCounterImpl::SetUserNotifiers, user);
-}
-//-----------------------------------------------------------------------------
-inline
-void DEL_USER_NONIFIER::notify(const UserImplPtr & user)
-{
-EVENT_LOOP::instance().Enqueue(traffCnt, &TraffCounterImpl::UnSetUserNotifiers, user);
-EVENT_LOOP::instance().Enqueue(traffCnt, &TraffCounterImpl::DelUser, user->GetCurrIP());
-}
-//-----------------------------------------------------------------------------
-}
#include "stg/logger.h"
#include "stg/notifer.h"
#include "stg/noncopyable.h"
-#include "actions.h"
-#include "eventloop.h"
#include "settings_impl.h"
#include "user_impl.h"