]> git.stg.codes - stg.git/blob - projects/stargazer/plugins/other/smux/smux.cpp
Ticket 37. The 'ALTER TABLE tariffs' query added for the 'change_policy'
[stg.git] / projects / stargazer / plugins / other / smux / smux.cpp
1 #include <sys/types.h>
2 #include <sys/socket.h>
3 #include <arpa/inet.h>
4
5 #include <cstring>
6 #include <cerrno>
7 #include <ctime>
8 #include <csignal>
9 #include <cassert>
10
11 #include <vector>
12 #include <algorithm>
13 #include <iterator>
14 #include <stdexcept>
15 #include <utility>
16
17 #include "stg/common.h"
18 #include "stg/plugin_creator.h"
19
20 #include "smux.h"
21 #include "utils.h"
22
23 namespace
24 {
25 PLUGIN_CREATOR<SMUX> smc;
26
27 bool SPrefixLess(const Sensors::value_type & a,
28                  const Sensors::value_type & b)
29 {
30 return a.first.PrefixLess(b.first);
31 }
32
33 }
34
35 extern "C" PLUGIN * GetPlugin();
36
37 PLUGIN * GetPlugin()
38 {
39 return smc.GetPlugin();
40 }
41
42 SMUX_SETTINGS::SMUX_SETTINGS()
43     : errorStr(),
44       ip(0),
45       port(0),
46       password()
47 {}
48
49 int SMUX_SETTINGS::ParseSettings(const MODULE_SETTINGS & s)
50 {
51 PARAM_VALUE pv;
52 std::vector<PARAM_VALUE>::const_iterator pvi;
53 int p;
54
55 pv.param = "Port";
56 pvi = std::find(s.moduleParams.begin(), s.moduleParams.end(), pv);
57 if (pvi == s.moduleParams.end() || pvi->value.empty())
58     {
59     errorStr = "Parameter \'Port\' not found.";
60     printfd(__FILE__, "Parameter 'Port' not found\n");
61     return -1;
62     }
63 if (ParseIntInRange(pvi->value[0], 2, 65535, &p))
64     {
65     errorStr = "Cannot parse parameter \'Port\': " + errorStr;
66     printfd(__FILE__, "Cannot parse parameter 'Port'\n");
67     return -1;
68     }
69 port = static_cast<uint16_t>(p);
70
71 pv.param = "Password";
72 pvi = std::find(s.moduleParams.begin(), s.moduleParams.end(), pv);
73 if (pvi == s.moduleParams.end() || pvi->value.empty())
74     {
75     errorStr = "Parameter \'Password\' not found.";
76     printfd(__FILE__, "Parameter 'Password' not found\n");
77     password = "";
78     }
79 else
80     {
81     password = pvi->value[0];
82     }
83
84 pv.param = "Server";
85 pvi = std::find(s.moduleParams.begin(), s.moduleParams.end(), pv);
86 if (pvi == s.moduleParams.end() || pvi->value.empty())
87     {
88     errorStr = "Parameter \'Server\' not found.";
89     printfd(__FILE__, "Parameter 'Server' not found\n");
90     return -1;
91     }
92 ip = inet_strington(pvi->value[0]);
93
94 return 0;
95 }
96
97 SMUX::SMUX()
98     : PLUGIN(),
99       users(NULL),
100       tariffs(NULL),
101       admins(NULL),
102       services(NULL),
103       corporations(NULL),
104       traffcounter(NULL),
105       errorStr(),
106       smuxSettings(),
107       settings(),
108       thread(),
109       mutex(),
110       running(false),
111       stopped(true),
112       needReconnect(false),
113       lastReconnectTry(0),
114       reconnectTimeout(1),
115       sock(-1),
116       smuxHandlers(),
117       pdusHandlers(),
118       sensors(),
119       tables(),
120       notifiers(),
121       addUserNotifier(*this),
122       delUserNotifier(*this),
123       addDelTariffNotifier(*this),
124       logger(GetPluginLogger(GetStgLogger(), "smux"))
125 {
126 pthread_mutex_init(&mutex, NULL);
127
128 smuxHandlers[SMUX_PDUs_PR_close] = &SMUX::CloseHandler;
129 smuxHandlers[SMUX_PDUs_PR_registerResponse] = &SMUX::RegisterResponseHandler;
130 smuxHandlers[SMUX_PDUs_PR_pdus] = &SMUX::PDUsRequestHandler;
131 smuxHandlers[SMUX_PDUs_PR_commitOrRollback] = &SMUX::CommitOrRollbackHandler;
132
133 pdusHandlers[PDUs_PR_get_request] = &SMUX::GetRequestHandler;
134 pdusHandlers[PDUs_PR_get_next_request] = &SMUX::GetNextRequestHandler;
135 pdusHandlers[PDUs_PR_set_request] = &SMUX::SetRequestHandler;
136 }
137
138 SMUX::~SMUX()
139 {
140     {
141     Sensors::iterator it;
142     for (it = sensors.begin(); it != sensors.end(); ++it)
143         delete it->second;
144     }
145     {
146     Tables::iterator it;
147     for (it = tables.begin(); it != tables.end(); ++it)
148         delete it->second;
149     }
150 printfd(__FILE__, "SMUX::~SMUX()\n");
151 pthread_mutex_destroy(&mutex);
152 }
153
154 int SMUX::ParseSettings()
155 {
156 return smuxSettings.ParseSettings(settings);
157 }
158
159 int SMUX::Start()
160 {
161 assert(users != NULL && "users must not be NULL");
162 assert(tariffs != NULL && "tariffs must not be NULL");
163 assert(admins != NULL && "admins must not be NULL");
164 assert(services != NULL && "services must not be NULL");
165 assert(corporations != NULL && "corporations must not be NULL");
166 assert(traffcounter != NULL && "traffcounter must not be NULL");
167
168 if (PrepareNet())
169     needReconnect = true;
170
171 // Users
172 sensors[OID(".1.3.6.1.4.1.38313.1.1.1")] = new TotalUsersSensor(*users);
173 sensors[OID(".1.3.6.1.4.1.38313.1.1.2")] = new ConnectedUsersSensor(*users);
174 sensors[OID(".1.3.6.1.4.1.38313.1.1.3")] = new AuthorizedUsersSensor(*users);
175 sensors[OID(".1.3.6.1.4.1.38313.1.1.4")] = new AlwaysOnlineUsersSensor(*users);
176 sensors[OID(".1.3.6.1.4.1.38313.1.1.5")] = new NoCashUsersSensor(*users);
177 sensors[OID(".1.3.6.1.4.1.38313.1.1.6")] = new DisabledDetailStatsUsersSensor(*users);
178 sensors[OID(".1.3.6.1.4.1.38313.1.1.7")] = new DisabledUsersSensor(*users);
179 sensors[OID(".1.3.6.1.4.1.38313.1.1.8")] = new PassiveUsersSensor(*users);
180 sensors[OID(".1.3.6.1.4.1.38313.1.1.9")] = new CreditUsersSensor(*users);
181 sensors[OID(".1.3.6.1.4.1.38313.1.1.10")] = new FreeMbUsersSensor(*users);
182 sensors[OID(".1.3.6.1.4.1.38313.1.1.11")] = new TariffChangeUsersSensor(*users);
183 sensors[OID(".1.3.6.1.4.1.38313.1.1.12")] = new ActiveUsersSensor(*users);
184 // Tariffs
185 sensors[OID(".1.3.6.1.4.1.38313.1.2.1")] = new TotalTariffsSensor(*tariffs);
186 // Admins
187 sensors[OID(".1.3.6.1.4.1.38313.1.3.1")] = new TotalAdminsSensor(*admins);
188 // Services
189 sensors[OID(".1.3.6.1.4.1.38313.1.4.1")] = new TotalServicesSensor(*services);
190 // Corporations
191 sensors[OID(".1.3.6.1.4.1.38313.1.5.1")] = new TotalCorporationsSensor(*corporations);
192 // Traffcounter
193 sensors[OID(".1.3.6.1.4.1.38313.1.6.1")] = new TotalRulesSensor(*traffcounter);
194
195 // Table data
196 tables[".1.3.6.1.4.1.38313.1.2.2"] = new TariffUsersTable(".1.3.6.1.4.1.38313.1.2.2", *tariffs, *users);
197
198 UpdateTables();
199 SetNotifiers();
200
201 #ifdef SMUX_DEBUG
202 Sensors::const_iterator it(sensors.begin());
203 while (it != sensors.end())
204     {
205     printfd(__FILE__, "%s = %s\n",
206             it->first.ToString().c_str(),
207             it->second->ToString().c_str());
208     ++it;
209     }
210 #endif
211
212 if (!running)
213     {
214     if (pthread_create(&thread, NULL, Runner, this))
215         {
216         errorStr = "Cannot create thread.";
217         logger("Cannot create thread.");
218         printfd(__FILE__, "Cannot create thread\n");
219         return -1;
220         }
221     }
222
223 return 0;
224 }
225
226 int SMUX::Stop()
227 {
228 printfd(__FILE__, "SMUX::Stop() - Before\n");
229 running = false;
230
231 if (!stopped)
232     {
233     //5 seconds to thread stops itself
234     for (int i = 0; i < 25 && !stopped; i++)
235         {
236         struct timespec ts = {0, 200000000};
237         nanosleep(&ts, NULL);
238         }
239     }
240
241 if (stopped)
242     pthread_join(thread, NULL);
243
244 ResetNotifiers();
245
246     {
247     Tables::iterator it;
248     for (it = tables.begin(); it != tables.end(); ++it)
249         delete it->second;
250     }
251     {
252     Sensors::iterator it;
253     for (it = sensors.begin(); it != sensors.end(); ++it)
254         delete it->second;
255     }
256
257 tables.erase(tables.begin(), tables.end());
258 sensors.erase(sensors.begin(), sensors.end());
259
260 close(sock);
261
262 if (!stopped)
263     {
264     running = true;
265     return -1;
266     }
267
268 printfd(__FILE__, "SMUX::Stop() - After\n");
269 return 0;
270 }
271
272 int SMUX::Reload(const MODULE_SETTINGS & /*ms*/)
273 {
274 if (Stop())
275     return -1;
276 if (Start())
277     return -1;
278 if (!needReconnect)
279     {
280     printfd(__FILE__, "SMUX reconnected succesfully.\n");
281     logger("Reconnected successfully.");
282     }
283 return 0;
284 }
285
286 void * SMUX::Runner(void * d)
287 {
288 sigset_t signalSet;
289 sigfillset(&signalSet);
290 pthread_sigmask(SIG_BLOCK, &signalSet, NULL);
291
292 SMUX * smux = static_cast<SMUX *>(d);
293
294 smux->Run();
295
296 return NULL;
297 }
298
299 void SMUX::Run()
300 {
301 stopped = true;
302 if (!SendOpenPDU(sock))
303     needReconnect = true;
304 if (!SendRReqPDU(sock))
305     needReconnect = true;
306 running = true;
307 stopped = false;
308
309 while(running)
310     {
311     if (WaitPackets(sock) && !needReconnect)
312         {
313         SMUX_PDUs_t * pdus = RecvSMUXPDUs(sock);
314         if (pdus)
315             {
316             DispatchPDUs(pdus);
317             ASN_STRUCT_FREE(asn_DEF_SMUX_PDUs, pdus);
318             }
319         else if (running)
320             Reconnect();
321         }
322     else if (running && needReconnect)
323         Reconnect();
324     if (!running)
325         break;
326     }
327 SendClosePDU(sock);
328 stopped = true;
329 }
330
331 bool SMUX::PrepareNet()
332 {
333 sock = socket(AF_INET, SOCK_STREAM, 0);
334
335 if (sock < 0)
336     {
337     errorStr = "Cannot create socket.";
338     logger("Cannot create a socket: %s", strerror(errno));
339     printfd(__FILE__, "Cannot create socket\n");
340     return true;
341     }
342
343 struct sockaddr_in addr;
344
345 addr.sin_family = AF_INET;
346 addr.sin_port = htons(smuxSettings.GetPort());
347 addr.sin_addr.s_addr = smuxSettings.GetIP();
348
349 if (connect(sock, reinterpret_cast<struct sockaddr *>(&addr), sizeof(addr)))
350     {
351     errorStr = "Cannot connect.";
352     logger("Cannot connect the socket: %s", strerror(errno));
353     printfd(__FILE__, "Cannot connect. Message: '%s'\n", strerror(errno));
354     return true;
355     }
356
357 return false;
358 }
359
360 bool SMUX::Reconnect()
361 {
362 if (needReconnect && difftime(time(NULL), lastReconnectTry) < reconnectTimeout)
363     return true;
364
365 time(&lastReconnectTry);
366 SendClosePDU(sock);
367 close(sock);
368 if (!PrepareNet())
369     if (SendOpenPDU(sock))
370         if (SendRReqPDU(sock))
371             {
372             reconnectTimeout = 1;
373             needReconnect = false;
374             logger("Connected successfully");
375             printfd(__FILE__, "Connected successfully\n");
376             return false;
377             }
378
379 if (needReconnect)
380     if (reconnectTimeout < 60)
381         reconnectTimeout *= 2;
382
383 needReconnect = true;
384 return true;
385 }
386
387 bool SMUX::DispatchPDUs(const SMUX_PDUs_t * pdus)
388 {
389 SMUXHandlers::iterator it(smuxHandlers.find(pdus->present));
390 if (it != smuxHandlers.end())
391     {
392     return (this->*(it->second))(pdus);
393     }
394 #ifdef SMUX_DEBUG
395 else
396     {
397     switch (pdus->present)
398         {
399         case SMUX_PDUs_PR_NOTHING:
400             printfd(__FILE__, "PDUs: nothing\n");
401             break;
402         case SMUX_PDUs_PR_open:
403             printfd(__FILE__, "PDUs: open\n");
404             break;
405         case SMUX_PDUs_PR_registerRequest:
406             printfd(__FILE__, "PDUs: registerRequest\n");
407             break;
408         default:
409             printfd(__FILE__, "PDUs: undefined\n");
410         }
411     asn_fprint(stderr, &asn_DEF_SMUX_PDUs, pdus);
412     }
413 #endif
414 return false;
415 }
416
417 bool SMUX::UpdateTables()
418 {
419 Sensors newSensors;
420 bool done = true;
421 Tables::iterator it(tables.begin());
422 while (it != tables.end())
423     {
424     try
425         {
426         it->second->UpdateSensors(newSensors);
427         }
428     catch (const std::runtime_error & ex)
429         {
430         printfd(__FILE__,
431                 "SMUX::UpdateTables - failed to update table '%s': '%s'\n",
432                 it->first.c_str(), ex.what());
433         done = false;
434         break;
435         }
436     ++it;
437     }
438 if (!done)
439     {
440     Sensors::iterator it(newSensors.begin());
441     while (it != newSensors.end())
442         {
443         delete it->second;
444         ++it;
445         }
446     return false;
447     }
448
449 it = tables.begin();
450 while (it != tables.end())
451     {
452     std::pair<Sensors::iterator, Sensors::iterator> res;
453     res = std::equal_range(sensors.begin(),
454                            sensors.end(),
455                            std::pair<OID, Sensor *>(OID(it->first), NULL),
456                            SPrefixLess);
457     Sensors::iterator sit(res.first);
458     while (sit != res.second)
459         {
460         delete sit->second;
461         ++sit;
462         }
463     sensors.erase(res.first, res.second);
464     ++it;
465     }
466
467 sensors.insert(newSensors.begin(), newSensors.end());
468
469 return true;
470 }
471
472 void SMUX::SetNotifier(USER_PTR userPtr)
473 {
474 notifiers.push_back(CHG_AFTER_NOTIFIER(*this, userPtr));
475 userPtr->GetProperty().tariffName.AddAfterNotifier(&notifiers.back());
476 }
477
478 void SMUX::UnsetNotifier(USER_PTR userPtr)
479 {
480 std::list<CHG_AFTER_NOTIFIER>::iterator it = notifiers.begin();
481 while (it != notifiers.end())
482     {
483     if (it->GetUserPtr() == userPtr)
484         {
485         userPtr->GetProperty().tariffName.DelAfterNotifier(&(*it));
486         notifiers.erase(it);
487         break;
488         }
489     ++it;
490     }
491 }
492
493 void SMUX::SetNotifiers()
494 {
495 int h = users->OpenSearch();
496 assert(h && "USERS::OpenSearch is always correct");
497
498 USER_PTR u;
499 while (!users->SearchNext(h, &u))
500     SetNotifier(u);
501
502 users->CloseSearch(h);
503
504 users->AddNotifierUserAdd(&addUserNotifier);
505 users->AddNotifierUserDel(&delUserNotifier);
506
507 tariffs->AddNotifierAdd(&addDelTariffNotifier);
508 tariffs->AddNotifierDel(&addDelTariffNotifier);
509 }
510
511 void SMUX::ResetNotifiers()
512 {
513 tariffs->DelNotifierDel(&addDelTariffNotifier);
514 tariffs->DelNotifierAdd(&addDelTariffNotifier);
515
516 users->DelNotifierUserDel(&delUserNotifier);
517 users->DelNotifierUserAdd(&addUserNotifier);
518
519 std::list<CHG_AFTER_NOTIFIER>::iterator it(notifiers.begin());
520 while (it != notifiers.end())
521     {
522     it->GetUserPtr()->GetProperty().tariffName.DelAfterNotifier(&(*it));
523     ++it;
524     }
525 notifiers.clear();
526 }