X-Git-Url: https://git.stg.codes/stg.git/blobdiff_plain/53e3e5c4fa09c1ca7e1cb44054ec38441125868c..15c40aa2f808c1d48aab717636238aea13880f88:/projects/stargazer/user.cpp diff --git a/projects/stargazer/user.cpp b/projects/stargazer/user.cpp index 989d11b5..38bab82c 100644 --- a/projects/stargazer/user.cpp +++ b/projects/stargazer/user.cpp @@ -115,7 +115,6 @@ ips = StrToIPS("*"); deleted = false; lastWriteStat = stgTime + random() % settings->GetStatWritePeriod(); lastWriteDeatiledStat = stgTime; -lastSwapDeatiledStat = stgTime; property.tariffName.AddBeforeNotifier(&tariffNotifier); property.passive.AddBeforeNotifier(&passiveNotifier); @@ -124,8 +123,6 @@ currIP.AddAfterNotifier(&ipNotifier); lastScanMessages = 0; -writeFreeMbTraffCost = settings->GetWriteFreeMbTraffCost(); - pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE); @@ -197,7 +194,6 @@ deleted = u.deleted; lastWriteStat = u.lastWriteStat; lastWriteDeatiledStat = u.lastWriteDeatiledStat; -lastSwapDeatiledStat = u.lastSwapDeatiledStat; settings = u.settings; @@ -208,8 +204,6 @@ currIP.AddAfterNotifier(&ipNotifier); lastScanMessages = 0; -writeFreeMbTraffCost = settings->GetWriteFreeMbTraffCost(); - property.SetProperties(u.property); pthread_mutexattr_t attr; @@ -257,6 +251,24 @@ if (tariff == NULL) return -1; } +std::vector hdrsList; + +if (store->GetMessageHdrs(&hdrsList, login)) + { + printfd(__FILE__, "Error GetMessageHdrs %s\n", store->GetStrError().c_str()); + return -1; + } + +std::vector::const_iterator it; +for (it = hdrsList.begin(); it != hdrsList.end(); ++it) + { + STG_MSG msg; + if (store->GetMessage(it->id, &msg, login) == 0) + { + messages.push_back(msg); + } + } + return 0; } //----------------------------------------------------------------------------- @@ -589,7 +601,7 @@ void USER::Run() { STG_LOCKER lock(&mutex, __FILE__, __LINE__); -if (stgTime - lastWriteStat > settings->GetStatWritePeriod()) +if (stgTime > static_cast(lastWriteStat + settings->GetStatWritePeriod())) { printfd(__FILE__, "USER::WriteStat user=%s\n", GetLogin().c_str()); WriteStat(); @@ -754,7 +766,8 @@ sessionUpload[dir] += len; //Add detailed stat -if (!writeFreeMbTraffCost && freeMb.ConstData() >= 0) +if (!settings->GetWriteFreeMbTraffCost() && + freeMb.ConstData() >= 0) cost = 0; #ifdef TRAFF_STAT_WITH_PORTS @@ -844,7 +857,8 @@ sessionDownload[dir] += len; //Add detailed stat -if (!writeFreeMbTraffCost && freeMb.ConstData() >= 0) +if (!settings->GetWriteFreeMbTraffCost() && + freeMb.ConstData() >= 0) cost = 0; #ifdef TRAFF_STAT_WITH_PORTS @@ -940,22 +954,18 @@ Run(); //----------------------------------------------------------------------------- int USER::WriteDetailStat(bool hard) { -printfd(__FILE__, "USER::WriteDetailedStat() - queue size = %d\n", traffStatQueue.size()); +printfd(__FILE__, "USER::WriteDetailedStat() - saved size = %d\n", traffStatSaved.second.size()); -if (!traffStatQueue.empty()) +if (!traffStatSaved.second.empty()) { - std::list >::iterator it; - for (it = traffStatQueue.begin(); it != traffStatQueue.end(); ++it) + if (store->WriteDetailedStat(traffStatSaved.second, traffStatSaved.first, login)) { - if (store->WriteDetailedStat(it->second, it->first, login)) - { - printfd(__FILE__, "USER::WriteDetailStat() - failed to write detail stat from queue\n"); - WriteServLog("Cannot write detail stat from queue (of size %d recs) for user %s.", traffStatQueue.size(), login.c_str()); - WriteServLog("%s", store->GetStrError().c_str()); - return -1; - } - traffStatQueue.erase(it++); + printfd(__FILE__, "USER::WriteDetailStat() - failed to write detail stat from queue\n"); + WriteServLog("Cannot write detail stat from queue (of size %d recs) for user %s.", traffStatSaved.second.size(), login.c_str()); + WriteServLog("%s", store->GetStrError().c_str()); + return -1; } + traffStatSaved.second.erase(traffStatSaved.second.begin(), traffStatSaved.second.end()); } TRAFF_STAT ts; @@ -977,8 +987,11 @@ if (ts.size() && !disabledDetailStat) if (!hard) { printfd(__FILE__, "USER::WriteDetailStat() - pushing detail stat to queue\n"); - traffStatQueue.push_back(std::make_pair(lastWriteDeatiledStat, ts)); + STG_LOCKER lock(&mutex, __FILE__, __LINE__); + traffStatSaved.second.swap(ts); + traffStatSaved.first = lastWriteDeatiledStat; } + return -1; } } lastWriteDeatiledStat = stgTime; @@ -1138,7 +1151,18 @@ int USER::AddMessage(STG_MSG * msg) { STG_LOCKER lock(&mutex, __FILE__, __LINE__); -if (SendMessage(*msg) == 0) +if (SendMessage(*msg)) + { + if (store->AddMessage(msg, login)) + { + errorStr = store->GetStrError(); + WriteServLog("Error adding message: '%s'", errorStr.c_str()); + printfd(__FILE__, "Error adding message: '%s'\n", errorStr.c_str()); + return -1; + } + messages.push_back(*msg); + } +else { if (msg->header.repeat > 0) { @@ -1152,100 +1176,87 @@ if (SendMessage(*msg) == 0) if (store->AddMessage(msg, login)) { errorStr = store->GetStrError(); - STG_LOGGER & WriteServLog = GetStgLogger(); - WriteServLog("Error adding message %s", errorStr.c_str()); - WriteServLog("%s", store->GetStrError().c_str()); + WriteServLog("Error adding repeatable message: '%s'", errorStr.c_str()); + printfd(__FILE__, "Error adding repeatable message: '%s'\n", errorStr.c_str()); return -1; } - } - } -else - { - if (store->AddMessage(msg, login)) - { - errorStr = store->GetStrError(); - STG_LOGGER & WriteServLog = GetStgLogger(); - WriteServLog("Error adding message %s", errorStr.c_str()); - WriteServLog("%s", store->GetStrError().c_str()); - return -1; + messages.push_back(*msg); } } return 0; } //----------------------------------------------------------------------------- -int USER::SendMessage(const STG_MSG & msg) +int USER::SendMessage(STG_MSG & msg) const { -STG_LOCKER lock(&mutex, __FILE__, __LINE__); - -if (authorizedBy.empty()) - { - return -1; - } - +// No lock `cause we are already locked from caller int ret = -1; -set::iterator it; - -it = authorizedBy.begin(); +set::iterator it(authorizedBy.begin()); while (it != authorizedBy.end()) { - if ((*it)->SendMessage(msg, currIP) == 0) + if (!(*it++)->SendMessage(msg, currIP)) ret = 0; - ++it; + } +if (!ret) + { +#ifndef DEBUG + //TODO: gcc v. 4.x generate ICE on x86_64 + msg.header.lastSendTime = time(NULL); +#else + msg.header.lastSendTime = stgTime; +#endif + msg.header.repeat--; } return ret; } //----------------------------------------------------------------------------- -int USER::ScanMessage() +void USER::ScanMessage() { -STG_LOCKER lock(&mutex, __FILE__, __LINE__); - -vector hdrsList; +// No lock `cause we are already locked from caller +// We need not check for the authorizedBy `cause it has already checked by caller -if (store->GetMessageHdrs(&hdrsList, login)) - { - printfd(__FILE__, "Error GetMessageHdrs %s\n", store->GetStrError().c_str()); - return -1; - } - -for (unsigned i = 0; i < hdrsList.size(); i++) +std::list::iterator it(messages.begin()); +while (it != messages.end()) { - - if (hdrsList[i].lastSendTime + hdrsList[i].repeatPeriod * 60 < (unsigned)stgTime) + if (settings->GetMessageTimeout() > 0 && + difftime(stgTime, it->header.creationTime) > settings->GetMessageTimeout()) + { + // Timeout exceeded + if (store->DelMessage(it->header.id, login)) + { + WriteServLog("Error deleting message: '%s'", store->GetStrError().c_str()); + printfd(__FILE__, "Error deleting message: '%s'\n", store->GetStrError().c_str()); + } + messages.erase(it++); + continue; + } + if (it->GetNextSendTime() <= stgTime) { - STG_MSG msg; - if (store->GetMessage(hdrsList[i].id, &msg, login) == 0) + if (SendMessage(*it)) + { + // We need to check all messages in queue for timeout + ++it; + continue; + } + if (it->header.repeat < 0) { - if (SendMessage(msg) == 0) + if (store->DelMessage(it->header.id, login)) { - msg.header.repeat--; - if (msg.header.repeat < 0) - { - printfd(__FILE__, "DelMessage\n"); - store->DelMessage(hdrsList[i].id, login); - } - else - { - #ifndef DEBUG - //TODO: gcc v. 4.x generate ICE on x86_64 - msg.header.lastSendTime = time(NULL); - #else - msg.header.lastSendTime = stgTime; - #endif - if (store->EditMessage(msg, login)) - { - printfd(__FILE__, "EditMessage Error %s\n", store->GetStrError().c_str()); - } - } + WriteServLog("Error deleting message: '%s'", store->GetStrError().c_str()); + printfd(__FILE__, "Error deleting message: '%s'\n", store->GetStrError().c_str()); } + messages.erase(it++); } else { - WriteServLog("Cannot get message for user %s.", login.c_str()); - WriteServLog("%s", store->GetStrError().c_str()); + if (store->EditMessage(*it, login)) + { + WriteServLog("Error modifying message: '%s'", store->GetStrError().c_str()); + printfd(__FILE__, "Error modifying message: '%s'\n", store->GetStrError().c_str()); + } + ++it; } } } -return 0; } //----------------------------------------------------------------------------- //-----------------------------------------------------------------------------