2 * This program is free software; you can redistribute it and/or modify
3 * it under the terms of the GNU General Public License as published by
4 * the Free Software Foundation; either version 2 of the License, or
5 * (at your option) any later version.
7 * This program is distributed in the hope that it will be useful,
8 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
10 * GNU General Public License for more details.
12 * You should have received a copy of the GNU General Public License
13 * along with this program; if not, write to the Free Software
14 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 * Author : Maxim Mamontov <faust@stargazer.dp.ua>
23 $Date: 2009/10/12 08:43:32 $
31 #include "traffcounter.h"
36 //-----------------------------------------------------------------------------
// Constructor: logs its own invocation, then initializes the four mutexes
// (sessionMutex, pendingMutex, ipMutex, rulesMutex) and the pendingCond
// condition variable with default attributes (NULL).
// NOTE(review): the return codes of the pthread_*_init calls are not checked
// in the statements visible here.
37 STG::TRAFFCOUNTER::TRAFFCOUNTER()
49 LOG_IT << "TRAFFCOUNTER::TRAFFCOUNTER()\n";
50 pthread_mutex_init(&sessionMutex, NULL);
51 pthread_mutex_init(&pendingMutex, NULL);
52 pthread_mutex_init(&ipMutex, NULL);
53 pthread_mutex_init(&rulesMutex, NULL);
54 pthread_cond_init(&pendingCond, NULL);
56 //-----------------------------------------------------------------------------
// Destructor: logs its own invocation, then destroys pendingCond followed by
// rulesMutex, ipMutex, pendingMutex and sessionMutex - the reverse of the
// order in which the constructor initializes them.
// NOTE(review): nothing visible here stops or joins the processing thread
// before the primitives are destroyed - confirm callers invoke Stop() first.
57 STG::TRAFFCOUNTER::~TRAFFCOUNTER()
59 LOG_IT << "TRAFFCOUNTER::~TRAFFCOUNTER()\n";
60 pthread_cond_destroy(&pendingCond);
61 pthread_mutex_destroy(&rulesMutex);
62 pthread_mutex_destroy(&ipMutex);
63 pthread_mutex_destroy(&pendingMutex);
64 pthread_mutex_destroy(&sessionMutex);
66 //-----------------------------------------------------------------------------
67 // Starts the processing thread.
// Logs the call, then creates a pthread with default attributes (NULL) that
// executes Run with this object as its argument. A non-zero result from
// pthread_create is treated as failure and logged.
// NOTE(review): the values returned on success and failure are not visible in
// this excerpt - confirm against the full source.
68 bool STG::TRAFFCOUNTER::Start()
70 LOG_IT << "TRAFFCOUNTER::Start()\n";
78 if (pthread_create(&thread, NULL, Run, this))
80 LOG_IT << "TRAFFCOUNTER::Start() Error: Cannot start thread!\n";
86 //-----------------------------------------------------------------------------
// Stops the processing thread.
// Logs the call and the maxPending statistic, signals pendingCond (the
// condition variable Run waits on), then polls the `stopped` flag for at most
// 25 iterations. If the flag is still unset afterwards, the thread is sent
// SIGINT via pthread_kill.
// NOTE(review): the per-iteration delay is not visible here; the comments say
// 5 seconds in total, which would be 200 ms per iteration - confirm.
// NOTE(review): pthread_kill only delivers a signal to the thread; signal
// dispositions are process-wide, so without a SIGINT handler the default
// action would terminate the whole process - confirm a handler is installed.
87 bool STG::TRAFFCOUNTER::Stop()
89 LOG_IT << "TRAFFCOUNTER::Stop()\n";
90 LOG_IT << "maxPending: " << maxPending << std::endl;
97 pthread_cond_signal(&pendingCond);
99 // Give the thread up to 5 seconds to stop by itself
100 for (int i = 0; i < 25 && !stopped; ++i)
105 // After 5 seconds of waiting the thread is still running; kill it now
108 LOG_IT << "TRAFFCOUNTER::Stop() Killing thread\n";
109 if (pthread_kill(thread, SIGINT))
113 LOG_IT << "TRAFFCOUNTER::Stop() Thread killed\n";
118 //-----------------------------------------------------------------------------
// Returns the cache hit ratio: cacheHits / (cacheHits + cacheMisses),
// computed in double precision.
// The guard below handles the case where both counters are zero, which would
// otherwise divide zero by zero.
// NOTE(review): the value returned in that case is not visible in this
// excerpt - confirm against the full source.
119 double STG::TRAFFCOUNTER::StreamQuality() const
121 if (!cacheHits && !cacheMisses)
126 double quality = cacheHits;
127 return quality / (quality + cacheMisses);
129 //-----------------------------------------------------------------------------
// Classifies a captured packet and queues it for the processing thread.
//   ipHdr - IP header of the packet; saddr and daddr are looked up in `ips`.
//   sport - source port, passed on to the PENDING_PACKET constructor.
//   dport - destination port, passed on to the PENDING_PACKET constructor.
// Steps visible here:
//   1. Under ipMutex, test whether the source and destination addresses are
//      present in `ips`. std::binary_search requires `ips` to be sorted.
//   2. Build a PENDING_PACKET and set its direction to LOCAL, OUTGOING,
//      INCOMING or FOREIGN.
//   3. Under pendingMutex, append the packet to pendingPackets and raise
//      maxPending if pendingCount exceeds it.
//   4. Signal pendingCond so the waiting processing thread can pick it up.
// NOTE(review): the branch conditions selecting each direction, the
// "just drop the packet" case and the update of pendingCount are not visible
// in this excerpt.
130 void STG::TRAFFCOUNTER::AddPacket(const iphdr & ipHdr, uint16_t sport, uint16_t dport)
133 * Intersects with AddIP (from user thread), DeleteIP (from user thread) and
134 * Process (internal thread). AddPacket is calling from capturer's thread
136 * ips is affected by AddIP (logarithmic lock time) and
137 * DeleteIP (from user thread)
139 * May be locked by AddIP or DeleteIP (from user thread)
141 * Lock AddIP (user thread) or DeleteIP (user thread)
142 * Logarithmic lock time
149 SCOPED_LOCK lock(ipMutex);
150 srcExists = std::binary_search(ips.begin(), ips.end(), ipHdr.saddr);
151 dstExists = std::binary_search(ips.begin(), ips.end(), ipHdr.daddr);
157 // Just drop the packet
161 STG::PENDING_PACKET p(ipHdr, sport, dport);
163 // Packet classification
168 // Both src and dst are countable
169 p.direction = PENDING_PACKET::LOCAL;
174 p.direction = PENDING_PACKET::OUTGOING;
182 p.direction = PENDING_PACKET::INCOMING;
187 // Not src nor dst are countable
188 p.direction = PENDING_PACKET::FOREIGN;
193 * pendingPackets is affected by Process (from internal thread)
195 * May be locked by Process (internal thread)
197 * Lock Process (internal thread)
200 SCOPED_LOCK lock(pendingMutex);
201 pendingPackets.push_back(p);
204 if (pendingCount > maxPending)
205 maxPending = pendingCount;
207 pthread_cond_signal(&pendingCond);
210 //-----------------------------------------------------------------------------
// Registers an IP address as countable.
//   ip - address to add to `ips`.
// Under ipMutex, std::lower_bound locates the position of `ip` in `ips`,
// which must be sorted for that call to be meaningful. The check that follows
// detects an address that is already present.
// NOTE(review): what happens for a duplicate and the actual insertion are not
// visible in this excerpt; presumably a duplicate is ignored and a new address
// is inserted at `it` to keep `ips` sorted - confirm.
211 void STG::TRAFFCOUNTER::AddIP(uint32_t ip)
214 * AddIP is calling from users and affect DeleteIP and AddPacket.
215 * DeleteIP cannot be called concurrently with AddIP - it's the same
216 * thread. AddPacket is calling from capturer's thread - concurrently
219 * May be locked by AddPacket (from capturer's thread)
220 * Logarithmic lock time
222 * Lock AddPacket (capturer's thread)
223 * Logarithmic lock time
225 SCOPED_LOCK lock(ipMutex);
226 IP_ITER it(std::lower_bound(ips.begin(), ips.end(), ip));
228 if (it != ips.end() && *it == ip)
235 //-----------------------------------------------------------------------------
// Unregisters an IP address and hands its accumulated sessions to the caller.
//   ip    - address to look up in `ips` and in the ip2sessions index.
//   traff - output container; one TRAFF_ITEM is appended per session indexed
//           under `ip`.
// Steps visible here:
//   1. Under ipMutex, locate `ip` in the sorted `ips` container with
//      std::lower_bound.
//   2. Under sessionMutex, take the equal_range of `ip` in ip2sessions. For
//      each index entry: append the session's key and data to `traff`,
//      remember the index iterator in toDelete, decrement the session's
//      refCount and erase the session from `sessions` once it reaches zero.
//   3. Erase every remembered index entry from ip2sessions. The erasure is
//      deferred so the range is not modified while it is being iterated.
// NOTE(review): the removal of `ip` from `ips` and the handling of an address
// that is not found are not visible in this excerpt.
// NOTE(review): the fragment starting with "/*if (ip == ...)" below looks like
// commented-out code whose terminator is not visible here.
236 void STG::TRAFFCOUNTER::DeleteIP(uint32_t ip, STG::TRAFF_DATA * traff)
239 * DeleteIP is calling from users and affect AddIP, AddPacket, GetIP and
240 * Process. AddIP and GetIP cannot be called concurrently with DeleteIP - it's
241 * the same thread. AddPacket is calling from capturer's thread - concurrently
242 * with DeleteIP. Process is calling from internal thread - concurrently with
245 * May be locked by AddPacket (from capturer's thread)
246 * Logarithmic lock time
248 * Lock AddPacket (capturer's thread)
249 * Logarithmic lock time
253 SCOPED_LOCK lock(ipMutex);
255 IP_ITER it(std::lower_bound(ips.begin(), ips.end(), ip));
268 // Get sessions for this ip
269 std::pair<INDEX_ITER,
272 SCOPED_LOCK lock(sessionMutex);
273 range = ip2sessions.equal_range(ip);
274 std::list<INDEX_ITER> toDelete;
276 // Lock session growing
277 for (INDEX_ITER it = range.first; it != range.second; ++it)
279 traff->push_back(STG::TRAFF_ITEM(it->second->first, it->second->second));
282 toDelete.push_back(it);
284 /*if (ip == it->second->first.saddr)
286 toDelete.push_back(it->second->second.dIdx);
290 toDelete.push_back(it->second->second.sIdx);
293 --it->second->second.refCount;
297 * Normally we will lock here only in case of session between
298 * two users from ips list
300 if (!it->second->second.refCount)
302 sessions.erase(it->second);
307 for (std::list<INDEX_ITER>::iterator it = toDelete.begin();
308 it != toDelete.end();
311 ip2sessions.erase(*it);
314 //-----------------------------------------------------------------------------
// Collects the sessions accumulated for an IP address and releases them.
//   ip    - address whose entries are looked up in the ip2sessions index.
//   traff - output container; one TRAFF_ITEM is appended per session indexed
//           under `ip`.
// Despite the "Get" name this is a draining read. Under sessionMutex every
// index entry for `ip` is copied into `traff`, the session's refCount is
// decremented, and the session is erased from `sessions` once it reaches zero.
// The visited index entries are then erased from ip2sessions; the erasure is
// deferred through toDelete so the range is not modified while it is iterated.
315 void STG::TRAFFCOUNTER::GetIP(uint32_t ip, STG::TRAFF_DATA * traff)
318 * Normally we will lock here only in case of session between
319 * two users from ips list
321 std::pair<INDEX_ITER,
324 SCOPED_LOCK lock(sessionMutex);
325 range = ip2sessions.equal_range(ip);
326 std::list<INDEX_ITER> toDelete;
328 // TODO: replace with foreach
329 for (SESSION_INDEX::iterator it = range.first;
333 traff->push_back(STG::TRAFF_ITEM(it->second->first, it->second->second));
334 toDelete.push_back(it);
335 --it->second->second.refCount;
336 if (!it->second->second.refCount)
338 sessions.erase(it->second);
342 for (std::list<INDEX_ITER>::iterator it = toDelete.begin();
343 it != toDelete.end();
346 ip2sessions.erase(*it);
349 //-----------------------------------------------------------------------------
// Thread entry point passed to pthread_create by Start().
//   data - the TRAFFCOUNTER instance, passed as void * and cast back here.
// Under pendingMutex, waits on pendingCond when pendingPackets is empty, then
// copies the front packet and removes it from the queue.
// NOTE(review): the surrounding loop, the termination check, the call that
// handles the dequeued packet and the return value are not visible here.
// NOTE(review): the wait is guarded by `if` in the visible lines. POSIX allows
// spurious wakeups and Stop() signals pendingCond without queueing a packet,
// so confirm that emptiness is re-checked before begin() and pop_front().
350 void * STG::TRAFFCOUNTER::Run(void * data)
352 STG::TRAFFCOUNTER * tc = static_cast<STG::TRAFFCOUNTER *>(data);
357 STG::PENDING_PACKET packet;
359 SCOPED_LOCK lock(tc->pendingMutex);
360 if (tc->pendingPackets.empty())
362 pthread_cond_wait(&tc->pendingCond, &tc->pendingMutex);
368 packet = *tc->pendingPackets.begin();
369 tc->pendingPackets.pop_front();
378 //-----------------------------------------------------------------------------
// Accounts a single pending packet to its session.
//   p - packet previously classified by AddPacket.
// Steps visible here:
//   1. Packets with direction FOREIGN are special-cased first.
//   2. Under sessionMutex, look up the session keyed by SESSION_ID(p).
//   3. If the session exists, add p.length to its length.
//   4. Otherwise build a new session: the direction comes from
//      rulesFinder.GetDir(p) under rulesMutex and the length starts at
//      p.length. The session is inserted into `sessions`, and the ip2sessions
//      index entries for p.saddr and p.daddr are stored in sIdx and dIdx.
// NOTE(review): the lookup and the update take sessionMutex in two separate
// statements. If the lock is released in between, DeleteIP or GetIP could
// erase the session and invalidate `si` - confirm the scoping.
// NOTE(review): the same insert statement appears twice below, once before and
// once after a sessionMutex lock; presumably these are alternative branches
// (compare the LOCAL check) - confirm.
// NOTE(review): the action taken for FOREIGN packets and the initialization of
// refCount are not visible in this excerpt.
379 void STG::TRAFFCOUNTER::Process(const STG::PENDING_PACKET & p)
385 // Fail on foreign packets
386 if (p.direction == PENDING_PACKET::FOREIGN) {
390 // Searching a new packet in a tree.
393 SCOPED_LOCK lock(sessionMutex);
394 si = sessions.find(STG::SESSION_ID(p));
397 // Packet found - update length and time
398 if (si != sessions.end())
401 SCOPED_LOCK lock(sessionMutex);
402 si->second.length += p.length;
409 // Packet not found - add new packet
411 // This packet is allowed to create a session
412 STG::SESSION_ID sid(p);
413 SESSION_FULL_DATA sd;
417 SCOPED_LOCK lock(rulesMutex);
418 sd.dir = rulesFinder.GetDir(p);
421 sd.length = p.length;
423 if (p.direction == PENDING_PACKET::LOCAL)
433 std::pair<SESSION_ITER,
434 bool> sIt(sessions.insert(std::make_pair(sid, sd)));
436 SCOPED_LOCK lock(sessionMutex);
437 std::pair<SESSION_ITER,
438 bool> sIt(sessions.insert(std::make_pair(sid, sd)));
441 sIt.first->second.sIdx = ip2sessions.insert(std::make_pair(p.saddr, sIt.first));
442 sIt.first->second.dIdx = ip2sessions.insert(std::make_pair(p.daddr, sIt.first));
446 //-----------------------------------------------------------------------------
// Replaces the classification rules used by the processing thread.
//   data - new rule set, forwarded to rulesFinder.SetRules.
// The update is done under rulesMutex, the same mutex Process holds while it
// calls rulesFinder.GetDir, so a rule lookup never overlaps a rule update.
// NOTE(review): the comment below refers to DeterminateDir, but the call
// visible in Process is rulesFinder.GetDir - the comment may be stale.
447 void STG::TRAFFCOUNTER::SetRules(const STG::RULES & data)
450 * SetRules is calling from outside internel thread. Process is calling
451 * from internal thread and calls DeterminateDir which use rules data.
453 * May be locked by DeterminateDir (Process) from internal thread.
455 * Lock DeterminateDir (Process) - internal thread.
458 SCOPED_LOCK lock(rulesMutex);
459 rulesFinder.SetRules(data);