X-Git-Url: https://git.stg.codes/stg.git/blobdiff_plain/c59911ca3cd38cf4ab36d2cc62686f97395899f9..refs/heads/master:/projects/stargazer/traffcounter_impl.cpp?ds=sidebyside diff --git a/projects/stargazer/traffcounter_impl.cpp b/projects/stargazer/traffcounter_impl.cpp index ed0f93d5..70ade1ae 100644 --- a/projects/stargazer/traffcounter_impl.cpp +++ b/projects/stargazer/traffcounter_impl.cpp @@ -40,7 +40,6 @@ #include // strtol #include "stg/common.h" -#include "stg/locker.h" #include "stg/const.h" // MONITOR_TIME_DELAY_SEC #include "traffcounter_impl.h" #include "stg_timer.h" @@ -51,10 +50,6 @@ #define REMOVE_TIME (31) using STG::TraffCounterImpl; -using STG::TRF_IP_BEFORE; -using STG::TRF_IP_AFTER; -using STG::ADD_USER_NONIFIER; -using STG::DEL_USER_NONIFIER; namespace AsyncPoolST = STG::AsyncPoolST; @@ -73,17 +68,19 @@ TraffCounterImpl::TraffCounterImpl(UsersImpl * u, const std::string & fn) monitoring(false), touchTimeP(stgTime - MONITOR_TIME_DELAY_SEC), users(u), - stopped(true), - addUserNotifier(*this), - delUserNotifier(*this) + stopped(true) { for (int i = 0; i < DIR_NUM; i++) strprintf(&dirName[i], "DIR%d", i); dirName[DIR_NUM] = "NULL"; -users->AddNotifierUserAdd(&addUserNotifier); -users->AddNotifierUserDel(&delUserNotifier); +m_onAddUserConn = users->onImplAdd([this](auto user){ + AsyncPoolST::enqueue([this, user](){ SetUserNotifiers(user); }); +}); +m_onDelUserConn = users->onImplDel([this](auto user){ + AsyncPoolST::enqueue([this, user](){ UnSetUserNotifiers(user); DelUser(user->GetCurrIP()); }); +}); } //----------------------------------------------------------------------------- TraffCounterImpl::~TraffCounterImpl() @@ -127,15 +124,11 @@ m_thread.request_stop(); int h = users->OpenSearch(); assert(h && "USERS::OpenSearch is always correct"); -UserImpl * u; -while (users->SearchNext(h, &u) == 0) - UnSetUserNotifiers(u); -users->CloseSearch(h); +m_onIPConns.clear(); //5 seconds to thread stops itself -struct timespec ts = {0, 200000000}; for (int i = 0; i < 25 && !stopped; i++) - nanosleep(&ts, NULL); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); if (!stopped) { @@ -160,10 +153,9 @@ stopped = false; int c = 0; time_t touchTime = stgTime - MONITOR_TIME_DELAY_SEC; -struct timespec ts = {0, 500000000}; while (!token.stop_requested()) { - nanosleep(&ts, 0); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); if (token.stop_requested()) { FlushAndRemove(); @@ -445,47 +437,22 @@ while (pi.first != pi.second) ip2packets.erase(pi.first, pi.second); } //----------------------------------------------------------------------------- -void TraffCounterImpl::SetUserNotifiers(UserImpl * user) +void TraffCounterImpl::SetUserNotifiers(UserImpl* user) { -// Adding user. Adding notifiers to user. -TRF_IP_BEFORE ipBNotifier(*this, user); -ipBeforeNotifiers.push_front(ipBNotifier); -user->AddCurrIPBeforeNotifier(&(*ipBeforeNotifiers.begin())); - -TRF_IP_AFTER ipANotifier(*this, user); -ipAfterNotifiers.push_front(ipANotifier); -user->AddCurrIPAfterNotifier(&(*ipAfterNotifiers.begin())); + // Adding user. Adding notifiers to user. + m_onIPConns.emplace_back( + user->GetID(), + user->beforeCurrIPChange([this](auto oldVal, auto /*newVal*/){ beforeIPChange(oldVal); }), + user->afterCurrIPChange([this, user](auto /*oldVal*/, auto newVal){ afterIPChange(user, newVal); }) + ); } //----------------------------------------------------------------------------- void TraffCounterImpl::UnSetUserNotifiers(UserImpl * user) { -// Removing user. Removing notifiers from user. -std::list::iterator bi; -std::list::iterator ai; - -bi = ipBeforeNotifiers.begin(); -while (bi != ipBeforeNotifiers.end()) - { - if (user->GetLogin() == bi->GetUser()->GetLogin()) - { - user->DelCurrIPBeforeNotifier(&(*bi)); - ipBeforeNotifiers.erase(bi); - break; - } - ++bi; - } - -ai = ipAfterNotifiers.begin(); -while (ai != ipAfterNotifiers.end()) - { - if (user->GetLogin() == ai->GetUser()->GetLogin()) - { - user->DelCurrIPAfterNotifier(&(*ai)); - ipAfterNotifiers.erase(ai); - break; - } - ++ai; - } + // Removing user. Removing notifiers from user. + m_onIPConns.erase(std::remove_if(m_onIPConns.begin(), m_onIPConns.end(), + [user](const auto& cs){ return std::get<0>(cs) == user->GetID(); }), + m_onIPConns.end()); } //----------------------------------------------------------------------------- void TraffCounterImpl::DeterminateDir(const RawPacket & packet, @@ -850,32 +817,21 @@ monitorDir = dir; monitoring = !monitorDir.empty(); } //----------------------------------------------------------------------------- -void TRF_IP_BEFORE::notify(const uint32_t & oldValue, const uint32_t &) +void TraffCounterImpl::beforeIPChange(uint32_t oldVal) { -// User changes his address. Remove old IP -if (!oldValue) - return; + // User changes his address. Remove old IP + if (!oldVal) + return; -AsyncPoolST::enqueue([this, oldValue](){ traffCnt.DelUser(oldValue); }); + AsyncPoolST::enqueue([this, oldVal](){ DelUser(oldVal); }); } //----------------------------------------------------------------------------- -void TRF_IP_AFTER::notify(const uint32_t &, const uint32_t & newValue) +void TraffCounterImpl::afterIPChange(UserImpl* user, uint32_t newVal) { -// User changes his address. Add new IP -if (!newValue) - return; + // User changes his address. Add new IP + if (!newVal) + return; -AsyncPoolST::enqueue([this](){ traffCnt.AddUser(user); }); -} -//----------------------------------------------------------------------------- -void ADD_USER_NONIFIER::notify(const UserImplPtr & user) -{ -AsyncPoolST::enqueue([this, user](){ traffCnt.SetUserNotifiers(user); }); -} -//----------------------------------------------------------------------------- -void DEL_USER_NONIFIER::notify(const UserImplPtr & user) -{ -AsyncPoolST::enqueue([this, user](){ traffCnt.UnSetUserNotifiers(user); }); -AsyncPoolST::enqueue([this, user](){ traffCnt.DelUser(user->GetCurrIP()); }); + AsyncPoolST::enqueue([this, user](){ AddUser(user); }); } //-----------------------------------------------------------------------------