X-Git-Url: https://git.stg.codes/stg.git/blobdiff_plain/ee1709cd231588fe672d0bd2546ef69ee87ff88c..16e9905f82947dd471c09382122d8150ba6fda1a:/projects/stargazer/traffcounter_impl.cpp diff --git a/projects/stargazer/traffcounter_impl.cpp b/projects/stargazer/traffcounter_impl.cpp index aa156c9d..f983d6af 100644 --- a/projects/stargazer/traffcounter_impl.cpp +++ b/projects/stargazer/traffcounter_impl.cpp @@ -28,31 +28,29 @@ $Author: faust $ */ -/* inet_aton */ -#include -#include -#include -#include - -#include -#include -#include // fopen and similar -#include // strtol - #include "stg/common.h" -#include "stg/locker.h" #include "stg/const.h" // MONITOR_TIME_DELAY_SEC #include "traffcounter_impl.h" #include "stg_timer.h" #include "users_impl.h" #include "async_pool.h" +#include +#include +#include +#include // fopen and similar +#include // strtol + +/* inet_aton */ +#include +#include +#include +#include + #define FLUSH_TIME (10) #define REMOVE_TIME (31) using STG::TraffCounterImpl; -using STG::TRF_IP_BEFORE; -using STG::TRF_IP_AFTER; namespace AsyncPoolST = STG::AsyncPoolST; @@ -78,12 +76,11 @@ for (int i = 0; i < DIR_NUM; i++) dirName[DIR_NUM] = "NULL"; -m_onAddUserConn = users->onUserImplAdd([this](auto user){ +m_onAddUserConn = users->onImplAdd([this](auto user){ AsyncPoolST::enqueue([this, user](){ SetUserNotifiers(user); }); }); -m_onDelUserConn = users->onUserImplDel([this](auto user){ - AsyncPoolST::enqueue([this, user](){ UnSetUserNotifiers(user); }); - AsyncPoolST::enqueue([this, user](){ DelUser(user->GetCurrIP()); }); +m_onDelUserConn = users->onImplDel([this](auto user){ + AsyncPoolST::enqueue([this, user](){ UnSetUserNotifiers(user); DelUser(user->GetCurrIP()); }); }); } //----------------------------------------------------------------------------- @@ -128,15 +125,11 @@ m_thread.request_stop(); int h = users->OpenSearch(); assert(h && "USERS::OpenSearch is always correct"); -UserImpl * u; -while (users->SearchNext(h, &u) == 0) - UnSetUserNotifiers(u); -users->CloseSearch(h); +m_onIPConns.clear(); //5 seconds to thread stops itself -struct timespec ts = {0, 200000000}; for (int i = 0; i < 25 && !stopped; i++) - nanosleep(&ts, NULL); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); if (!stopped) { @@ -161,10 +154,9 @@ stopped = false; int c = 0; time_t touchTime = stgTime - MONITOR_TIME_DELAY_SEC; -struct timespec ts = {0, 500000000}; while (!token.stop_requested()) { - nanosleep(&ts, 0); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); if (token.stop_requested()) { FlushAndRemove(); @@ -446,47 +438,22 @@ while (pi.first != pi.second) ip2packets.erase(pi.first, pi.second); } //----------------------------------------------------------------------------- -void TraffCounterImpl::SetUserNotifiers(UserImpl * user) +void TraffCounterImpl::SetUserNotifiers(UserImpl* user) { -// Adding user. Adding notifiers to user. -TRF_IP_BEFORE ipBNotifier(*this, user); -ipBeforeNotifiers.push_front(ipBNotifier); -user->AddCurrIPBeforeNotifier(&(*ipBeforeNotifiers.begin())); - -TRF_IP_AFTER ipANotifier(*this, user); -ipAfterNotifiers.push_front(ipANotifier); -user->AddCurrIPAfterNotifier(&(*ipAfterNotifiers.begin())); + // Adding user. Adding notifiers to user. + m_onIPConns.emplace_back( + user->GetID(), + user->beforeCurrIPChange([this](auto oldVal, auto /*newVal*/){ beforeIPChange(oldVal); }), + user->afterCurrIPChange([this, user](auto /*oldVal*/, auto newVal){ afterIPChange(user, newVal); }) + ); } //----------------------------------------------------------------------------- void TraffCounterImpl::UnSetUserNotifiers(UserImpl * user) { -// Removing user. Removing notifiers from user. -std::list::iterator bi; -std::list::iterator ai; - -bi = ipBeforeNotifiers.begin(); -while (bi != ipBeforeNotifiers.end()) - { - if (user->GetLogin() == bi->GetUser()->GetLogin()) - { - user->DelCurrIPBeforeNotifier(&(*bi)); - ipBeforeNotifiers.erase(bi); - break; - } - ++bi; - } - -ai = ipAfterNotifiers.begin(); -while (ai != ipAfterNotifiers.end()) - { - if (user->GetLogin() == ai->GetUser()->GetLogin()) - { - user->DelCurrIPAfterNotifier(&(*ai)); - ipAfterNotifiers.erase(ai); - break; - } - ++ai; - } + // Removing user. Removing notifiers from user. + m_onIPConns.erase(std::remove_if(m_onIPConns.begin(), m_onIPConns.end(), + [user](const auto& cs){ return std::get<0>(cs) == user->GetID(); }), + m_onIPConns.end()); } //----------------------------------------------------------------------------- void TraffCounterImpl::DeterminateDir(const RawPacket & packet, @@ -851,21 +818,21 @@ monitorDir = dir; monitoring = !monitorDir.empty(); } //----------------------------------------------------------------------------- -void TRF_IP_BEFORE::notify(const uint32_t & oldValue, const uint32_t &) +void TraffCounterImpl::beforeIPChange(uint32_t oldVal) { -// User changes his address. Remove old IP -if (!oldValue) - return; + // User changes his address. Remove old IP + if (!oldVal) + return; -AsyncPoolST::enqueue([this, oldValue](){ traffCnt.DelUser(oldValue); }); + AsyncPoolST::enqueue([this, oldVal](){ DelUser(oldVal); }); } //----------------------------------------------------------------------------- -void TRF_IP_AFTER::notify(const uint32_t &, const uint32_t & newValue) +void TraffCounterImpl::afterIPChange(UserImpl* user, uint32_t newVal) { -// User changes his address. Add new IP -if (!newValue) - return; + // User changes his address. Add new IP + if (!newVal) + return; -AsyncPoolST::enqueue([this](){ traffCnt.AddUser(user); }); + AsyncPoolST::enqueue([this, user](){ AddUser(user); }); } //-----------------------------------------------------------------------------