2  *    This program is free software; you can redistribute it and/or modify
 
   3  *    it under the terms of the GNU General Public License as published by
 
   4  *    the Free Software Foundation; either version 2 of the License, or
 
   5  *    (at your option) any later version.
 
   7  *    This program is distributed in the hope that it will be useful,
 
   8  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 
   9  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
  10  *    GNU General Public License for more details.
 
  12  *    You should have received a copy of the GNU General Public License
 
  13  *    along with this program; if not, write to the Free Software
 
  14  *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 
  18  *    Author : Maxim Mamontov <faust@stargazer.dp.ua>
 
  23  $Date: 2009/10/12 08:43:32 $
 
  31 #include "traffcounter.h"
 
  36 //-----------------------------------------------------------------------------
 
  // Constructor. Logs its own invocation, then initialises the four mutexes
  // (sessionMutex, pendingMutex, ipMutex, rulesMutex) and the pendingCond
  // condition variable, all with default attributes (NULL).
  // NOTE(review): the return values of the pthread_*_init calls are not
  // checked here; any member initialisers are on lines not visible in this view.
  37 STG::TRAFFCOUNTER::TRAFFCOUNTER()
 
  49 LOG_IT << "TRAFFCOUNTER::TRAFFCOUNTER()\n";
 
  50 pthread_mutex_init(&sessionMutex, NULL);
 
  51 pthread_mutex_init(&pendingMutex, NULL);
 
  52 pthread_mutex_init(&ipMutex, NULL);
 
  53 pthread_mutex_init(&rulesMutex, NULL);
 
  54 pthread_cond_init(&pendingCond, NULL);
 
  56 //-----------------------------------------------------------------------------
 
  // Destructor. Logs its own invocation, then destroys the condition variable
  // and the four mutexes in the reverse of the order used by the constructor.
  // NOTE(review): nothing visible here stops or joins the worker thread first;
  // presumably callers are expected to call Stop() beforehand - confirm.
  57 STG::TRAFFCOUNTER::~TRAFFCOUNTER()
 
  59 LOG_IT << "TRAFFCOUNTER::~TRAFFCOUNTER()\n";
 
  60 pthread_cond_destroy(&pendingCond);
 
  61 pthread_mutex_destroy(&rulesMutex);
 
  62 pthread_mutex_destroy(&ipMutex);
 
  63 pthread_mutex_destroy(&pendingMutex);
 
  64 pthread_mutex_destroy(&sessionMutex);
 
  66 //-----------------------------------------------------------------------------
 
  67 //  Starting processing thread
 
  // Starts the processing thread: creates a pthread that executes the static
  // Run() function with this object as its argument and stores the handle in
  // the `thread` member.
  // Returns bool; the actual return statements are on lines not visible in
  // this view, so the mapping of success/failure to true/false is unconfirmed.
  68 bool STG::TRAFFCOUNTER::Start()
 
  70 LOG_IT << "TRAFFCOUNTER::Start()\n";
 
  // A non-zero result from pthread_create is treated as failure and logged.
  78 if (pthread_create(&thread, NULL, Run, this))
 
  80     LOG_IT << "TRAFFCOUNTER::Start() Error: Cannot start thread!\n";
 
  86 //-----------------------------------------------------------------------------
 
  // Stops the processing thread. Logs the call and the maxPending value,
  // signals pendingCond, waits for the `stopped` flag in a bounded loop and,
  // if the thread has still not stopped, sends it SIGINT via pthread_kill.
  // Returns bool; the return statements are not visible in this view.
  87 bool STG::TRAFFCOUNTER::Stop()
 
  89 LOG_IT << "TRAFFCOUNTER::Stop()\n";
 
  90 LOG_IT << "maxPending: " << maxPending << std::endl;
 
  // Presumably wakes the worker if it is blocked in pthread_cond_wait inside
  // Run() so it can notice the stop request - the flag that requests the stop
  // is set on a line not visible here; confirm.
  97 pthread_cond_signal(&pendingCond);
 
  99 // give the thread up to 5 seconds to stop by itself
 
  // At most 25 iterations; the loop body (not visible) presumably sleeps
  // about 200 ms per iteration to add up to the 5 seconds mentioned above.
 100 for (int i = 0; i < 25 && !stopped; ++i)
 
 105 // the thread is still running after the 5-second wait, so kill it
 
 108     LOG_IT << "TRAFFCOUNTER::Stop() Killing thread\n";
 
  // NOTE(review): pthread_kill only delivers a signal to the thread; whether
  // SIGINT actually ends it depends on the process's signal handling - confirm.
 109     if (pthread_kill(thread, SIGINT))
 
 113     LOG_IT << "TRAFFCOUNTER::Stop() Thread killed\n";
 
 118 //-----------------------------------------------------------------------------
 
 // Returns the cache hit ratio cacheHits / (cacheHits + cacheMisses) as a
 // double.
 119 double STG::TRAFFCOUNTER::StreamQuality() const
 
 // Special case for "no lookups yet", which avoids dividing zero by zero.
 // The value returned in this case is on a line not visible in this view.
 121 if (!cacheHits && !cacheMisses)
 
 // Copying cacheHits into a double makes the division below floating-point.
 126 double quality = cacheHits;
 
 127 return quality / (quality + cacheMisses);
 
 129 //-----------------------------------------------------------------------------
 
 // Accepts one captured packet, described by its IP header and source and
 // destination ports, and queues it for the processing thread.
 // Steps visible here:
 //   1. under ipMutex, look up the source and destination addresses in `ips`;
 //   2. build a PENDING_PACKET and set its direction to LOCAL, OUTGOING,
 //      INCOMING or FOREIGN (the conditions selecting each value are on
 //      lines not visible in this view);
 //   3. under pendingMutex, append the packet to pendingPackets, update the
 //      maxPending high-water mark and signal pendingCond.
 130 void STG::TRAFFCOUNTER::AddPacket(const iphdr & ipHdr, uint16_t sport, uint16_t dport)
 
 133  *  Intersects with AddIP (from user thread), DeleteIP (from user thread) and
 
 134  * Process (internal thread). AddPacket is calling from capturer's thread
 
 136  *  ips is affected by AddIP (logarithmic lock time) and
 
 137  *  DeleteIP (from user thread)
 
 139  *  May be locked by AddIP or DeleteIP (from user thread)
 
 141  *  Lock AddIP (user thread) or DeleteIP (user thread)
 
 142  *  Logarithmic lock time
 
 // std::binary_search requires `ips` to be sorted; AddIP and DeleteIP locate
 // entries with std::lower_bound, which relies on the same ordering.
 149     SCOPED_LOCK lock(ipMutex);
 
 150     srcExists = std::binary_search(ips.begin(), ips.end(), ipHdr.saddr);
 
 151     dstExists = std::binary_search(ips.begin(), ips.end(), ipHdr.daddr);
 
 157     // Just drop the packet
 
 161 STG::PENDING_PACKET p(ipHdr, sport, dport);
 
 163 // Packet classification
 
 168         // Both src and dst are countable
 
 169         p.direction = PENDING_PACKET::LOCAL;
 
 174         p.direction = PENDING_PACKET::OUTGOING;
 
 182         p.direction = PENDING_PACKET::INCOMING;
 
 187         // Not src nor dst are countable
 
 188         p.direction = PENDING_PACKET::FOREIGN;
 
 193  *  pendingPackets is affected by Process (from internal thread)
 
 195  *  May be locked by Process (internal thread)
 
 197  *  Lock Process (internal thread)
 
 200 SCOPED_LOCK lock(pendingMutex);
 
 201 pendingPackets.push_back(p);
 
 // maxPending records the largest pendingCount seen so far; pendingCount
 // itself is updated on a line not visible in this view.
 204 if (pendingCount > maxPending)
 
 205     maxPending = pendingCount;
 
 // Signalled while pendingMutex is still held by the SCOPED_LOCK above.
 207 pthread_cond_signal(&pendingCond);
 
 210 //-----------------------------------------------------------------------------
 
 // Registers `ip` in the `ips` collection. Under ipMutex it finds the
 // lower-bound position for `ip` and checks whether the address is already
 // present. What happens in each case (presumably an early return when
 // present and an insert at `it` otherwise) is on lines not visible here.
 211 void STG::TRAFFCOUNTER::AddIP(uint32_t ip)
 
 214  *  AddIP is calling from users and affect DeleteIP and AddPacket.
 
 215  * DeleteIP cannot be called concurrently with AddIP - it's the same
 
 216  * thread. AddPacket is calling from capturer's thread - concurrently
 
 219  * May be locked by AddPacket (from capturer's thread)
 
 220  * Logarithmic lock time
 
 222  * Lock AddPacket (capturer's thread)
 
 223  * Logarithmic lock time
 
 225 SCOPED_LOCK lock(ipMutex);
 
 // std::lower_bound requires `ips` to be sorted.
 226 IP_ITER it(std::lower_bound(ips.begin(), ips.end(), ip));
 
 // True when `ip` is already stored at the lower-bound position.
 228 if (it != ips.end() && *it == ip)
 
 235 //-----------------------------------------------------------------------------
 
 // Handles removal of `ip` and hands that address's sessions to the caller.
 // Steps visible here:
 //   1. under ipMutex, locate `ip` in `ips` with std::lower_bound (the lines
 //      that act on the result are not visible in this view);
 //   2. under sessionMutex, append a TRAFF_ITEM to *traff for every
 //      ip2sessions entry indexed by `ip`, decrement that session's refCount
 //      and erase the session from `sessions` when the count reaches zero;
 //   3. erase the visited index entries from ip2sessions.
 // `traff` is dereferenced without a null check, so it must be non-null.
 236 void STG::TRAFFCOUNTER::DeleteIP(uint32_t ip, STG::TRAFF_DATA * traff)
 
 239  *  DeleteIP is calling from users and affect AddIP, AddPacket, GetIP and
 
 240  * Process. AddIP and GetIP cannot be called concurrently with DeleteIP - it's
 
 241  * the same thread. AddPacket is calling from capturer's thread - concurrently
 
 242  * with DeleteIP. Process is calling from internal thread - concurrently with
 
 245  * May be locked by AddPacket (from capturer's thread)
 
 246  * Logarithmic lock time
 
 248  * Lock AddPacket (capturer's thread)
 
 249  * Logarithmic lock time
 
 253     SCOPED_LOCK lock(ipMutex);
 
 255     IP_ITER it(std::lower_bound(ips.begin(), ips.end(), ip));
 
 268 // Get sessions for this ip
 
 269 std::pair<INDEX_ITER,
 
 272 SCOPED_LOCK lock(sessionMutex);
 
 273 range = ip2sessions.equal_range(ip);
 
 // Index entries are collected here and erased after the loop, presumably
 // so that the iterator driving the loop is never invalidated.
 274 std::list<INDEX_ITER> toDelete;
 
 276 // Lock session growing
 
 277 for (INDEX_ITER it = range.first; it != range.second; ++it)
 
 // it->second is an iterator into `sessions`: ->first is the session key
 // and ->second is the session data.
 279     traff->push_back(STG::TRAFF_ITEM(it->second->first, it->second->second));
 
 282     toDelete.push_back(it);
 
 284     /*if (ip == it->second->first.saddr)
 
 286         toDelete.push_back(it->second->second.dIdx);
 
 290         toDelete.push_back(it->second->second.sIdx);
 
 293     --it->second->second.refCount;
 
 297      *  Normally we will lock here only in case of session between
 
 298      *  two users from ips list
 
 // The session record is dropped once no index entry references it.
 300     if (!it->second->second.refCount)
 
 302         sessions.erase(it->second);
 
 307 for (std::list<INDEX_ITER>::iterator it = toDelete.begin();
 
 308      it != toDelete.end();
 
 311     ip2sessions.erase(*it);
 
 314 //-----------------------------------------------------------------------------
 
 // Collects the traffic recorded for `ip` into *traff.
 // Despite the "Get" name this is a draining read: under sessionMutex every
 // ip2sessions entry for `ip` is copied into *traff as a TRAFF_ITEM, the
 // session's refCount is decremented, the session is erased from `sessions`
 // when the count reaches zero, and the visited index entries are then
 // erased from ip2sessions. `ips` is not touched by the visible code.
 // `traff` is dereferenced without a null check, so it must be non-null.
 315 void STG::TRAFFCOUNTER::GetIP(uint32_t ip, STG::TRAFF_DATA * traff)
 
 318  *  Normally we will lock here only in case of session between
 
 319  *  two users from ips list
 
 321 std::pair<INDEX_ITER,
 
 324 SCOPED_LOCK lock(sessionMutex);
 
 325 range = ip2sessions.equal_range(ip);
 
 // Index entries are collected here and erased after the loop, presumably
 // so that the iterator driving the loop is never invalidated.
 326 std::list<INDEX_ITER> toDelete;
 
 328 // TODO: replace with foreach
 
 329 for (SESSION_INDEX::iterator it = range.first;
 
 // it->second is an iterator into `sessions`: ->first is the session key
 // and ->second is the session data.
 333     traff->push_back(STG::TRAFF_ITEM(it->second->first, it->second->second));
 
 334     toDelete.push_back(it);
 
 335     --it->second->second.refCount;
 
 // The session record is dropped once no index entry references it.
 336     if (!it->second->second.refCount)
 
 338         sessions.erase(it->second);
 
 342 for (std::list<INDEX_ITER>::iterator it = toDelete.begin();
 
 343      it != toDelete.end();
 
 346     ip2sessions.erase(*it);
 
 349 //-----------------------------------------------------------------------------
 
 // Thread entry point passed to pthread_create by Start(). `data` is the
 // TRAFFCOUNTER instance that was passed as the thread argument.
 // The visible part takes the oldest pending packet off tc->pendingPackets
 // under pendingMutex, blocking on pendingCond while the queue is empty.
 // The surrounding loop, the exit handling and the call that consumes
 // `packet` are on lines not visible in this view.
 350 void * STG::TRAFFCOUNTER::Run(void * data)
 
 352 STG::TRAFFCOUNTER * tc = static_cast<STG::TRAFFCOUNTER *>(data);
 
 357     STG::PENDING_PACKET packet;
 
 359         SCOPED_LOCK lock(tc->pendingMutex);
 
 // NOTE(review): the wait is guarded by `if`, not `while`, so a spurious
 // wakeup (or the signal sent by Stop()) can resume with an empty queue.
 // The hidden lines between here and the dequeue may re-check - confirm.
 360         if (tc->pendingPackets.empty())
 
 362             pthread_cond_wait(&tc->pendingCond, &tc->pendingMutex);
 
 // The packet is copied out, so it remains usable after the pop.
 368         packet = *tc->pendingPackets.begin();
 
 369         tc->pendingPackets.pop_front();
 
 378 //-----------------------------------------------------------------------------
 
 // Accounts a single pending packet against the session table.
 // Steps visible here:
 //   1. FOREIGN packets get separate handling (the body is not visible);
 //   2. the packet's SESSION_ID is looked up in `sessions`;
 //   3. if found, the packet length is added to the existing session;
 //   4. otherwise a new SESSION_FULL_DATA is filled in (direction from
 //      rulesFinder under rulesMutex, length from the packet), inserted into
 //      `sessions` and indexed in ip2sessions under both the source and the
 //      destination address.
 379 void STG::TRAFFCOUNTER::Process(const STG::PENDING_PACKET & p)
 
 385 // Fail on foreign packets
 
 386 if (p.direction == PENDING_PACKET::FOREIGN) {
 
 390 // Searching a new packet in a tree.
 
 393     SCOPED_LOCK lock(sessionMutex);
 
 394     si = sessions.find(STG::SESSION_ID(p));
 
 397 // Packet found - update length and time
 
 398 if (si != sessions.end())
 
 // NOTE(review): sessionMutex is taken a second time here; if the lookup
 // above released it at the end of its own scope (braces not visible),
 // `si` may have been erased by GetIP/DeleteIP in between - confirm.
 401     SCOPED_LOCK lock(sessionMutex);
 
 402     si->second.length += p.length;
 
 409 // Packet not found - add new packet
 
 411 // This packet is allowed to create session
 
 412 STG::SESSION_ID sid(p);
 
 413 SESSION_FULL_DATA sd;
 
 // rulesMutex guards rulesFinder against a concurrent SetRules() call.
 417     SCOPED_LOCK lock(rulesMutex);
 
 418     sd.dir = rulesFinder.GetDir(p);
 
 421 sd.length = p.length;
 
 423 if (p.direction == PENDING_PACKET::LOCAL)
 
 // NOTE(review): `sIt` is declared and `sessions.insert` is called twice in
 // the visible lines (here without a lock and again below under
 // sessionMutex). One of the two is presumably disabled by a comment or
 // preprocessor line that is not visible in this view - confirm.
 433 std::pair<SESSION_ITER,
 
 434           bool> sIt(sessions.insert(std::make_pair(sid, sd)));
 
 436     SCOPED_LOCK lock(sessionMutex);
 
 437     std::pair<SESSION_ITER,
 
 438               bool> sIt(sessions.insert(std::make_pair(sid, sd)));
 
 // The session stores the iterators of its own two index entries.
 441     sIt.first->second.sIdx = ip2sessions.insert(std::make_pair(p.saddr, sIt.first));
 
 442     sIt.first->second.dIdx = ip2sessions.insert(std::make_pair(p.daddr, sIt.first));
 
 446 //-----------------------------------------------------------------------------
 
 // Installs a new rule set: under rulesMutex, forwards `data` to
 // rulesFinder.SetRules(). Process() reads rulesFinder under the same mutex,
 // so the rule swap is serialised against direction lookups.
 447 void STG::TRAFFCOUNTER::SetRules(const STG::RULES & data)
 
 450  *  SetRules is calling from outside internel thread. Process is calling
 
 451  * from internal thread and calls DeterminateDir which use rules data.
 
 453  * May be locked by DeterminateDir (Process) from internal thread.
 
 455  * Lock DeterminateDir (Process) - internal thread.
 
 458 SCOPED_LOCK lock(rulesMutex);
 
 459 rulesFinder.SetRules(data);