X-Git-Url: https://git.stg.codes/stg.git/blobdiff_plain/a500fb72810060e52d87ad2c2e4691531f0bcc5a..2574a28cbf000603bc31f61593dbf061ff56c1d5:/projects/stargazer/traffcounter_impl.cpp?ds=inline diff --git a/projects/stargazer/traffcounter_impl.cpp b/projects/stargazer/traffcounter_impl.cpp index e61be966..70ade1ae 100644 --- a/projects/stargazer/traffcounter_impl.cpp +++ b/projects/stargazer/traffcounter_impl.cpp @@ -40,17 +40,19 @@ #include // strtol #include "stg/common.h" -#include "stg/locker.h" #include "stg/const.h" // MONITOR_TIME_DELAY_SEC #include "traffcounter_impl.h" #include "stg_timer.h" #include "users_impl.h" +#include "async_pool.h" #define FLUSH_TIME (10) #define REMOVE_TIME (31) using STG::TraffCounterImpl; +namespace AsyncPoolST = STG::AsyncPoolST; + const char protoName[PROTOMAX][8] = {"TCP", "UDP", "ICMP", "TCP_UDP", "ALL"}; @@ -66,17 +68,19 @@ TraffCounterImpl::TraffCounterImpl(UsersImpl * u, const std::string & fn) monitoring(false), touchTimeP(stgTime - MONITOR_TIME_DELAY_SEC), users(u), - stopped(true), - addUserNotifier(*this), - delUserNotifier(*this) + stopped(true) { for (int i = 0; i < DIR_NUM; i++) strprintf(&dirName[i], "DIR%d", i); dirName[DIR_NUM] = "NULL"; -users->AddNotifierUserAdd(&addUserNotifier); -users->AddNotifierUserDel(&delUserNotifier); +m_onAddUserConn = users->onImplAdd([this](auto user){ + AsyncPoolST::enqueue([this, user](){ SetUserNotifiers(user); }); +}); +m_onDelUserConn = users->onImplDel([this](auto user){ + AsyncPoolST::enqueue([this, user](){ UnSetUserNotifiers(user); DelUser(user->GetCurrIP()); }); +}); } //----------------------------------------------------------------------------- TraffCounterImpl::~TraffCounterImpl() @@ -106,7 +110,7 @@ while (users->SearchNext(h, &u) == 0) SetUserNotifiers(u); users->CloseSearch(h); -m_thread = std::jthread([this](auto token){ Run(token); }); +m_thread = std::jthread([this](auto token){ Run(std::move(token)); }); return 0; } //----------------------------------------------------------------------------- @@ -120,17 +124,11 @@ m_thread.request_stop(); int h = users->OpenSearch(); assert(h && "USERS::OpenSearch is always correct"); -UserImpl * u; -while (users->SearchNext(h, &u) == 0) - UnSetUserNotifiers(u); -users->CloseSearch(h); +m_onIPConns.clear(); //5 seconds to thread stops itself -struct timespec ts = {0, 200000000}; for (int i = 0; i < 25 && !stopped; i++) - { - nanosleep(&ts, NULL); - } + std::this_thread::sleep_for(std::chrono::milliseconds(200)); if (!stopped) { @@ -138,6 +136,8 @@ if (!stopped) return -1; } +m_thread.join(); + printfd(__FILE__, "TraffCounter::Stop()\n"); return 0; @@ -153,10 +153,9 @@ stopped = false; int c = 0; time_t touchTime = stgTime - MONITOR_TIME_DELAY_SEC; -struct timespec ts = {0, 500000000}; while (!token.stop_requested()) { - nanosleep(&ts, 0); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); if (token.stop_requested()) { FlushAndRemove(); @@ -438,47 +437,22 @@ while (pi.first != pi.second) ip2packets.erase(pi.first, pi.second); } //----------------------------------------------------------------------------- -void TraffCounterImpl::SetUserNotifiers(UserImpl * user) +void TraffCounterImpl::SetUserNotifiers(UserImpl* user) { -// Adding user. Adding notifiers to user. -TRF_IP_BEFORE ipBNotifier(*this, user); -ipBeforeNotifiers.push_front(ipBNotifier); -user->AddCurrIPBeforeNotifier(&(*ipBeforeNotifiers.begin())); - -TRF_IP_AFTER ipANotifier(*this, user); -ipAfterNotifiers.push_front(ipANotifier); -user->AddCurrIPAfterNotifier(&(*ipAfterNotifiers.begin())); + // Adding user. Adding notifiers to user. + m_onIPConns.emplace_back( + user->GetID(), + user->beforeCurrIPChange([this](auto oldVal, auto /*newVal*/){ beforeIPChange(oldVal); }), + user->afterCurrIPChange([this, user](auto /*oldVal*/, auto newVal){ afterIPChange(user, newVal); }) + ); } //----------------------------------------------------------------------------- void TraffCounterImpl::UnSetUserNotifiers(UserImpl * user) { -// Removing user. Removing notifiers from user. -std::list::iterator bi; -std::list::iterator ai; - -bi = ipBeforeNotifiers.begin(); -while (bi != ipBeforeNotifiers.end()) - { - if (user->GetLogin() == bi->GetUser()->GetLogin()) - { - user->DelCurrIPBeforeNotifier(&(*bi)); - ipBeforeNotifiers.erase(bi); - break; - } - ++bi; - } - -ai = ipAfterNotifiers.begin(); -while (ai != ipAfterNotifiers.end()) - { - if (user->GetLogin() == ai->GetUser()->GetLogin()) - { - user->DelCurrIPAfterNotifier(&(*ai)); - ipAfterNotifiers.erase(ai); - break; - } - ++ai; - } + // Removing user. Removing notifiers from user. + m_onIPConns.erase(std::remove_if(m_onIPConns.begin(), m_onIPConns.end(), + [user](const auto& cs){ return std::get<0>(cs) == user->GetID(); }), + m_onIPConns.end()); } //----------------------------------------------------------------------------- void TraffCounterImpl::DeterminateDir(const RawPacket & packet, @@ -843,3 +817,21 @@ monitorDir = dir; monitoring = !monitorDir.empty(); } //----------------------------------------------------------------------------- +void TraffCounterImpl::beforeIPChange(uint32_t oldVal) +{ + // User changes his address. Remove old IP + if (!oldVal) + return; + + AsyncPoolST::enqueue([this, oldVal](){ DelUser(oldVal); }); +} +//----------------------------------------------------------------------------- +void TraffCounterImpl::afterIPChange(UserImpl* user, uint32_t newVal) +{ + // User changes his address. Add new IP + if (!newVal) + return; + + AsyncPoolST::enqueue([this, user](){ AddUser(user); }); +} +//-----------------------------------------------------------------------------