X-Git-Url: https://git.stg.codes/stg.git/blobdiff_plain/c963a109219ed101fa42f501b16f90d7b7b4f3f2..0c097ef3435d2a45c1ee4ac80f8bd3f254fb8df5:/projects/stargazer/plugins/capture/cap_nf/cap_nf.cpp?ds=inline diff --git a/projects/stargazer/plugins/capture/cap_nf/cap_nf.cpp b/projects/stargazer/plugins/capture/cap_nf/cap_nf.cpp index 0e0204da..b4555f7c 100644 --- a/projects/stargazer/plugins/capture/cap_nf/cap_nf.cpp +++ b/projects/stargazer/plugins/capture/cap_nf/cap_nf.cpp @@ -27,72 +27,94 @@ $Revision: 1.11 $ $Date: 2010/09/10 06:41:06 $ $Author: faust $ */ -#include -#include -#include -#include -#include + +#include "cap_nf.h" + +#include "stg/common.h" +#include "stg/raw_ip_packet.h" +#include "stg/traffcounter.h" + +#include #include #include #include -#include - -#include "common.h" -#include "cap_nf.h" -#include "raw_ip_packet.h" +#include +#include +#include +#include -#include "../../../traffcounter.h" +using STG::NF_CAP; -class CAP_NF_CREATOR +namespace { -public: - CAP_NF_CREATOR() - : nf(new NF_CAP()) - { - }; - ~CAP_NF_CREATOR() - { - delete nf; - }; +struct NF_HEADER +{ + uint16_t version; // Protocol version + uint16_t count; // Flows count + uint32_t uptime; // System uptime + uint32_t timestamp; // UNIX timestamp + uint32_t nsecs; // Residual nanoseconds + uint32_t flowSeq; // Sequence counter + uint8_t eType; // Engine type + uint8_t eID; // Engine ID + uint16_t sInterval; // Sampling mode and interval +}; + +struct NF_DATA +{ + uint32_t srcAddr; // Flow source address + uint32_t dstAddr; // Flow destination address + uint32_t nextHop; // IP addres on next hop router + uint16_t inSNMP; // SNMP index of input iface + uint16_t outSNMP; // SNMP index of output iface + uint32_t packets; // Packets in flow + uint32_t octets; // Total number of bytes in flow + uint32_t timeStart; // Uptime on first packet in flow + uint32_t timeFinish;// Uptime on last packet in flow + uint16_t srcPort; // Flow source port + uint16_t dstPort; // Flow destination port + uint8_t pad1; // 1-byte padding + uint8_t TCPFlags; // Cumulative OR of TCP flags + uint8_t proto; // IP protocol type (tcp, udp, etc.) + uint8_t tos; // IP Type of Service (ToS) + uint16_t srcAS; // Source BGP autonomous system number + uint16_t dstAS; // Destination BGP autonomus system number + uint8_t srcMask; // Source address mask in "slash" notation + uint8_t dstMask; // Destination address mask in "slash" notation + uint16_t pad2; // 2-byte padding +}; + +#define BUF_SIZE (sizeof(NF_HEADER) + 30 * sizeof(NF_DATA)) - NF_CAP * GetCapturer() { return nf; }; -private: - NF_CAP * nf; -} cnc; +} -PLUGIN * GetPlugin() +extern "C" STG::Plugin* GetPlugin() { -return cnc.GetCapturer(); + static NF_CAP plugin; + return &plugin; } NF_CAP::NF_CAP() : traffCnt(NULL), - tidTCP(0), - tidUDP(0), - runningTCP(false), - runningUDP(false), stoppedTCP(true), stoppedUDP(true), portT(0), portU(0), sockTCP(-1), - sockUDP(-1) -{ -} - -NF_CAP::~NF_CAP() + sockUDP(-1), + logger(PluginLogger::get("cap_nf")) { } int NF_CAP::ParseSettings() { -std::vector::iterator it; +std::vector::iterator it; for (it = settings.moduleParams.begin(); it != settings.moduleParams.end(); ++it) { - if (it->param == "TCPPort") + if (it->param == "TCPPort" && !it->value.empty()) { if (str2x(it->value[0], portT)) { @@ -102,7 +124,7 @@ for (it = settings.moduleParams.begin(); it != settings.moduleParams.end(); ++it } continue; } - if (it->param == "UDPPort") + if (it->param == "UDPPort" && !it->value.empty()) { if (str2x(it->value[0], portU)) { @@ -125,15 +147,7 @@ if (portU > 0) { return -1; } - runningUDP = true; - if (pthread_create(&tidUDP, NULL, RunUDP, this)) - { - runningUDP = false; - CloseUDP(); - errorStr = "Cannot create UDP thread"; - printfd(__FILE__, "Error: Cannot create UDP thread\n"); - return -1; - } + m_threadUDP = std::jthread([this](auto token){ RunUDP(std::move(token)); }); } if (portT > 0) { @@ -141,42 +155,32 @@ if (portT > 0) { return -1; } - runningTCP = true; - if (pthread_create(&tidTCP, NULL, RunTCP, this)) - { - runningTCP = false; - CloseTCP(); - errorStr = "Cannot create TCP thread"; - printfd(__FILE__, "Error: Cannot create TCP thread\n"); - return -1; - } + m_threadTCP = std::jthread([this](auto token){ RunTCP(std::move(token)); }); } return 0; } int NF_CAP::Stop() { -runningTCP = runningUDP = false; +m_threadTCP.request_stop(); +m_threadUDP.request_stop(); if (portU && !stoppedUDP) { CloseUDP(); for (int i = 0; i < 25 && !stoppedUDP; ++i) { - usleep(200000); + struct timespec ts = {0, 200000000}; + nanosleep(&ts, NULL); } if (stoppedUDP) { - pthread_join(tidUDP, NULL); + m_threadUDP.join(); } else { - if (pthread_kill(tidUDP, SIGUSR1)) - { - errorStr = "Error sending signal to UDP thread"; - printfd(__FILE__, "Error: Error sending signal to UDP thread\n"); - return -1; - } + m_threadUDP.detach(); printfd(__FILE__, "UDP thread NOT stopped\n"); + logger("Cannot stop UDP thread."); } } if (portT && !stoppedTCP) @@ -184,21 +188,18 @@ if (portT && !stoppedTCP) CloseTCP(); for (int i = 0; i < 25 && !stoppedTCP; ++i) { - usleep(200000); + struct timespec ts = {0, 200000000}; + nanosleep(&ts, NULL); } if (stoppedTCP) { - pthread_join(tidTCP, NULL); + m_threadTCP.join(); } else { - if (pthread_kill(tidTCP, SIGUSR1)) - { - errorStr = "Error sending signal to TCP thread"; - printfd(__FILE__, "Error: Error sending signal to TCP thread\n"); - return -1; - } + m_threadTCP.detach(); printfd(__FILE__, "TCP thread NOT stopped\n"); + logger("Cannot stop TCP thread."); } } return 0; @@ -211,15 +212,17 @@ sockUDP = socket(PF_INET, SOCK_DGRAM, 0); if (sockUDP <= 0) { errorStr = "Error opening UDP socket"; + logger("Cannot create UDP socket: %s", strerror(errno)); printfd(__FILE__, "Error: Error opening UDP socket\n"); return true; } sin.sin_family = AF_INET; sin.sin_port = htons(portU); sin.sin_addr.s_addr = inet_addr("0.0.0.0"); -if (bind(sockUDP, (struct sockaddr *)&sin, sizeof(sin))) +if (bind(sockUDP, reinterpret_cast(&sin), sizeof(sin))) { errorStr = "Error binding UDP socket"; + logger("Cannot bind UDP socket: %s", strerror(errno)); printfd(__FILE__, "Error: Error binding UDP socket\n"); return true; } @@ -233,114 +236,121 @@ sockTCP = socket(PF_INET, SOCK_STREAM, 0); if (sockTCP <= 0) { errorStr = "Error opening TCP socket"; + logger("Cannot create TCP socket: %s", strerror(errno)); printfd(__FILE__, "Error: Error opening TCP socket\n"); return true; } sin.sin_family = AF_INET; sin.sin_port = htons(portT); sin.sin_addr.s_addr = inet_addr("0.0.0.0"); -if (bind(sockTCP, (struct sockaddr *)&sin, sizeof(sin))) +if (bind(sockTCP, reinterpret_cast(&sin), sizeof(sin))) { errorStr = "Error binding TCP socket"; + logger("Cannot bind TCP socket: %s", strerror(errno)); printfd(__FILE__, "Error: Error binding TCP socket\n"); return true; } if (listen(sockTCP, 1)) { errorStr = "Error listening on TCP socket"; + logger("Cannot listen on TCP socket: %s", strerror(errno)); printfd(__FILE__, "Error: Error listening TCP socket\n"); return true; } return false; } -void * NF_CAP::RunUDP(void * c) +void NF_CAP::RunUDP(std::stop_token token) noexcept { -NF_CAP * cap = static_cast(c); -uint8_t buf[BUF_SIZE]; -int res; -struct sockaddr_in sin; -socklen_t slen; -cap->stoppedUDP = false; -while (cap->runningUDP) +sigset_t signalSet; +sigfillset(&signalSet); +pthread_sigmask(SIG_BLOCK, &signalSet, NULL); + +stoppedUDP = false; +while (!token.stop_requested()) { - if (!cap->WaitPackets(cap->sockUDP)) + if (!WaitPackets(sockUDP)) { continue; } // Data - slen = sizeof(sin); - res = recvfrom(cap->sockUDP, buf, BUF_SIZE, 0, reinterpret_cast(&sin), &slen); - if (!cap->runningUDP) + struct sockaddr_in sin; + socklen_t slen = sizeof(sin); + uint8_t buf[BUF_SIZE]; + ssize_t res = recvfrom(sockUDP, buf, BUF_SIZE, 0, reinterpret_cast(&sin), &slen); + if (token.stop_requested()) break; - if (res == 0) // EOF + if (res < 0) { + logger("recvfrom error: %s", strerror(errno)); continue; } + if (res == 0) // EOF + { + continue; + } - // Wrong logic! - // Need to check actual data length and wait all data to receive if (res < 24) { if (errno != EINTR) { - cap->errorStr = "Invalid data received"; + errorStr = "Invalid data received"; printfd(__FILE__, "Error: Invalid data received through UDP\n"); } continue; } - cap->ParseBuffer(buf, res); + ParseBuffer(buf, res); } -cap->stoppedUDP = true; -return NULL; +stoppedUDP = true; } -void * NF_CAP::RunTCP(void * c) +void NF_CAP::RunTCP(std::stop_token token) noexcept { -NF_CAP * cap = static_cast(c); -uint8_t buf[BUF_SIZE]; -int res; -int sd; -struct sockaddr_in sin; -socklen_t slen; -cap->stoppedTCP = false; -while (cap->runningTCP) +sigset_t signalSet; +sigfillset(&signalSet); +pthread_sigmask(SIG_BLOCK, &signalSet, NULL); + +stoppedTCP = false; +while (!token.stop_requested()) { - if (!cap->WaitPackets(cap->sockTCP)) + if (!WaitPackets(sockTCP)) { continue; } // Data - slen = sizeof(sin); - sd = accept(cap->sockTCP, reinterpret_cast(&sin), &slen); - if (!cap->runningTCP) + struct sockaddr_in sin; + socklen_t slen = sizeof(sin); + int sd = accept(sockTCP, reinterpret_cast(&sin), &slen); + if (token.stop_requested()) break; if (sd <= 0) { - if (errno != EINTR) - { - cap->errorStr = "Error accepting connection"; - printfd(__FILE__, "Error: Error accepting connection\n"); - } + if (sd < 0) + logger("accept error: %s", strerror(errno)); continue; } - if (!cap->WaitPackets(sd)) + if (!WaitPackets(sd)) { close(sd); continue; } - res = recv(sd, buf, BUF_SIZE, MSG_WAITALL); + uint8_t buf[BUF_SIZE]; + ssize_t res = recv(sd, buf, BUF_SIZE, MSG_WAITALL); + + if (res < 0) + logger("recv error: %s", strerror(errno)); + close(sd); - if (!cap->runningTCP) + if (token.stop_requested()) break; if (res == 0) // EOF @@ -352,23 +362,17 @@ while (cap->runningTCP) // Need to check actual data length and wait all data to receive if (res < 24) { - if (errno != EINTR) - { - cap->errorStr = "Invalid data received"; - printfd(__FILE__, "Error: Invalid data received through TCP\n"); - } continue; } - cap->ParseBuffer(buf, res); + ParseBuffer(buf, res); } -cap->stoppedTCP = true; -return NULL; +stoppedTCP = true; } -void NF_CAP::ParseBuffer(uint8_t * buf, int size) +void NF_CAP::ParseBuffer(uint8_t * buf, ssize_t size) { -RAW_PACKET ip; +RawPacket ip; NF_HEADER * hdr = reinterpret_cast(buf); if (htons(hdr->version) != 5) { @@ -392,51 +396,15 @@ for (int i = 0; i < packets; ++i) { NF_DATA * data = reinterpret_cast(buf + 24 + i * 48); - /*ip.pckt[0] = 4 << 4; - ip.pckt[0] |= 5; - ip.pckt[9] = data->proto; + ip.rawPacket.header.ipHeader.ip_v = 4; + ip.rawPacket.header.ipHeader.ip_hl = 5; + ip.rawPacket.header.ipHeader.ip_p = data->proto; ip.dataLen = ntohl(data->octets); - *(uint32_t *)(ip.pckt + 12) = data->srcAddr; - *(uint32_t *)(ip.pckt + 16) = data->dstAddr; - *(uint16_t *)(ip.pckt + 20) = data->srcPort; - *(uint16_t *)(ip.pckt + 22) = data->dstPort;*/ - ip.header.ipHeader.ip_v = 4; - ip.header.ipHeader.ip_hl = 5; - ip.header.ipHeader.ip_p = data->proto; - ip.dataLen = ntohl(data->octets); - ip.header.ipHeader.ip_src.s_addr = data->srcAddr; - ip.header.ipHeader.ip_dst.s_addr = data->dstAddr; - ip.header.sPort = data->srcPort; - ip.header.dPort = data->dstPort; + ip.rawPacket.header.ipHeader.ip_src.s_addr = data->srcAddr; + ip.rawPacket.header.ipHeader.ip_dst.s_addr = data->dstAddr; + ip.rawPacket.header.sPort = data->srcPort; + ip.rawPacket.header.dPort = data->dstPort; - traffCnt->Process(ip); + traffCnt->process(ip); } } - -bool NF_CAP::WaitPackets(int sd) const -{ -fd_set rfds; -FD_ZERO(&rfds); -FD_SET(sd, &rfds); - -struct timeval tv; -tv.tv_sec = 0; -tv.tv_usec = 500000; - -int res = select(sd + 1, &rfds, NULL, NULL, &tv); -if (res == -1) // Error - { - if (errno != EINTR) - { - printfd(__FILE__, "Error on select: '%s'\n", strerror(errno)); - } - return false; - } - -if (res == 0) // Timeout - { - return false; - } - -return true; -}