#include #include #include #include #include #include #include #include "snmp_pp/snmp_pp.h" #include "syncer.h" #include "settings.h" #include "switch.h" #include "subscriber.h" #include "datatypes.h" #include "logger.h" using SSMD::Syncer; using SSMD::Timer; using SSMD::Switch; using SSMD::Subscriber; using SSMD::Lines; Timer::Timer(boost::function callback, time_t interval) : _interval(interval), _lastFire(0), _callback(callback) { } Timer::~Timer() { } int Timer::getTimeout() const { double delta = difftime(time(NULL), _lastFire); return _interval - delta; } void Timer::fire() { _callback(); time(&_lastFire); } Syncer::Syncer(SettingsParser & sp, Snmp & snmp) : _settingsParser(sp), _snmp(snmp) { // 1 db syncer _timers.push_back(Timer(boost::bind(&Syncer::syncInfo, this), _settingsParser.settings().infoSyncInterval())); } Syncer::~Syncer() { } void Syncer::run(const bool & running, bool & reload) { logger << "Syncer::run()" << std::endl; while (running) { if (wait()) { logger << "Syncer::run() - wait stopped by signal" << std::endl; if (!running) break; if (reload) { logger << "Syncer::run() - reload" << std::endl; try { _settingsParser.reloadConfig(); } catch (std::exception & ex) { logger << "Syncer::run() - exception: " << ex.what() << std::endl; } reload = false; } } } } bool Syncer::wait() { Timer & timer(getNextTimer()); if (timer.getTimeout() > 0) { fd_set rfds; FD_ZERO(&rfds); struct timeval tv; tv.tv_sec = timer.getTimeout(); tv.tv_usec = 0; int retval = select(1, &rfds, NULL, NULL, &tv); if (retval == -1) { return true; } } timer.fire(); return false; } Timer & Syncer::getNextTimer() { assert(_timers.size() && "Timer list must not be empty!"); int timeout = _timers.begin()->getTimeout(); std::list::iterator it(_timers.begin()); std::list::iterator pos(_timers.begin()); ++it; while (it != _timers.end()) { if (it->getTimeout() < timeout) { pos = it; timeout = pos->getTimeout(); } ++it; } return *pos; } void Syncer::syncInfo() { std::map switches; if (!getSwitchesState(switches)) { logger << "Syncer::syncInfo() - failed to get new switch states" << std::endl; return; } std::list::iterator it(_switches.begin()); while (it != _switches.end()) { _timers.erase(it->second); _switches.erase(it++); } std::map::const_iterator sit; for (sit = switches.begin(); sit != switches.end(); ++sit) { // Insert switch with no timer _switches.push_back(std::make_pair(sit->second, _timers.end())); // Insert timer for this switch TimerIterator tit = _timers.insert( _timers.end(), Timer(boost::bind(&Switch::sync, &_switches.back().first), _settingsParser.settings().switchSyncInterval())); // Set timer iterator for this switch _switches.back().second = tit; } logger << "Syncer::syncInfo() - data synchronization successfull, switches: " << _switches.size() << std::endl; } size_t curlWriteFunction(void * ptr, size_t size, size_t nmemb, void * userdata) { char * data = static_cast(ptr); std::string * dest = static_cast(userdata); dest->append(data, size * nmemb); return size * nmemb; } bool Syncer::getDBData(std::string & data) const { CURL * handle = curl_easy_init(); if (handle) { char errorBuffer[CURL_ERROR_SIZE]; curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0); // Accept self-signed certs curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, 0); // Accept certs for wrong hosts curl_easy_setopt(handle, CURLOPT_LOW_SPEED_LIMIT, 1); // Less than 1 bps curl_easy_setopt(handle, CURLOPT_LOW_SPEED_TIME, 60); // During 60 secs curl_easy_setopt(handle, CURLOPT_URL, _settingsParser.settings().dataURL().c_str()); // Our URL curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlWriteFunction); // Our write callback curl_easy_setopt(handle, CURLOPT_WRITEDATA, &data); // Our callback data curl_easy_setopt(handle, CURLOPT_ERRORBUFFER, errorBuffer); // Buffer for an error messages CURLcode res = curl_easy_perform(handle); if (res) { logger << "Syncer::getDBData() - DB communication error: '" << errorBuffer << "'" << std::endl; curl_easy_cleanup(handle); return false; } curl_easy_cleanup(handle); return true; } logger << "Syncer::getDBData() - failed to init CURL library" << std::endl; return false; } bool Syncer::getSwitchesState(std::map & switches) { if (_settingsParser.settings().dataURL().empty()) { logger << "Switch::getSwitchesState() - data URL is empty" << std::endl; return false; } std::string data; if (!getDBData(data)) { logger << "Syncer::getSwitchesState() - failed to fetch data from the URL: '" << _settingsParser.settings().dataURL() << "'" << std::endl; return false; } Lines lines; if (!parseData(data, lines)) { logger << "Syncer::getSwitchesState() - failed to parse data:\n" << data << std::endl; return false; } Lines::const_iterator it; for (it = lines.begin(); it != lines.end(); ++it) { std::pair::iterator, bool> res( switches.insert( std::make_pair( it->switchIP, Switch( _settingsParser.settings(), _snmp, it->switchIP, it->readCommunity, it->writeCommunity, it->uplinkPort ) ) ) ); res.first->second.addSubscriber( Subscriber( it->mac, it->userPort, it->upShape, it->downShape, it->upBurst, it->downBurst ) ); } return true; }