]> git.stg.codes - stg.git/blob - projects/stargazer/plugins/capture/cap_nf/cap_nf.cpp
More std::jthread stuff.
[stg.git] / projects / stargazer / plugins / capture / cap_nf / cap_nf.cpp
1 /*
2  *    This program is free software; you can redistribute it and/or modify
3  *    it under the terms of the GNU General Public License as published by
4  *    the Free Software Foundation; either version 2 of the License, or
5  *    (at your option) any later version.
6  *
7  *    This program is distributed in the hope that it will be useful,
8  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
9  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  *    GNU General Public License for more details.
11  *
12  *    You should have received a copy of the GNU General Public License
13  *    along with this program; if not, write to the Free Software
14  *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
15  */
16
17 /*
18 Date: 16.05.2008
19 */
20
21 /*
22 * Author : Maxim Mamontov <faust@stg.dp.ua>
23 */
24
25 /*
26 $Revision: 1.11 $
27 $Date: 2010/09/10 06:41:06 $
28 $Author: faust $
29 */
30
31 #include "cap_nf.h"
32
33 #include "stg/common.h"
34 #include "stg/raw_ip_packet.h"
35 #include "stg/traffcounter.h"
36
37 #include <vector>
38
39 #include <csignal>
40 #include <cerrno>
41 #include <cstring>
42
43 #include <sys/types.h>
44 #include <sys/socket.h>
45 #include <netinet/in.h>
46 #include <arpa/inet.h>
47
48 using STG::NF_CAP;
49
50 namespace
51 {
52
53 struct NF_HEADER
54 {
55     uint16_t version;   // Protocol version
56     uint16_t count;     // Flows count
57     uint32_t uptime;    // System uptime
58     uint32_t timestamp; // UNIX timestamp
59     uint32_t nsecs;     // Residual nanoseconds
60     uint32_t flowSeq;   // Sequence counter
61     uint8_t  eType;     // Engine type
62     uint8_t  eID;       // Engine ID
63     uint16_t sInterval; // Sampling mode and interval
64 };
65
66 struct NF_DATA
67 {
68     uint32_t srcAddr;   // Flow source address
69     uint32_t dstAddr;   // Flow destination address
70     uint32_t nextHop;   // IP addres on next hop router
71     uint16_t inSNMP;    // SNMP index of input iface
72     uint16_t outSNMP;   // SNMP index of output iface
73     uint32_t packets;   // Packets in flow
74     uint32_t octets;    // Total number of bytes in flow
75     uint32_t timeStart; // Uptime on first packet in flow
76     uint32_t timeFinish;// Uptime on last packet in flow
77     uint16_t srcPort;   // Flow source port
78     uint16_t dstPort;   // Flow destination port
79     uint8_t  pad1;      // 1-byte padding
80     uint8_t  TCPFlags;  // Cumulative OR of TCP flags
81     uint8_t  proto;     // IP protocol type (tcp, udp, etc.)
82     uint8_t  tos;       // IP Type of Service (ToS)
83     uint16_t srcAS;     // Source BGP autonomous system number
84     uint16_t dstAS;     // Destination BGP autonomus system number
85     uint8_t  srcMask;   // Source address mask in "slash" notation
86     uint8_t  dstMask;   // Destination address mask in "slash" notation
87     uint16_t pad2;      // 2-byte padding
88 };
89
90 #define BUF_SIZE (sizeof(NF_HEADER) + 30 * sizeof(NF_DATA))
91
92 }
93
94 extern "C" STG::Plugin* GetPlugin()
95 {
96     static NF_CAP plugin;
97     return &plugin;
98 }
99
100 NF_CAP::NF_CAP()
101     : traffCnt(NULL),
102       stoppedTCP(true),
103       stoppedUDP(true),
104       portT(0),
105       portU(0),
106       sockTCP(-1),
107       sockUDP(-1),
108       logger(PluginLogger::get("cap_nf"))
109 {
110 }
111
112 int NF_CAP::ParseSettings()
113 {
114 std::vector<ParamValue>::iterator it;
115 for (it = settings.moduleParams.begin(); it != settings.moduleParams.end(); ++it)
116     {
117     if (it->param == "TCPPort" && !it->value.empty())
118         {
119         if (str2x(it->value[0], portT))
120             {
121             errorStr = "Invalid TCPPort value";
122             printfd(__FILE__, "Error: Invalid TCPPort value\n");
123             return -1;
124             }
125         continue;
126         }
127     if (it->param == "UDPPort" && !it->value.empty())
128         {
129         if (str2x(it->value[0], portU))
130             {
131             errorStr = "Invalid UDPPort value";
132             printfd(__FILE__, "Error: Invalid UDPPort value\n");
133             return -1;
134             }
135         continue;
136         }
137     printfd(__FILE__, "'%s' is not a valid module param\n", it->param.c_str());
138     }
139 return 0;
140 }
141
142 int NF_CAP::Start()
143 {
144 if (portU > 0)
145     {
146     if (OpenUDP())
147         {
148         return -1;
149         }
150     m_threadUDP = std::jthread([this](auto token){ RunUDP(std::move(token)); });
151     }
152 if (portT > 0)
153     {
154     if (OpenTCP())
155         {
156         return -1;
157         }
158     m_threadTCP = std::jthread([this](auto token){ RunTCP(std::move(token)); });
159     }
160 return 0;
161 }
162
163 int NF_CAP::Stop()
164 {
165 m_threadTCP.request_stop();
166 m_threadUDP.request_stop();
167 if (portU && !stoppedUDP)
168     {
169     CloseUDP();
170     for (int i = 0; i < 25 && !stoppedUDP; ++i)
171         {
172         struct timespec ts = {0, 200000000};
173         nanosleep(&ts, NULL);
174         }
175     if (stoppedUDP)
176         {
177         m_threadUDP.join();
178         }
179     else
180         {
181         m_threadUDP.detach();
182         printfd(__FILE__, "UDP thread NOT stopped\n");
183         logger("Cannot stop UDP thread.");
184         }
185     }
186 if (portT && !stoppedTCP)
187     {
188     CloseTCP();
189     for (int i = 0; i < 25 && !stoppedTCP; ++i)
190         {
191         struct timespec ts = {0, 200000000};
192         nanosleep(&ts, NULL);
193         }
194     if (stoppedTCP)
195         {
196         m_threadTCP.join();
197         }
198     else
199         {
200         m_threadTCP.detach();
201         printfd(__FILE__, "TCP thread NOT stopped\n");
202         logger("Cannot stop TCP thread.");
203         }
204     }
205 return 0;
206 }
207
208 bool NF_CAP::OpenUDP()
209 {
210 struct sockaddr_in sin;
211 sockUDP = socket(PF_INET, SOCK_DGRAM, 0);
212 if (sockUDP <= 0)
213     {
214     errorStr = "Error opening UDP socket";
215     logger("Cannot create UDP socket: %s", strerror(errno));
216     printfd(__FILE__, "Error: Error opening UDP socket\n");
217     return true;
218     }
219 sin.sin_family = AF_INET;
220 sin.sin_port = htons(portU);
221 sin.sin_addr.s_addr = inet_addr("0.0.0.0");
222 if (bind(sockUDP, reinterpret_cast<const sockaddr*>(&sin), sizeof(sin)))
223     {
224     errorStr = "Error binding UDP socket";
225     logger("Cannot bind UDP socket: %s", strerror(errno));
226     printfd(__FILE__, "Error: Error binding UDP socket\n");
227     return true;
228     }
229 return false;
230 }
231
232 bool NF_CAP::OpenTCP()
233 {
234 struct sockaddr_in sin;
235 sockTCP = socket(PF_INET, SOCK_STREAM, 0);
236 if (sockTCP <= 0)
237     {
238     errorStr = "Error opening TCP socket";
239     logger("Cannot create TCP socket: %s", strerror(errno));
240     printfd(__FILE__, "Error: Error opening TCP socket\n");
241     return true;
242     }
243 sin.sin_family = AF_INET;
244 sin.sin_port = htons(portT);
245 sin.sin_addr.s_addr = inet_addr("0.0.0.0");
246 if (bind(sockTCP, reinterpret_cast<const sockaddr*>(&sin), sizeof(sin)))
247     {
248     errorStr = "Error binding TCP socket";
249     logger("Cannot bind TCP socket: %s", strerror(errno));
250     printfd(__FILE__, "Error: Error binding TCP socket\n");
251     return true;
252     }
253 if (listen(sockTCP, 1))
254     {
255     errorStr = "Error listening on TCP socket";
256     logger("Cannot listen on TCP socket: %s", strerror(errno));
257     printfd(__FILE__, "Error: Error listening TCP socket\n");
258     return true;
259     }
260 return false;
261 }
262
263 void NF_CAP::RunUDP(std::stop_token token) noexcept
264 {
265 sigset_t signalSet;
266 sigfillset(&signalSet);
267 pthread_sigmask(SIG_BLOCK, &signalSet, NULL);
268
269 stoppedUDP = false;
270 while (!token.stop_requested())
271     {
272     if (!WaitPackets(sockUDP))
273         {
274         continue;
275         }
276
277     // Data
278     struct sockaddr_in sin;
279     socklen_t slen = sizeof(sin);
280     uint8_t buf[BUF_SIZE];
281     ssize_t res = recvfrom(sockUDP, buf, BUF_SIZE, 0, reinterpret_cast<struct sockaddr *>(&sin), &slen);
282     if (token.stop_requested())
283         break;
284
285     if (res < 0)
286         {
287         logger("recvfrom error: %s", strerror(errno));
288         continue;
289         }
290
291     if (res == 0) // EOF
292         {
293         continue;
294         }
295
296     if (res < 24)
297         {
298         if (errno != EINTR)
299             {
300             errorStr = "Invalid data received";
301             printfd(__FILE__, "Error: Invalid data received through UDP\n");
302             }
303         continue;
304         }
305
306     ParseBuffer(buf, res);
307     }
308 stoppedUDP = true;
309 }
310
311 void NF_CAP::RunTCP(std::stop_token token) noexcept
312 {
313 sigset_t signalSet;
314 sigfillset(&signalSet);
315 pthread_sigmask(SIG_BLOCK, &signalSet, NULL);
316
317 stoppedTCP = false;
318 while (!token.stop_requested())
319     {
320     if (!WaitPackets(sockTCP))
321         {
322         continue;
323         }
324
325     // Data
326     struct sockaddr_in sin;
327     socklen_t slen = sizeof(sin);
328     int sd = accept(sockTCP, reinterpret_cast<struct sockaddr *>(&sin), &slen);
329     if (token.stop_requested())
330         break;
331
332     if (sd <= 0)
333         {
334         if (sd < 0)
335             logger("accept error: %s", strerror(errno));
336         continue;
337         }
338
339     if (!WaitPackets(sd))
340         {
341         close(sd);
342         continue;
343         }
344
345     uint8_t buf[BUF_SIZE];
346     ssize_t res = recv(sd, buf, BUF_SIZE, MSG_WAITALL);
347
348     if (res < 0)
349         logger("recv error: %s", strerror(errno));
350
351     close(sd);
352
353     if (token.stop_requested())
354         break;
355
356     if (res == 0) // EOF
357         {
358         continue;
359         }
360
361     // Wrong logic!
362     // Need to check actual data length and wait all data to receive
363     if (res < 24)
364         {
365         continue;
366         }
367
368     ParseBuffer(buf, res);
369     }
370 stoppedTCP = true;
371 }
372
373 void NF_CAP::ParseBuffer(uint8_t * buf, ssize_t size)
374 {
375 RawPacket ip;
376 NF_HEADER * hdr = reinterpret_cast<NF_HEADER *>(buf);
377 if (htons(hdr->version) != 5)
378     {
379     return;
380     }
381
382 int packets = htons(hdr->count);
383
384 if (packets < 0 || packets > 30)
385     {
386     return;
387     }
388
389 if (24 + 48 * packets != size)
390     {
391     // See 'wrong logic' upper
392     return;
393     }
394
395 for (int i = 0; i < packets; ++i)
396     {
397     NF_DATA * data = reinterpret_cast<NF_DATA *>(buf + 24 + i * 48);
398
399     ip.rawPacket.header.ipHeader.ip_v = 4;
400     ip.rawPacket.header.ipHeader.ip_hl = 5;
401     ip.rawPacket.header.ipHeader.ip_p = data->proto;
402     ip.dataLen = ntohl(data->octets);
403     ip.rawPacket.header.ipHeader.ip_src.s_addr = data->srcAddr;
404     ip.rawPacket.header.ipHeader.ip_dst.s_addr = data->dstAddr;
405     ip.rawPacket.header.sPort = data->srcPort;
406     ip.rawPacket.header.dPort = data->dstPort;
407
408     traffCnt->process(ip);
409     }
410 }