X-Git-Url: https://git.stg.codes/stg.git/blobdiff_plain/8e80bb9cec2c90dd61f810fb1525932a434288eb..43ac308ea20014761481bc40525496a0bb1d9740:/projects/stargazer/traffcounter_impl.cpp?ds=sidebyside diff --git a/projects/stargazer/traffcounter_impl.cpp b/projects/stargazer/traffcounter_impl.cpp index 0897d6bf..a4227497 100644 --- a/projects/stargazer/traffcounter_impl.cpp +++ b/projects/stargazer/traffcounter_impl.cpp @@ -45,12 +45,15 @@ #include "traffcounter_impl.h" #include "stg_timer.h" #include "users_impl.h" +#include "async_pool.h" #define FLUSH_TIME (10) #define REMOVE_TIME (31) using STG::TraffCounterImpl; +namespace AsyncPoolST = STG::AsyncPoolST; + const char protoName[PROTOMAX][8] = {"TCP", "UDP", "ICMP", "TCP_UDP", "ALL"}; @@ -66,30 +69,28 @@ TraffCounterImpl::TraffCounterImpl(UsersImpl * u, const std::string & fn) monitoring(false), touchTimeP(stgTime - MONITOR_TIME_DELAY_SEC), users(u), - running(false), - stopped(true), - addUserNotifier(*this), - delUserNotifier(*this) + stopped(true) { for (int i = 0; i < DIR_NUM; i++) strprintf(&dirName[i], "DIR%d", i); dirName[DIR_NUM] = "NULL"; -users->AddNotifierUserAdd(&addUserNotifier); -users->AddNotifierUserDel(&delUserNotifier); - -pthread_mutex_init(&mutex, NULL); +m_onAddUserConn = users->onImplAdd([this](auto user){ + AsyncPoolST::enqueue([this, user](){ SetUserNotifiers(user); }); +}); +m_onDelUserConn = users->onImplDel([this](auto user){ + AsyncPoolST::enqueue([this, user](){ UnSetUserNotifiers(user); DelUser(user->GetCurrIP()); }); +}); } //----------------------------------------------------------------------------- TraffCounterImpl::~TraffCounterImpl() { -pthread_mutex_destroy(&mutex); } //----------------------------------------------------------------------------- int TraffCounterImpl::Start() { -STG_LOCKER lock(&mutex); +std::lock_guard lock(m_mutex); if (!stopped) return 0; @@ -110,13 +111,7 @@ while (users->SearchNext(h, &u) == 0) SetUserNotifiers(u); users->CloseSearch(h); -running = true; -if (pthread_create(&thread, NULL, Run, this)) - { - printfd(__FILE__, "TraffCounterImpl::Start() - Cannot start thread\n"); - WriteServLog("TraffCounter: Error: Cannot start thread."); - return -1; - } +m_thread = std::jthread([this](auto token){ Run(std::move(token)); }); return 0; } //----------------------------------------------------------------------------- @@ -125,73 +120,68 @@ int TraffCounterImpl::Stop() if (stopped) return 0; -running = false; +m_thread.request_stop(); int h = users->OpenSearch(); assert(h && "USERS::OpenSearch is always correct"); -UserImpl * u; -while (users->SearchNext(h, &u) == 0) - UnSetUserNotifiers(u); -users->CloseSearch(h); +m_onIPConns.clear(); //5 seconds to thread stops itself struct timespec ts = {0, 200000000}; for (int i = 0; i < 25 && !stopped; i++) - { nanosleep(&ts, NULL); - } if (!stopped) +{ + m_thread.detach(); return -1; +} + +m_thread.join(); printfd(__FILE__, "TraffCounter::Stop()\n"); return 0; } //----------------------------------------------------------------------------- -void * TraffCounterImpl::Run(void * data) +void TraffCounterImpl::Run(std::stop_token token) { sigset_t signalSet; sigfillset(&signalSet); pthread_sigmask(SIG_BLOCK, &signalSet, NULL); -TraffCounterImpl * tc = static_cast(data); -tc->stopped = false; +stopped = false; int c = 0; time_t touchTime = stgTime - MONITOR_TIME_DELAY_SEC; struct timespec ts = {0, 500000000}; -while (tc->running) +while (!token.stop_requested()) { nanosleep(&ts, 0); - if (!tc->running) + if (token.stop_requested()) { - tc->FlushAndRemove(); + FlushAndRemove(); break; } - if (tc->monitoring && (touchTime + MONITOR_TIME_DELAY_SEC <= stgTime)) + if (monitoring && (touchTime + MONITOR_TIME_DELAY_SEC <= stgTime)) { - std::string monFile(tc->monitorDir + "/traffcounter_r"); - printfd(__FILE__, "Monitor=%d file TraffCounter %s\n", tc->monitoring, monFile.c_str()); + std::string monFile(monitorDir + "/traffcounter_r"); + printfd(__FILE__, "Monitor=%d file TraffCounter %s\n", monitoring, monFile.c_str()); touchTime = stgTime; TouchFile(monFile); } if (++c % FLUSH_TIME == 0) - tc->FlushAndRemove(); + FlushAndRemove(); } -tc->stopped = true; -return NULL; +stopped = true; } //----------------------------------------------------------------------------- void TraffCounterImpl::process(const RawPacket & rawPacket) { -if (!running) - return; - if (monitoring && (touchTimeP + MONITOR_TIME_DELAY_SEC <= stgTime)) { std::string monFile = monitorDir + "/traffcounter_p"; @@ -200,7 +190,7 @@ if (monitoring && (touchTimeP + MONITOR_TIME_DELAY_SEC <= stgTime)) TouchFile(monFile); } -STG_LOCKER lock(&mutex); +std::lock_guard lock(m_mutex); //printfd(__FILE__, "TraffCounter::Process()\n"); //TODO replace find with lower_bound. @@ -278,7 +268,7 @@ if (ed.userUPresent || //----------------------------------------------------------------------------- void TraffCounterImpl::FlushAndRemove() { -STG_LOCKER lock(&mutex); +std::lock_guard lock(m_mutex); Packets::size_type oldPacketsSize = packets.size(); Index::size_type oldIp2packetsSize = ip2packets.size(); @@ -365,7 +355,7 @@ printfd(__FILE__, "AddUser: %s\n", user->GetLogin().c_str()); uint32_t uip = user->GetCurrIP(); std::pair pi; -STG_LOCKER lock(&mutex); +std::lock_guard lock(m_mutex); // Find all packets with IP belongs to this user pi = ip2packets.equal_range(uip); @@ -402,7 +392,7 @@ void TraffCounterImpl::DelUser(uint32_t uip) printfd(__FILE__, "DelUser: %s \n", inet_ntostring(uip).c_str()); std::pair pi; -STG_LOCKER lock(&mutex); +std::lock_guard lock(m_mutex); pi = ip2packets.equal_range(uip); while (pi.first != pi.second) @@ -450,47 +440,22 @@ while (pi.first != pi.second) ip2packets.erase(pi.first, pi.second); } //----------------------------------------------------------------------------- -void TraffCounterImpl::SetUserNotifiers(UserImpl * user) +void TraffCounterImpl::SetUserNotifiers(UserImpl* user) { -// Adding user. Adding notifiers to user. -TRF_IP_BEFORE ipBNotifier(*this, user); -ipBeforeNotifiers.push_front(ipBNotifier); -user->AddCurrIPBeforeNotifier(&(*ipBeforeNotifiers.begin())); - -TRF_IP_AFTER ipANotifier(*this, user); -ipAfterNotifiers.push_front(ipANotifier); -user->AddCurrIPAfterNotifier(&(*ipAfterNotifiers.begin())); + // Adding user. Adding notifiers to user. + m_onIPConns.emplace_back( + user->GetID(), + user->beforeCurrIPChange([this](auto oldVal, auto /*newVal*/){ beforeIPChange(oldVal); }), + user->afterCurrIPChange([this, user](auto /*oldVal*/, auto newVal){ afterIPChange(user, newVal); }) + ); } //----------------------------------------------------------------------------- void TraffCounterImpl::UnSetUserNotifiers(UserImpl * user) { -// Removing user. Removing notifiers from user. -std::list::iterator bi; -std::list::iterator ai; - -bi = ipBeforeNotifiers.begin(); -while (bi != ipBeforeNotifiers.end()) - { - if (user->GetLogin() == bi->GetUser()->GetLogin()) - { - user->DelCurrIPBeforeNotifier(&(*bi)); - ipBeforeNotifiers.erase(bi); - break; - } - ++bi; - } - -ai = ipAfterNotifiers.begin(); -while (ai != ipAfterNotifiers.end()) - { - if (user->GetLogin() == ai->GetUser()->GetLogin()) - { - user->DelCurrIPAfterNotifier(&(*ai)); - ipAfterNotifiers.erase(ai); - break; - } - ++ai; - } + // Removing user. Removing notifiers from user. + m_onIPConns.erase(std::remove_if(m_onIPConns.begin(), m_onIPConns.end(), + [user](const auto& cs){ return std::get<0>(cs) == user->GetID(); }), + m_onIPConns.end()); } //----------------------------------------------------------------------------- void TraffCounterImpl::DeterminateDir(const RawPacket & packet, @@ -703,7 +668,7 @@ return false; //----------------------------------------------------------------------------- int TraffCounterImpl::Reload() { -STG_LOCKER lock(&mutex); +std::lock_guard lock(m_mutex); if (ReadRules(true)) { @@ -855,3 +820,21 @@ monitorDir = dir; monitoring = !monitorDir.empty(); } //----------------------------------------------------------------------------- +void TraffCounterImpl::beforeIPChange(uint32_t oldVal) +{ + // User changes his address. Remove old IP + if (!oldVal) + return; + + AsyncPoolST::enqueue([this, oldVal](){ DelUser(oldVal); }); +} +//----------------------------------------------------------------------------- +void TraffCounterImpl::afterIPChange(UserImpl* user, uint32_t newVal) +{ + // User changes his address. Add new IP + if (!newVal) + return; + + AsyncPoolST::enqueue([this, user](){ AddUser(user); }); +} +//-----------------------------------------------------------------------------