ChangeLog update
[stg.git] / projects / traffcounter / traffcounter.cpp
1 /*
2  *    This program is free software; you can redistribute it and/or modify
3  *    it under the terms of the GNU General Public License as published by
4  *    the Free Software Foundation; either version 2 of the License, or
5  *    (at your option) any later version.
6  *
7  *    This program is distributed in the hope that it will be useful,
8  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
9  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  *    GNU General Public License for more details.
11  *
12  *    You should have received a copy of the GNU General Public License
13  *    along with this program; if not, write to the Free Software
14  *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
15  */
16
17 /*
18  *    Author : Maxim Mamontov <faust@stargazer.dp.ua>
19  */
20
21 /*
22  $Revision: 1.5 $
23  $Date: 2009/10/12 08:43:32 $
24  $Author: faust $
25  */
26
27 #include <csignal>
28 #include <cassert>
29 #include <algorithm>
30
31 #include "traffcounter.h"
32 #include "logger.h"
33 #include "lock.h"
34 #include "utils.h"
35
36 //-----------------------------------------------------------------------------
37 STG::TRAFFCOUNTER::TRAFFCOUNTER()
38     : rulesFinder(),
39       pendingPackets(),
40       sessions(),
41       cacheHits(0),
42       cacheMisses(0),
43       pendingCount(0),
44       maxPending(0),
45       ip2sessions(),
46       stopped(true),
47       running(false)
48 {
49 LOG_IT << "TRAFFCOUNTER::TRAFFCOUNTER()\n";
50 pthread_mutex_init(&sessionMutex, NULL);
51 pthread_mutex_init(&pendingMutex, NULL);
52 pthread_mutex_init(&ipMutex, NULL);
53 pthread_mutex_init(&rulesMutex, NULL);
54 pthread_cond_init(&pendingCond, NULL);
55 }
56 //-----------------------------------------------------------------------------
57 STG::TRAFFCOUNTER::~TRAFFCOUNTER()
58 {
59 LOG_IT << "TRAFFCOUNTER::~TRAFFCOUNTER()\n";
60 pthread_cond_destroy(&pendingCond);
61 pthread_mutex_destroy(&rulesMutex);
62 pthread_mutex_destroy(&ipMutex);
63 pthread_mutex_destroy(&pendingMutex);
64 pthread_mutex_destroy(&sessionMutex);
65 }
66 //-----------------------------------------------------------------------------
67 //  Starting processing thread
68 bool STG::TRAFFCOUNTER::Start()
69 {
70 LOG_IT << "TRAFFCOUNTER::Start()\n";
71
72 if (running)
73     return false;
74
75 running = true;
76 stopped = true;
77
78 if (pthread_create(&thread, NULL, Run, this))
79     {
80     LOG_IT << "TRAFFCOUNTER::Start() Error: Cannot start thread!\n";
81     return true;
82     }
83
84 return false;
85 }
86 //-----------------------------------------------------------------------------
87 bool STG::TRAFFCOUNTER::Stop()
88 {
89 LOG_IT << "TRAFFCOUNTER::Stop()\n";
90 LOG_IT << "maxPending: " << maxPending << std::endl;
91
92 if (!running)
93     return false;
94
95 running = false;
96 // Awake thread
97 pthread_cond_signal(&pendingCond);
98
99 //5 seconds to thread stops itself
100 for (int i = 0; i < 25 && !stopped; ++i)
101     {
102     usleep(200000);
103     }
104
105 //after 5 seconds waiting thread still running. now kill it
106 if (!stopped)
107     {
108     LOG_IT << "TRAFFCOUNTER::Stop() Killing thread\n";
109     if (pthread_kill(thread, SIGINT))
110         {
111         return true;
112         }
113     LOG_IT << "TRAFFCOUNTER::Stop() Thread killed\n";
114     }
115
116 return false;
117 }
118 //-----------------------------------------------------------------------------
119 double STG::TRAFFCOUNTER::StreamQuality() const
120 {
121 if (!cacheHits && !cacheMisses)
122     {
123     return 0;
124     }
125
126 double quality = cacheHits;
127 return quality / (quality + cacheMisses);
128 }
129 //-----------------------------------------------------------------------------
130 void STG::TRAFFCOUNTER::AddPacket(const iphdr & ipHdr, uint16_t sport, uint16_t dport)
131 {
132 /*
133  *  Intersects with AddIP (from user thread), DeleteIP (from user thread) and
134  * Process (internal thread). AddPacket is calling from capturer's thread
135  *
136  *  ips is affected by AddIP (logarithmic lock time) and
137  *  DeleteIP (from user thread)
138  *
139  *  May be locked by AddIP or DeleteIP (from user thread)
140  *
141  *  Lock AddIP (user thread) or DeleteIP (user thread)
142  *  Logarithmic lock time
143  */
144
145 bool srcExists;
146 bool dstExists;
147
148     {
149     SCOPED_LOCK lock(ipMutex);
150     srcExists = std::binary_search(ips.begin(), ips.end(), ipHdr.saddr);
151     dstExists = std::binary_search(ips.begin(), ips.end(), ipHdr.daddr);
152     }
153
154 if (!srcExists &&
155     !dstExists)
156     {
157     // Just drop the packet
158     return;
159     }
160
161 STG::PENDING_PACKET p(ipHdr, sport, dport);
162
163 // Packet classification
164 if (srcExists)
165     {
166     if (dstExists)
167         {
168         // Both src and dst are countable
169         p.direction = PENDING_PACKET::LOCAL;
170         }
171     else
172         {
173         // Src is countable
174         p.direction = PENDING_PACKET::OUTGOING;
175         }
176     }
177 else
178     {
179     if (dstExists)
180         {
181         // Dst is countable
182         p.direction = PENDING_PACKET::INCOMING;
183         }
184     else
185         {
186         assert(0);
187         // Not src nor dst are countable
188         p.direction = PENDING_PACKET::FOREIGN;
189         }
190     }
191
192 /*
193  *  pendingPackets is affected by Process (from internal thread)
194  *
195  *  May be locked by Process (internal thread)
196  *
197  *  Lock Process (internal thread)
198  *  Constant lock time
199  */
200 SCOPED_LOCK lock(pendingMutex);
201 pendingPackets.push_back(p);
202 pendingCount++;
203 #ifdef STATISTIC
204 if (pendingCount > maxPending)
205     maxPending = pendingCount;
206 #endif
207 pthread_cond_signal(&pendingCond);
208
209 }
210 //-----------------------------------------------------------------------------
211 void STG::TRAFFCOUNTER::AddIP(uint32_t ip)
212 {
213 /*
214  *  AddIP is calling from users and affect DeleteIP and AddPacket.
215  * DeleteIP cannot be called concurrently with AddIP - it's the same
216  * thread. AddPacket is calling from capturer's thread - concurrently
217  * with AddIP.
218  *
219  * May be locked by AddPacket (from capturer's thread)
220  * Logarithmic lock time
221  *
222  * Lock AddPacket (capturer's thread)
223  * Logarithmic lock time
224  */
225 SCOPED_LOCK lock(ipMutex);
226 IP_ITER it(std::lower_bound(ips.begin(), ips.end(), ip));
227
228 if (it != ips.end() && *it == ip)
229     {
230     return;
231     }
232 // Insertion
233 ips.insert(it, ip);
234 }
235 //-----------------------------------------------------------------------------
236 void STG::TRAFFCOUNTER::DeleteIP(uint32_t ip, STG::TRAFF_DATA * traff)
237 {
238 /*
239  *  DeleteIP is calling from users and affect AddIP, AddPacket, GetIP and
240  * Process. AddIP and GetIP cannot be called concurrently with DeleteIP - it's
241  * the same thread. AddPacket is calling from capturer's thread - concurrently
242  * with DeleteIP. Process is calling from internal thread - concurrently with
243  * DeleteIP.
244  *
245  * May be locked by AddPacket (from capturer's thread)
246  * Logarithmic lock time
247  *
248  * Lock AddPacket (capturer's thread)
249  * Logarithmic lock time
250  */
251
252     {
253     SCOPED_LOCK lock(ipMutex);
254
255     IP_ITER it(std::lower_bound(ips.begin(), ips.end(), ip));
256     if (it == ips.end())
257         {
258         return;
259         }
260     if (*it != ip)
261         {
262         return;
263         }
264
265     ips.erase(it);
266     }
267
268 // Get sessions for this ip
269 std::pair<INDEX_ITER,
270           INDEX_ITER> range;
271
272 SCOPED_LOCK lock(sessionMutex);
273 range = ip2sessions.equal_range(ip);
274 std::list<INDEX_ITER> toDelete;
275
276 // Lock session growing
277 for (INDEX_ITER it = range.first; it != range.second; ++it)
278     {
279     traff->push_back(STG::TRAFF_ITEM(it->second->first, it->second->second));
280
281     // Include self
282     toDelete.push_back(it);
283
284     /*if (ip == it->second->first.saddr)
285         {
286         toDelete.push_back(it->second->second.dIdx);
287         }
288     else
289         {
290         toDelete.push_back(it->second->second.sIdx);
291         }*/
292
293     --it->second->second.refCount;
294
295     // Remove session
296     /*
297      *  Normally we will lock here only in case of session between
298      *  two users from ips list
299      */
300     if (!it->second->second.refCount)
301         {
302         sessions.erase(it->second);
303         }
304     }
305
306 // Remove indexes
307 for (std::list<INDEX_ITER>::iterator it = toDelete.begin();
308      it != toDelete.end();
309      ++it)
310     {
311     ip2sessions.erase(*it);
312     }
313 }
314 //-----------------------------------------------------------------------------
315 void STG::TRAFFCOUNTER::GetIP(uint32_t ip, STG::TRAFF_DATA * traff)
316 {
317 /*
318  *  Normally we will lock here only in case of session between
319  *  two users from ips list
320  */
321 std::pair<INDEX_ITER,
322           INDEX_ITER> range;
323
324 SCOPED_LOCK lock(sessionMutex);
325 range = ip2sessions.equal_range(ip);
326 std::list<INDEX_ITER> toDelete;
327
328 // TODO: replace with foreach
329 for (SESSION_INDEX::iterator it = range.first;
330      it != range.second;
331      ++it)
332     {
333     traff->push_back(STG::TRAFF_ITEM(it->second->first, it->second->second));
334     toDelete.push_back(it);
335     --it->second->second.refCount;
336     if (!it->second->second.refCount)
337         {
338         sessions.erase(it->second);
339         }
340     }
341
342 for (std::list<INDEX_ITER>::iterator it = toDelete.begin();
343      it != toDelete.end();
344      ++it)
345     {
346     ip2sessions.erase(*it);
347     }
348 }
349 //-----------------------------------------------------------------------------
350 void * STG::TRAFFCOUNTER::Run(void * data)
351 {
352 STG::TRAFFCOUNTER * tc = static_cast<STG::TRAFFCOUNTER *>(data);
353 tc->stopped = false;
354
355 while (tc->running)
356     {
357     STG::PENDING_PACKET packet;
358         {
359         SCOPED_LOCK lock(tc->pendingMutex);
360         if (tc->pendingPackets.empty())
361             {
362             pthread_cond_wait(&tc->pendingCond, &tc->pendingMutex);
363             }
364         if (!tc->running)
365             {
366             break;
367             }
368         packet = *tc->pendingPackets.begin();
369         tc->pendingPackets.pop_front();
370         --tc->pendingCount;
371         }
372     tc->Process(packet);
373     }
374
375 tc->stopped = true;
376 return NULL;
377 }
378 //-----------------------------------------------------------------------------
379 void STG::TRAFFCOUNTER::Process(const STG::PENDING_PACKET & p)
380 {
381 // Bypass on stop
382 if (!running)
383     return;
384
385 // Fail on foreign packets
386 if (p.direction == PENDING_PACKET::FOREIGN) {
387     assert(0);
388 }
389
390 // Searching a new packet in a tree.
391 SESSION_ITER si;
392     {
393     SCOPED_LOCK lock(sessionMutex);
394     si = sessions.find(STG::SESSION_ID(p));
395     }
396
397 // Packet found - update length and time
398 if (si != sessions.end())
399     {
400     // Grow session
401     SCOPED_LOCK lock(sessionMutex);
402     si->second.length += p.length;
403     ++cacheHits;
404     return;
405     }
406
407 ++cacheMisses;
408
409 // Packet not found - add new packet
410
411 // This packet is alowed to create session
412 STG::SESSION_ID sid(p);
413 SESSION_FULL_DATA sd;
414
415 // Identify a packet
416     {
417     SCOPED_LOCK lock(rulesMutex);
418     sd.dir = rulesFinder.GetDir(p);
419     }
420
421 sd.length = p.length;
422
423 if (p.direction == PENDING_PACKET::LOCAL)
424     {
425     sd.refCount = 2;
426     }
427 else
428     {
429     sd.refCount = 1;
430     }
431
432 // Create a session
433 std::pair<SESSION_ITER,
434           bool> sIt(sessions.insert(std::make_pair(sid, sd)));
435     {
436     SCOPED_LOCK lock(sessionMutex);
437     std::pair<SESSION_ITER,
438               bool> sIt(sessions.insert(std::make_pair(sid, sd)));
439
440     // Create an indexes
441     sIt.first->second.sIdx = ip2sessions.insert(std::make_pair(p.saddr, sIt.first));
442     sIt.first->second.dIdx = ip2sessions.insert(std::make_pair(p.daddr, sIt.first));
443     }
444
445 }
446 //-----------------------------------------------------------------------------
447 void STG::TRAFFCOUNTER::SetRules(const STG::RULES & data)
448 {
449 /*
450  *  SetRules is calling from outside internel thread. Process is calling
451  * from internal thread and calls DeterminateDir which use rules data.
452  *
453  * May be locked by DeterminateDir (Process) from internal thread.
454  *
455  * Lock DeterminateDir (Process) - internal thread.
456  * Linear lock time
457  */
458 SCOPED_LOCK lock(rulesMutex);
459 rulesFinder.SetRules(data);
460 }