X-Git-Url: https://git.stg.codes/stg.git/blobdiff_plain/8e80bb9cec2c90dd61f810fb1525932a434288eb..0c097ef3435d2a45c1ee4ac80f8bd3f254fb8df5:/projects/stargazer/plugins/capture/cap_nf/cap_nf.cpp diff --git a/projects/stargazer/plugins/capture/cap_nf/cap_nf.cpp b/projects/stargazer/plugins/capture/cap_nf/cap_nf.cpp index 817ed715..b4555f7c 100644 --- a/projects/stargazer/plugins/capture/cap_nf/cap_nf.cpp +++ b/projects/stargazer/plugins/capture/cap_nf/cap_nf.cpp @@ -45,10 +45,13 @@ $Author: faust $ #include #include +using STG::NF_CAP; + namespace { -struct NF_HEADER { +struct NF_HEADER +{ uint16_t version; // Protocol version uint16_t count; // Flows count uint32_t uptime; // System uptime @@ -60,7 +63,8 @@ struct NF_HEADER { uint16_t sInterval; // Sampling mode and interval }; -struct NF_DATA { +struct NF_DATA +{ uint32_t srcAddr; // Flow source address uint32_t dstAddr; // Flow destination address uint32_t nextHop; // IP addres on next hop router @@ -95,21 +99,19 @@ extern "C" STG::Plugin* GetPlugin() NF_CAP::NF_CAP() : traffCnt(NULL), - runningTCP(false), - runningUDP(false), stoppedTCP(true), stoppedUDP(true), portT(0), portU(0), sockTCP(-1), sockUDP(-1), - logger(STG::PluginLogger::get("cap_nf")) + logger(PluginLogger::get("cap_nf")) { } int NF_CAP::ParseSettings() { -std::vector::iterator it; +std::vector::iterator it; for (it = settings.moduleParams.begin(); it != settings.moduleParams.end(); ++it) { if (it->param == "TCPPort" && !it->value.empty()) @@ -145,16 +147,7 @@ if (portU > 0) { return -1; } - runningUDP = true; - if (pthread_create(&tidUDP, NULL, RunUDP, this)) - { - runningUDP = false; - CloseUDP(); - errorStr = "Cannot create UDP thread"; - logger("Cannot create UDP thread."); - printfd(__FILE__, "Error: Cannot create UDP thread\n"); - return -1; - } + m_threadUDP = std::jthread([this](auto token){ RunUDP(std::move(token)); }); } if (portT > 0) { @@ -162,23 +155,15 @@ if (portT > 0) { return -1; } - runningTCP = true; - if (pthread_create(&tidTCP, NULL, RunTCP, this)) - { - runningTCP = false; - CloseTCP(); - logger("Cannot create TCP thread."); - errorStr = "Cannot create TCP thread"; - printfd(__FILE__, "Error: Cannot create TCP thread\n"); - return -1; - } + m_threadTCP = std::jthread([this](auto token){ RunTCP(std::move(token)); }); } return 0; } int NF_CAP::Stop() { -runningTCP = runningUDP = false; +m_threadTCP.request_stop(); +m_threadUDP.request_stop(); if (portU && !stoppedUDP) { CloseUDP(); @@ -189,17 +174,11 @@ if (portU && !stoppedUDP) } if (stoppedUDP) { - pthread_join(tidUDP, NULL); + m_threadUDP.join(); } else { - if (pthread_kill(tidUDP, SIGUSR1)) - { - errorStr = "Error sending signal to UDP thread"; - logger("Error sending sugnal to UDP thread."); - printfd(__FILE__, "Error: Error sending signal to UDP thread\n"); - return -1; - } + m_threadUDP.detach(); printfd(__FILE__, "UDP thread NOT stopped\n"); logger("Cannot stop UDP thread."); } @@ -214,17 +193,11 @@ if (portT && !stoppedTCP) } if (stoppedTCP) { - pthread_join(tidTCP, NULL); + m_threadTCP.join(); } else { - if (pthread_kill(tidTCP, SIGUSR1)) - { - errorStr = "Error sending signal to TCP thread"; - logger("Error sending signal to TCP thread."); - printfd(__FILE__, "Error: Error sending signal to TCP thread\n"); - return -1; - } + m_threadTCP.detach(); printfd(__FILE__, "TCP thread NOT stopped\n"); logger("Cannot stop TCP thread."); } @@ -246,7 +219,7 @@ if (sockUDP <= 0) sin.sin_family = AF_INET; sin.sin_port = htons(portU); sin.sin_addr.s_addr = inet_addr("0.0.0.0"); -if (bind(sockUDP, (struct sockaddr *)&sin, sizeof(sin))) +if (bind(sockUDP, reinterpret_cast(&sin), sizeof(sin))) { errorStr = "Error binding UDP socket"; logger("Cannot bind UDP socket: %s", strerror(errno)); @@ -270,7 +243,7 @@ if (sockTCP <= 0) sin.sin_family = AF_INET; sin.sin_port = htons(portT); sin.sin_addr.s_addr = inet_addr("0.0.0.0"); -if (bind(sockTCP, (struct sockaddr *)&sin, sizeof(sin))) +if (bind(sockTCP, reinterpret_cast(&sin), sizeof(sin))) { errorStr = "Error binding TCP socket"; logger("Cannot bind TCP socket: %s", strerror(errno)); @@ -287,17 +260,16 @@ if (listen(sockTCP, 1)) return false; } -void * NF_CAP::RunUDP(void * c) +void NF_CAP::RunUDP(std::stop_token token) noexcept { sigset_t signalSet; sigfillset(&signalSet); pthread_sigmask(SIG_BLOCK, &signalSet, NULL); -NF_CAP * cap = static_cast(c); -cap->stoppedUDP = false; -while (cap->runningUDP) +stoppedUDP = false; +while (!token.stop_requested()) { - if (!WaitPackets(cap->sockUDP)) + if (!WaitPackets(sockUDP)) { continue; } @@ -306,13 +278,13 @@ while (cap->runningUDP) struct sockaddr_in sin; socklen_t slen = sizeof(sin); uint8_t buf[BUF_SIZE]; - ssize_t res = recvfrom(cap->sockUDP, buf, BUF_SIZE, 0, reinterpret_cast(&sin), &slen); - if (!cap->runningUDP) + ssize_t res = recvfrom(sockUDP, buf, BUF_SIZE, 0, reinterpret_cast(&sin), &slen); + if (token.stop_requested()) break; if (res < 0) { - cap->logger("recvfrom error: %s", strerror(errno)); + logger("recvfrom error: %s", strerror(errno)); continue; } @@ -325,29 +297,27 @@ while (cap->runningUDP) { if (errno != EINTR) { - cap->errorStr = "Invalid data received"; + errorStr = "Invalid data received"; printfd(__FILE__, "Error: Invalid data received through UDP\n"); } continue; } - cap->ParseBuffer(buf, res); + ParseBuffer(buf, res); } -cap->stoppedUDP = true; -return NULL; +stoppedUDP = true; } -void * NF_CAP::RunTCP(void * c) +void NF_CAP::RunTCP(std::stop_token token) noexcept { sigset_t signalSet; sigfillset(&signalSet); pthread_sigmask(SIG_BLOCK, &signalSet, NULL); -NF_CAP * cap = static_cast(c); -cap->stoppedTCP = false; -while (cap->runningTCP) +stoppedTCP = false; +while (!token.stop_requested()) { - if (!WaitPackets(cap->sockTCP)) + if (!WaitPackets(sockTCP)) { continue; } @@ -355,14 +325,14 @@ while (cap->runningTCP) // Data struct sockaddr_in sin; socklen_t slen = sizeof(sin); - int sd = accept(cap->sockTCP, reinterpret_cast(&sin), &slen); - if (!cap->runningTCP) + int sd = accept(sockTCP, reinterpret_cast(&sin), &slen); + if (token.stop_requested()) break; if (sd <= 0) { if (sd < 0) - cap->logger("accept error: %s", strerror(errno)); + logger("accept error: %s", strerror(errno)); continue; } @@ -376,11 +346,11 @@ while (cap->runningTCP) ssize_t res = recv(sd, buf, BUF_SIZE, MSG_WAITALL); if (res < 0) - cap->logger("recv error: %s", strerror(errno)); + logger("recv error: %s", strerror(errno)); close(sd); - if (!cap->runningTCP) + if (token.stop_requested()) break; if (res == 0) // EOF @@ -395,15 +365,14 @@ while (cap->runningTCP) continue; } - cap->ParseBuffer(buf, res); + ParseBuffer(buf, res); } -cap->stoppedTCP = true; -return NULL; +stoppedTCP = true; } void NF_CAP::ParseBuffer(uint8_t * buf, ssize_t size) { -STG::RawPacket ip; +RawPacket ip; NF_HEADER * hdr = reinterpret_cast(buf); if (htons(hdr->version) != 5) {