X-Git-Url: https://git.stg.codes/stg.git/blobdiff_plain/a500fb72810060e52d87ad2c2e4691531f0bcc5a..ee1709cd231588fe672d0bd2546ef69ee87ff88c:/projects/stargazer/traffcounter_impl.cpp diff --git a/projects/stargazer/traffcounter_impl.cpp b/projects/stargazer/traffcounter_impl.cpp index e61be966..aa156c9d 100644 --- a/projects/stargazer/traffcounter_impl.cpp +++ b/projects/stargazer/traffcounter_impl.cpp @@ -45,11 +45,16 @@ #include "traffcounter_impl.h" #include "stg_timer.h" #include "users_impl.h" +#include "async_pool.h" #define FLUSH_TIME (10) #define REMOVE_TIME (31) using STG::TraffCounterImpl; +using STG::TRF_IP_BEFORE; +using STG::TRF_IP_AFTER; + +namespace AsyncPoolST = STG::AsyncPoolST; const char protoName[PROTOMAX][8] = {"TCP", "UDP", "ICMP", "TCP_UDP", "ALL"}; @@ -66,17 +71,20 @@ TraffCounterImpl::TraffCounterImpl(UsersImpl * u, const std::string & fn) monitoring(false), touchTimeP(stgTime - MONITOR_TIME_DELAY_SEC), users(u), - stopped(true), - addUserNotifier(*this), - delUserNotifier(*this) + stopped(true) { for (int i = 0; i < DIR_NUM; i++) strprintf(&dirName[i], "DIR%d", i); dirName[DIR_NUM] = "NULL"; -users->AddNotifierUserAdd(&addUserNotifier); -users->AddNotifierUserDel(&delUserNotifier); +m_onAddUserConn = users->onUserImplAdd([this](auto user){ + AsyncPoolST::enqueue([this, user](){ SetUserNotifiers(user); }); +}); +m_onDelUserConn = users->onUserImplDel([this](auto user){ + AsyncPoolST::enqueue([this, user](){ UnSetUserNotifiers(user); }); + AsyncPoolST::enqueue([this, user](){ DelUser(user->GetCurrIP()); }); +}); } //----------------------------------------------------------------------------- TraffCounterImpl::~TraffCounterImpl() @@ -106,7 +114,7 @@ while (users->SearchNext(h, &u) == 0) SetUserNotifiers(u); users->CloseSearch(h); -m_thread = std::jthread([this](auto token){ Run(token); }); +m_thread = std::jthread([this](auto token){ Run(std::move(token)); }); return 0; } //----------------------------------------------------------------------------- @@ -128,9 +136,7 @@ users->CloseSearch(h); //5 seconds to thread stops itself struct timespec ts = {0, 200000000}; for (int i = 0; i < 25 && !stopped; i++) - { nanosleep(&ts, NULL); - } if (!stopped) { @@ -138,6 +144,8 @@ if (!stopped) return -1; } +m_thread.join(); + printfd(__FILE__, "TraffCounter::Stop()\n"); return 0; @@ -843,3 +851,21 @@ monitorDir = dir; monitoring = !monitorDir.empty(); } //----------------------------------------------------------------------------- +void TRF_IP_BEFORE::notify(const uint32_t & oldValue, const uint32_t &) +{ +// User changes his address. Remove old IP +if (!oldValue) + return; + +AsyncPoolST::enqueue([this, oldValue](){ traffCnt.DelUser(oldValue); }); +} +//----------------------------------------------------------------------------- +void TRF_IP_AFTER::notify(const uint32_t &, const uint32_t & newValue) +{ +// User changes his address. Add new IP +if (!newValue) + return; + +AsyncPoolST::enqueue([this](){ traffCnt.AddUser(user); }); +} +//-----------------------------------------------------------------------------