X-Git-Url: https://git.stg.codes/stg.git/blobdiff_plain/8e80bb9cec2c90dd61f810fb1525932a434288eb..1cd2b12bd4e4d86f6cd099240795f3ebeb3852b3:/projects/stargazer/traffcounter_impl.cpp diff --git a/projects/stargazer/traffcounter_impl.cpp b/projects/stargazer/traffcounter_impl.cpp index 0897d6bf..44f2e558 100644 --- a/projects/stargazer/traffcounter_impl.cpp +++ b/projects/stargazer/traffcounter_impl.cpp @@ -45,11 +45,16 @@ #include "traffcounter_impl.h" #include "stg_timer.h" #include "users_impl.h" +#include "async_pool.h" #define FLUSH_TIME (10) #define REMOVE_TIME (31) using STG::TraffCounterImpl; +using STG::TRF_IP_BEFORE; +using STG::TRF_IP_AFTER; + +namespace AsyncPoolST = STG::AsyncPoolST; const char protoName[PROTOMAX][8] = {"TCP", "UDP", "ICMP", "TCP_UDP", "ALL"}; @@ -66,30 +71,29 @@ TraffCounterImpl::TraffCounterImpl(UsersImpl * u, const std::string & fn) monitoring(false), touchTimeP(stgTime - MONITOR_TIME_DELAY_SEC), users(u), - running(false), - stopped(true), - addUserNotifier(*this), - delUserNotifier(*this) + stopped(true) { for (int i = 0; i < DIR_NUM; i++) strprintf(&dirName[i], "DIR%d", i); dirName[DIR_NUM] = "NULL"; -users->AddNotifierUserAdd(&addUserNotifier); -users->AddNotifierUserDel(&delUserNotifier); - -pthread_mutex_init(&mutex, NULL); +m_onAddUserConn = users->onImplAdd([this](auto user){ + AsyncPoolST::enqueue([this, user](){ SetUserNotifiers(user); }); +}); +m_onDelUserConn = users->onImplDel([this](auto user){ + AsyncPoolST::enqueue([this, user](){ UnSetUserNotifiers(user); }); + AsyncPoolST::enqueue([this, user](){ DelUser(user->GetCurrIP()); }); +}); } //----------------------------------------------------------------------------- TraffCounterImpl::~TraffCounterImpl() { -pthread_mutex_destroy(&mutex); } //----------------------------------------------------------------------------- int TraffCounterImpl::Start() { -STG_LOCKER lock(&mutex); +std::lock_guard lock(m_mutex); if (!stopped) return 0; @@ -110,13 +114,7 @@ while (users->SearchNext(h, &u) == 0) SetUserNotifiers(u); users->CloseSearch(h); -running = true; -if (pthread_create(&thread, NULL, Run, this)) - { - printfd(__FILE__, "TraffCounterImpl::Start() - Cannot start thread\n"); - WriteServLog("TraffCounter: Error: Cannot start thread."); - return -1; - } +m_thread = std::jthread([this](auto token){ Run(std::move(token)); }); return 0; } //----------------------------------------------------------------------------- @@ -125,7 +123,7 @@ int TraffCounterImpl::Stop() if (stopped) return 0; -running = false; +m_thread.request_stop(); int h = users->OpenSearch(); assert(h && "USERS::OpenSearch is always correct"); @@ -138,60 +136,58 @@ users->CloseSearch(h); //5 seconds to thread stops itself struct timespec ts = {0, 200000000}; for (int i = 0; i < 25 && !stopped; i++) - { nanosleep(&ts, NULL); - } if (!stopped) +{ + m_thread.detach(); return -1; +} + +m_thread.join(); printfd(__FILE__, "TraffCounter::Stop()\n"); return 0; } //----------------------------------------------------------------------------- -void * TraffCounterImpl::Run(void * data) +void TraffCounterImpl::Run(std::stop_token token) { sigset_t signalSet; sigfillset(&signalSet); pthread_sigmask(SIG_BLOCK, &signalSet, NULL); -TraffCounterImpl * tc = static_cast(data); -tc->stopped = false; +stopped = false; int c = 0; time_t touchTime = stgTime - MONITOR_TIME_DELAY_SEC; struct timespec ts = {0, 500000000}; -while (tc->running) +while (!token.stop_requested()) { nanosleep(&ts, 0); - if (!tc->running) + if (token.stop_requested()) { - tc->FlushAndRemove(); + FlushAndRemove(); break; } - if (tc->monitoring && (touchTime + MONITOR_TIME_DELAY_SEC <= stgTime)) + if (monitoring && (touchTime + MONITOR_TIME_DELAY_SEC <= stgTime)) { - std::string monFile(tc->monitorDir + "/traffcounter_r"); - printfd(__FILE__, "Monitor=%d file TraffCounter %s\n", tc->monitoring, monFile.c_str()); + std::string monFile(monitorDir + "/traffcounter_r"); + printfd(__FILE__, "Monitor=%d file TraffCounter %s\n", monitoring, monFile.c_str()); touchTime = stgTime; TouchFile(monFile); } if (++c % FLUSH_TIME == 0) - tc->FlushAndRemove(); + FlushAndRemove(); } -tc->stopped = true; -return NULL; +stopped = true; } //----------------------------------------------------------------------------- void TraffCounterImpl::process(const RawPacket & rawPacket) { -if (!running) - return; - if (monitoring && (touchTimeP + MONITOR_TIME_DELAY_SEC <= stgTime)) { std::string monFile = monitorDir + "/traffcounter_p"; @@ -200,7 +196,7 @@ if (monitoring && (touchTimeP + MONITOR_TIME_DELAY_SEC <= stgTime)) TouchFile(monFile); } -STG_LOCKER lock(&mutex); +std::lock_guard lock(m_mutex); //printfd(__FILE__, "TraffCounter::Process()\n"); //TODO replace find with lower_bound. @@ -278,7 +274,7 @@ if (ed.userUPresent || //----------------------------------------------------------------------------- void TraffCounterImpl::FlushAndRemove() { -STG_LOCKER lock(&mutex); +std::lock_guard lock(m_mutex); Packets::size_type oldPacketsSize = packets.size(); Index::size_type oldIp2packetsSize = ip2packets.size(); @@ -365,7 +361,7 @@ printfd(__FILE__, "AddUser: %s\n", user->GetLogin().c_str()); uint32_t uip = user->GetCurrIP(); std::pair pi; -STG_LOCKER lock(&mutex); +std::lock_guard lock(m_mutex); // Find all packets with IP belongs to this user pi = ip2packets.equal_range(uip); @@ -402,7 +398,7 @@ void TraffCounterImpl::DelUser(uint32_t uip) printfd(__FILE__, "DelUser: %s \n", inet_ntostring(uip).c_str()); std::pair pi; -STG_LOCKER lock(&mutex); +std::lock_guard lock(m_mutex); pi = ip2packets.equal_range(uip); while (pi.first != pi.second) @@ -703,7 +699,7 @@ return false; //----------------------------------------------------------------------------- int TraffCounterImpl::Reload() { -STG_LOCKER lock(&mutex); +std::lock_guard lock(m_mutex); if (ReadRules(true)) { @@ -855,3 +851,21 @@ monitorDir = dir; monitoring = !monitorDir.empty(); } //----------------------------------------------------------------------------- +void TRF_IP_BEFORE::notify(const uint32_t & oldValue, const uint32_t &) +{ +// User changes his address. Remove old IP +if (!oldValue) + return; + +AsyncPoolST::enqueue([this, oldValue](){ traffCnt.DelUser(oldValue); }); +} +//----------------------------------------------------------------------------- +void TRF_IP_AFTER::notify(const uint32_t &, const uint32_t & newValue) +{ +// User changes his address. Add new IP +if (!newValue) + return; + +AsyncPoolST::enqueue([this](){ traffCnt.AddUser(user); }); +} +//-----------------------------------------------------------------------------