X-Git-Url: https://git.stg.codes/stg.git/blobdiff_plain/d084d9ae9f7bcd7f7d1926e7eeae921dbad49273..80270bc96f3fd1d1f14b3ef539b73ad2eb0017de:/projects/traffcounter/traffcounter.cpp diff --git a/projects/traffcounter/traffcounter.cpp b/projects/traffcounter/traffcounter.cpp deleted file mode 100644 index baa999a9..00000000 --- a/projects/traffcounter/traffcounter.cpp +++ /dev/null @@ -1,460 +0,0 @@ -/* - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ - -/* - * Author : Maxim Mamontov - */ - -/* - $Revision: 1.5 $ - $Date: 2009/10/12 08:43:32 $ - $Author: faust $ - */ - -#include -#include -#include - -#include "traffcounter.h" -#include "logger.h" -#include "lock.h" -#include "utils.h" - -//----------------------------------------------------------------------------- -STG::TRAFFCOUNTER::TRAFFCOUNTER() - : rulesFinder(), - pendingPackets(), - sessions(), - cacheHits(0), - cacheMisses(0), - pendingCount(0), - maxPending(0), - ip2sessions(), - stopped(true), - running(false) -{ -LOG_IT << "TRAFFCOUNTER::TRAFFCOUNTER()\n"; -pthread_mutex_init(&sessionMutex, NULL); -pthread_mutex_init(&pendingMutex, NULL); -pthread_mutex_init(&ipMutex, NULL); -pthread_mutex_init(&rulesMutex, NULL); -pthread_cond_init(&pendingCond, NULL); -} -//----------------------------------------------------------------------------- -STG::TRAFFCOUNTER::~TRAFFCOUNTER() -{ -LOG_IT << "TRAFFCOUNTER::~TRAFFCOUNTER()\n"; -pthread_cond_destroy(&pendingCond); -pthread_mutex_destroy(&rulesMutex); -pthread_mutex_destroy(&ipMutex); -pthread_mutex_destroy(&pendingMutex); -pthread_mutex_destroy(&sessionMutex); -} -//----------------------------------------------------------------------------- -// Starting processing thread -bool STG::TRAFFCOUNTER::Start() -{ -LOG_IT << "TRAFFCOUNTER::Start()\n"; - -if (running) - return false; - -running = true; -stopped = true; - -if (pthread_create(&thread, NULL, Run, this)) - { - LOG_IT << "TRAFFCOUNTER::Start() Error: Cannot start thread!\n"; - return true; - } - -return false; -} -//----------------------------------------------------------------------------- -bool STG::TRAFFCOUNTER::Stop() -{ -LOG_IT << "TRAFFCOUNTER::Stop()\n"; -LOG_IT << "maxPending: " << maxPending << std::endl; - -if (!running) - return false; - -running = false; -// Awake thread -pthread_cond_signal(&pendingCond); - -//5 seconds to thread stops itself -for (int i = 0; i < 25 && !stopped; ++i) - { - usleep(200000); - } - -//after 5 seconds waiting thread still running. now kill it -if (!stopped) - { - LOG_IT << "TRAFFCOUNTER::Stop() Killing thread\n"; - if (pthread_kill(thread, SIGINT)) - { - return true; - } - LOG_IT << "TRAFFCOUNTER::Stop() Thread killed\n"; - } - -return false; -} -//----------------------------------------------------------------------------- -double STG::TRAFFCOUNTER::StreamQuality() const -{ -if (!cacheHits && !cacheMisses) - { - return 0; - } - -double quality = cacheHits; -return quality / (quality + cacheMisses); -} -//----------------------------------------------------------------------------- -void STG::TRAFFCOUNTER::AddPacket(const iphdr & ipHdr, uint16_t sport, uint16_t dport) -{ -/* - * Intersects with AddIP (from user thread), DeleteIP (from user thread) and - * Process (internal thread). AddPacket is calling from capturer's thread - * - * ips is affected by AddIP (logarithmic lock time) and - * DeleteIP (from user thread) - * - * May be locked by AddIP or DeleteIP (from user thread) - * - * Lock AddIP (user thread) or DeleteIP (user thread) - * Logarithmic lock time - */ - -bool srcExists; -bool dstExists; - - { - SCOPED_LOCK lock(ipMutex); - srcExists = std::binary_search(ips.begin(), ips.end(), ipHdr.saddr); - dstExists = std::binary_search(ips.begin(), ips.end(), ipHdr.daddr); - } - -if (!srcExists && - !dstExists) - { - // Just drop the packet - return; - } - -STG::PENDING_PACKET p(ipHdr, sport, dport); - -// Packet classification -if (srcExists) - { - if (dstExists) - { - // Both src and dst are countable - p.direction = PENDING_PACKET::LOCAL; - } - else - { - // Src is countable - p.direction = PENDING_PACKET::OUTGOING; - } - } -else - { - if (dstExists) - { - // Dst is countable - p.direction = PENDING_PACKET::INCOMING; - } - else - { - assert(0); - // Not src nor dst are countable - p.direction = PENDING_PACKET::FOREIGN; - } - } - -/* - * pendingPackets is affected by Process (from internal thread) - * - * May be locked by Process (internal thread) - * - * Lock Process (internal thread) - * Constant lock time - */ -SCOPED_LOCK lock(pendingMutex); -pendingPackets.push_back(p); -pendingCount++; -#ifdef STATISTIC -if (pendingCount > maxPending) - maxPending = pendingCount; -#endif -pthread_cond_signal(&pendingCond); - -} -//----------------------------------------------------------------------------- -void STG::TRAFFCOUNTER::AddIP(uint32_t ip) -{ -/* - * AddIP is calling from users and affect DeleteIP and AddPacket. - * DeleteIP cannot be called concurrently with AddIP - it's the same - * thread. AddPacket is calling from capturer's thread - concurrently - * with AddIP. - * - * May be locked by AddPacket (from capturer's thread) - * Logarithmic lock time - * - * Lock AddPacket (capturer's thread) - * Logarithmic lock time - */ -SCOPED_LOCK lock(ipMutex); -IP_ITER it(std::lower_bound(ips.begin(), ips.end(), ip)); - -if (it != ips.end() && *it == ip) - { - return; - } -// Insertion -ips.insert(it, ip); -} -//----------------------------------------------------------------------------- -void STG::TRAFFCOUNTER::DeleteIP(uint32_t ip, STG::TRAFF_DATA * traff) -{ -/* - * DeleteIP is calling from users and affect AddIP, AddPacket, GetIP and - * Process. AddIP and GetIP cannot be called concurrently with DeleteIP - it's - * the same thread. AddPacket is calling from capturer's thread - concurrently - * with DeleteIP. Process is calling from internal thread - concurrently with - * DeleteIP. - * - * May be locked by AddPacket (from capturer's thread) - * Logarithmic lock time - * - * Lock AddPacket (capturer's thread) - * Logarithmic lock time - */ - - { - SCOPED_LOCK lock(ipMutex); - - IP_ITER it(std::lower_bound(ips.begin(), ips.end(), ip)); - if (it == ips.end()) - { - return; - } - if (*it != ip) - { - return; - } - - ips.erase(it); - } - -// Get sessions for this ip -std::pair range; - -SCOPED_LOCK lock(sessionMutex); -range = ip2sessions.equal_range(ip); -std::list toDelete; - -// Lock session growing -for (INDEX_ITER it = range.first; it != range.second; ++it) - { - traff->push_back(STG::TRAFF_ITEM(it->second->first, it->second->second)); - - // Include self - toDelete.push_back(it); - - /*if (ip == it->second->first.saddr) - { - toDelete.push_back(it->second->second.dIdx); - } - else - { - toDelete.push_back(it->second->second.sIdx); - }*/ - - --it->second->second.refCount; - - // Remove session - /* - * Normally we will lock here only in case of session between - * two users from ips list - */ - if (!it->second->second.refCount) - { - sessions.erase(it->second); - } - } - -// Remove indexes -for (std::list::iterator it = toDelete.begin(); - it != toDelete.end(); - ++it) - { - ip2sessions.erase(*it); - } -} -//----------------------------------------------------------------------------- -void STG::TRAFFCOUNTER::GetIP(uint32_t ip, STG::TRAFF_DATA * traff) -{ -/* - * Normally we will lock here only in case of session between - * two users from ips list - */ -std::pair range; - -SCOPED_LOCK lock(sessionMutex); -range = ip2sessions.equal_range(ip); -std::list toDelete; - -// TODO: replace with foreach -for (SESSION_INDEX::iterator it = range.first; - it != range.second; - ++it) - { - traff->push_back(STG::TRAFF_ITEM(it->second->first, it->second->second)); - toDelete.push_back(it); - --it->second->second.refCount; - if (!it->second->second.refCount) - { - sessions.erase(it->second); - } - } - -for (std::list::iterator it = toDelete.begin(); - it != toDelete.end(); - ++it) - { - ip2sessions.erase(*it); - } -} -//----------------------------------------------------------------------------- -void * STG::TRAFFCOUNTER::Run(void * data) -{ -STG::TRAFFCOUNTER * tc = static_cast(data); -tc->stopped = false; - -while (tc->running) - { - STG::PENDING_PACKET packet; - { - SCOPED_LOCK lock(tc->pendingMutex); - if (tc->pendingPackets.empty()) - { - pthread_cond_wait(&tc->pendingCond, &tc->pendingMutex); - } - if (!tc->running) - { - break; - } - packet = *tc->pendingPackets.begin(); - tc->pendingPackets.pop_front(); - --tc->pendingCount; - } - tc->Process(packet); - } - -tc->stopped = true; -return NULL; -} -//----------------------------------------------------------------------------- -void STG::TRAFFCOUNTER::Process(const STG::PENDING_PACKET & p) -{ -// Bypass on stop -if (!running) - return; - -// Fail on foreign packets -if (p.direction == PENDING_PACKET::FOREIGN) { - assert(0); -} - -// Searching a new packet in a tree. -SESSION_ITER si; - { - SCOPED_LOCK lock(sessionMutex); - si = sessions.find(STG::SESSION_ID(p)); - } - -// Packet found - update length and time -if (si != sessions.end()) - { - // Grow session - SCOPED_LOCK lock(sessionMutex); - si->second.length += p.length; - ++cacheHits; - return; - } - -++cacheMisses; - -// Packet not found - add new packet - -// This packet is alowed to create session -STG::SESSION_ID sid(p); -SESSION_FULL_DATA sd; - -// Identify a packet - { - SCOPED_LOCK lock(rulesMutex); - sd.dir = rulesFinder.GetDir(p); - } - -sd.length = p.length; - -if (p.direction == PENDING_PACKET::LOCAL) - { - sd.refCount = 2; - } -else - { - sd.refCount = 1; - } - -// Create a session -std::pair sIt(sessions.insert(std::make_pair(sid, sd))); - { - SCOPED_LOCK lock(sessionMutex); - std::pair sIt(sessions.insert(std::make_pair(sid, sd))); - - // Create an indexes - sIt.first->second.sIdx = ip2sessions.insert(std::make_pair(p.saddr, sIt.first)); - sIt.first->second.dIdx = ip2sessions.insert(std::make_pair(p.daddr, sIt.first)); - } - -} -//----------------------------------------------------------------------------- -void STG::TRAFFCOUNTER::SetRules(const STG::RULES & data) -{ -/* - * SetRules is calling from outside internel thread. Process is calling - * from internal thread and calls DeterminateDir which use rules data. - * - * May be locked by DeterminateDir (Process) from internal thread. - * - * Lock DeterminateDir (Process) - internal thread. - * Linear lock time - */ -SCOPED_LOCK lock(rulesMutex); -rulesFinder.SetRules(data); -}