X-Git-Url: https://git.stg.codes/stg.git/blobdiff_plain/0907aa4037b12b6b88ee24495d4577a064d4f8db..b9bafe1042839cd7295adf84448b4d08de9f5f46:/projects/stargazer/traffcounter_impl.cpp diff --git a/projects/stargazer/traffcounter_impl.cpp b/projects/stargazer/traffcounter_impl.cpp index 0897d6bf..377e3889 100644 --- a/projects/stargazer/traffcounter_impl.cpp +++ b/projects/stargazer/traffcounter_impl.cpp @@ -66,7 +66,6 @@ TraffCounterImpl::TraffCounterImpl(UsersImpl * u, const std::string & fn) monitoring(false), touchTimeP(stgTime - MONITOR_TIME_DELAY_SEC), users(u), - running(false), stopped(true), addUserNotifier(*this), delUserNotifier(*this) @@ -78,18 +77,15 @@ dirName[DIR_NUM] = "NULL"; users->AddNotifierUserAdd(&addUserNotifier); users->AddNotifierUserDel(&delUserNotifier); - -pthread_mutex_init(&mutex, NULL); } //----------------------------------------------------------------------------- TraffCounterImpl::~TraffCounterImpl() { -pthread_mutex_destroy(&mutex); } //----------------------------------------------------------------------------- int TraffCounterImpl::Start() { -STG_LOCKER lock(&mutex); +std::lock_guard lock(m_mutex); if (!stopped) return 0; @@ -110,13 +106,7 @@ while (users->SearchNext(h, &u) == 0) SetUserNotifiers(u); users->CloseSearch(h); -running = true; -if (pthread_create(&thread, NULL, Run, this)) - { - printfd(__FILE__, "TraffCounterImpl::Start() - Cannot start thread\n"); - WriteServLog("TraffCounter: Error: Cannot start thread."); - return -1; - } +m_thread = std::jthread([this](auto token){ Run(std::move(token)); }); return 0; } //----------------------------------------------------------------------------- @@ -125,7 +115,7 @@ int TraffCounterImpl::Stop() if (stopped) return 0; -running = false; +m_thread.request_stop(); int h = users->OpenSearch(); assert(h && "USERS::OpenSearch is always correct"); @@ -138,60 +128,58 @@ users->CloseSearch(h); //5 seconds to thread stops itself struct timespec ts = {0, 200000000}; for (int i = 0; i < 25 && !stopped; i++) - { nanosleep(&ts, NULL); - } if (!stopped) +{ + m_thread.detach(); return -1; +} + +m_thread.join(); printfd(__FILE__, "TraffCounter::Stop()\n"); return 0; } //----------------------------------------------------------------------------- -void * TraffCounterImpl::Run(void * data) +void TraffCounterImpl::Run(std::stop_token token) { sigset_t signalSet; sigfillset(&signalSet); pthread_sigmask(SIG_BLOCK, &signalSet, NULL); -TraffCounterImpl * tc = static_cast(data); -tc->stopped = false; +stopped = false; int c = 0; time_t touchTime = stgTime - MONITOR_TIME_DELAY_SEC; struct timespec ts = {0, 500000000}; -while (tc->running) +while (!token.stop_requested()) { nanosleep(&ts, 0); - if (!tc->running) + if (token.stop_requested()) { - tc->FlushAndRemove(); + FlushAndRemove(); break; } - if (tc->monitoring && (touchTime + MONITOR_TIME_DELAY_SEC <= stgTime)) + if (monitoring && (touchTime + MONITOR_TIME_DELAY_SEC <= stgTime)) { - std::string monFile(tc->monitorDir + "/traffcounter_r"); - printfd(__FILE__, "Monitor=%d file TraffCounter %s\n", tc->monitoring, monFile.c_str()); + std::string monFile(monitorDir + "/traffcounter_r"); + printfd(__FILE__, "Monitor=%d file TraffCounter %s\n", monitoring, monFile.c_str()); touchTime = stgTime; TouchFile(monFile); } if (++c % FLUSH_TIME == 0) - tc->FlushAndRemove(); + FlushAndRemove(); } -tc->stopped = true; -return NULL; +stopped = true; } //----------------------------------------------------------------------------- void TraffCounterImpl::process(const RawPacket & rawPacket) { -if (!running) - return; - if (monitoring && (touchTimeP + MONITOR_TIME_DELAY_SEC <= stgTime)) { std::string monFile = monitorDir + "/traffcounter_p"; @@ -200,7 +188,7 @@ if (monitoring && (touchTimeP + MONITOR_TIME_DELAY_SEC <= stgTime)) TouchFile(monFile); } -STG_LOCKER lock(&mutex); +std::lock_guard lock(m_mutex); //printfd(__FILE__, "TraffCounter::Process()\n"); //TODO replace find with lower_bound. @@ -278,7 +266,7 @@ if (ed.userUPresent || //----------------------------------------------------------------------------- void TraffCounterImpl::FlushAndRemove() { -STG_LOCKER lock(&mutex); +std::lock_guard lock(m_mutex); Packets::size_type oldPacketsSize = packets.size(); Index::size_type oldIp2packetsSize = ip2packets.size(); @@ -365,7 +353,7 @@ printfd(__FILE__, "AddUser: %s\n", user->GetLogin().c_str()); uint32_t uip = user->GetCurrIP(); std::pair pi; -STG_LOCKER lock(&mutex); +std::lock_guard lock(m_mutex); // Find all packets with IP belongs to this user pi = ip2packets.equal_range(uip); @@ -402,7 +390,7 @@ void TraffCounterImpl::DelUser(uint32_t uip) printfd(__FILE__, "DelUser: %s \n", inet_ntostring(uip).c_str()); std::pair pi; -STG_LOCKER lock(&mutex); +std::lock_guard lock(m_mutex); pi = ip2packets.equal_range(uip); while (pi.first != pi.second) @@ -703,7 +691,7 @@ return false; //----------------------------------------------------------------------------- int TraffCounterImpl::Reload() { -STG_LOCKER lock(&mutex); +std::lock_guard lock(m_mutex); if (ReadRules(true)) {