]> git.stg.codes - stg.git/blob - projects/stargazer/plugins/other/smux/smux.cpp
Change map<std::string, USER_PROPERTY_BASE *> on REGISTRY in USER_PROPERTY_LOGGED
[stg.git] / projects / stargazer / plugins / other / smux / smux.cpp
1 #include <sys/types.h>
2 #include <sys/socket.h>
3 #include <arpa/inet.h>
4
5 #include <cstring>
6 #include <cerrno>
7 #include <ctime>
8 #include <csignal>
9 #include <cassert>
10
11 #include <vector>
12 #include <algorithm>
13 #include <iterator>
14 #include <stdexcept>
15 #include <utility>
16
17 #include "stg/common.h"
18 #include "stg/plugin_creator.h"
19
20 #include "smux.h"
21 #include "utils.h"
22
23 namespace
24 {
25 PLUGIN_CREATOR<SMUX> smc;
26
27 bool SPrefixLess(const Sensors::value_type & a,
28                  const Sensors::value_type & b);
29
30 bool SPrefixLess(const Sensors::value_type & a,
31                  const Sensors::value_type & b)
32 {
33 return a.first.PrefixLess(b.first);
34 }
35
36 }
37
38 extern "C" PLUGIN * GetPlugin();
39
40 PLUGIN * GetPlugin()
41 {
42 return smc.GetPlugin();
43 }
44
45 SMUX_SETTINGS::SMUX_SETTINGS()
46     : errorStr(),
47       ip(0),
48       port(0),
49       password()
50 {}
51
52 int SMUX_SETTINGS::ParseSettings(const MODULE_SETTINGS & s)
53 {
54 PARAM_VALUE pv;
55 std::vector<PARAM_VALUE>::const_iterator pvi;
56 int p;
57
58 pv.param = "Port";
59 pvi = std::find(s.moduleParams.begin(), s.moduleParams.end(), pv);
60 if (pvi == s.moduleParams.end())
61     {
62     errorStr = "Parameter \'Port\' not found.";
63     printfd(__FILE__, "Parameter 'Port' not found\n");
64     return -1;
65     }
66 if (ParseIntInRange(pvi->value[0], 2, 65535, &p))
67     {
68     errorStr = "Cannot parse parameter \'Port\': " + errorStr;
69     printfd(__FILE__, "Cannot parse parameter 'Port'\n");
70     return -1;
71     }
72 port = static_cast<uint16_t>(p);
73
74 pv.param = "Password";
75 pvi = std::find(s.moduleParams.begin(), s.moduleParams.end(), pv);
76 if (pvi == s.moduleParams.end())
77     {
78     errorStr = "Parameter \'Password\' not found.";
79     printfd(__FILE__, "Parameter 'Password' not found\n");
80     password = "";
81     }
82 else
83     {
84     password = pvi->value[0];
85     }
86
87 pv.param = "Server";
88 pvi = std::find(s.moduleParams.begin(), s.moduleParams.end(), pv);
89 if (pvi == s.moduleParams.end())
90     {
91     errorStr = "Parameter \'Server\' not found.";
92     printfd(__FILE__, "Parameter 'Server' not found\n");
93     return -1;
94     }
95 ip = inet_strington(pvi->value[0]);
96
97 return 0;
98 }
99
100 SMUX::SMUX()
101     : PLUGIN(),
102       users(NULL),
103       tariffs(NULL),
104       admins(NULL),
105       services(NULL),
106       corporations(NULL),
107       traffcounter(NULL),
108       errorStr(),
109       smuxSettings(),
110       settings(),
111       thread(),
112       mutex(),
113       running(false),
114       stopped(true),
115       needReconnect(false),
116       lastReconnectTry(0),
117       reconnectTimeout(1),
118       sock(-1),
119       smuxHandlers(),
120       pdusHandlers(),
121       sensors(),
122       tables(),
123       notifiers(),
124       addUserNotifier(*this),
125       delUserNotifier(*this),
126       addDelTariffNotifier(*this),
127       logger(GetPluginLogger(GetStgLogger(), "smux"))
128 {
129 pthread_mutex_init(&mutex, NULL);
130
131 smuxHandlers[SMUX_PDUs_PR_close] = &SMUX::CloseHandler;
132 smuxHandlers[SMUX_PDUs_PR_registerResponse] = &SMUX::RegisterResponseHandler;
133 smuxHandlers[SMUX_PDUs_PR_pdus] = &SMUX::PDUsRequestHandler;
134 smuxHandlers[SMUX_PDUs_PR_commitOrRollback] = &SMUX::CommitOrRollbackHandler;
135
136 pdusHandlers[PDUs_PR_get_request] = &SMUX::GetRequestHandler;
137 pdusHandlers[PDUs_PR_get_next_request] = &SMUX::GetNextRequestHandler;
138 pdusHandlers[PDUs_PR_set_request] = &SMUX::SetRequestHandler;
139 }
140
141 SMUX::~SMUX()
142 {
143     {
144     Sensors::iterator it;
145     for (it = sensors.begin(); it != sensors.end(); ++it)
146         delete it->second;
147     }
148     {
149     Tables::iterator it;
150     for (it = tables.begin(); it != tables.end(); ++it)
151         delete it->second;
152     }
153 printfd(__FILE__, "SMUX::~SMUX()\n");
154 pthread_mutex_destroy(&mutex);
155 }
156
157 int SMUX::ParseSettings()
158 {
159 return smuxSettings.ParseSettings(settings);
160 }
161
162 int SMUX::Start()
163 {
164 assert(users != NULL && "users must not be NULL");
165 assert(tariffs != NULL && "tariffs must not be NULL");
166 assert(admins != NULL && "admins must not be NULL");
167 assert(services != NULL && "services must not be NULL");
168 assert(corporations != NULL && "corporations must not be NULL");
169 assert(traffcounter != NULL && "traffcounter must not be NULL");
170
171 if (PrepareNet())
172     needReconnect = true;
173
174 // Users
175 sensors[OID(".1.3.6.1.4.1.38313.1.1.1")] = new TotalUsersSensor(*users);
176 sensors[OID(".1.3.6.1.4.1.38313.1.1.2")] = new ConnectedUsersSensor(*users);
177 sensors[OID(".1.3.6.1.4.1.38313.1.1.3")] = new AuthorizedUsersSensor(*users);
178 sensors[OID(".1.3.6.1.4.1.38313.1.1.4")] = new AlwaysOnlineUsersSensor(*users);
179 sensors[OID(".1.3.6.1.4.1.38313.1.1.5")] = new NoCashUsersSensor(*users);
180 sensors[OID(".1.3.6.1.4.1.38313.1.1.6")] = new DisabledDetailStatsUsersSensor(*users);
181 sensors[OID(".1.3.6.1.4.1.38313.1.1.7")] = new DisabledUsersSensor(*users);
182 sensors[OID(".1.3.6.1.4.1.38313.1.1.8")] = new PassiveUsersSensor(*users);
183 sensors[OID(".1.3.6.1.4.1.38313.1.1.9")] = new CreditUsersSensor(*users);
184 sensors[OID(".1.3.6.1.4.1.38313.1.1.10")] = new FreeMbUsersSensor(*users);
185 sensors[OID(".1.3.6.1.4.1.38313.1.1.11")] = new TariffChangeUsersSensor(*users);
186 sensors[OID(".1.3.6.1.4.1.38313.1.1.12")] = new ActiveUsersSensor(*users);
187 // Tariffs
188 sensors[OID(".1.3.6.1.4.1.38313.1.2.1")] = new TotalTariffsSensor(*tariffs);
189 // Admins
190 sensors[OID(".1.3.6.1.4.1.38313.1.3.1")] = new TotalAdminsSensor(*admins);
191 // Services
192 sensors[OID(".1.3.6.1.4.1.38313.1.4.1")] = new TotalServicesSensor(*services);
193 // Corporations
194 sensors[OID(".1.3.6.1.4.1.38313.1.5.1")] = new TotalCorporationsSensor(*corporations);
195 // Traffcounter
196 sensors[OID(".1.3.6.1.4.1.38313.1.6.1")] = new TotalRulesSensor(*traffcounter);
197
198 // Table data
199 tables[".1.3.6.1.4.1.38313.1.2.2"] = new TariffUsersTable(".1.3.6.1.4.1.38313.1.2.2", *tariffs, *users);
200
201 UpdateTables();
202 SetNotifiers();
203
204 #ifdef SMUX_DEBUG
205 Sensors::const_iterator it(sensors.begin());
206 while (it != sensors.end())
207     {
208     printfd(__FILE__, "%s = %s\n",
209             it->first.ToString().c_str(),
210             it->second->ToString().c_str());
211     ++it;
212     }
213 #endif
214
215 if (!running)
216     {
217     if (pthread_create(&thread, NULL, Runner, this))
218         {
219         errorStr = "Cannot create thread.";
220         logger("Cannot create thread.");
221         printfd(__FILE__, "Cannot create thread\n");
222         return -1;
223         }
224     }
225
226 return 0;
227 }
228
229 int SMUX::Stop()
230 {
231 printfd(__FILE__, "SMUX::Stop() - Before\n");
232 running = false;
233
234 if (!stopped)
235     {
236     //5 seconds to thread stops itself
237     for (int i = 0; i < 25 && !stopped; i++)
238         {
239         struct timespec ts = {0, 200000000};
240         nanosleep(&ts, NULL);
241         }
242     }
243
244 if (stopped)
245     pthread_join(thread, NULL);
246
247 ResetNotifiers();
248
249     {
250     Tables::iterator it;
251     for (it = tables.begin(); it != tables.end(); ++it)
252         delete it->second;
253     }
254     {
255     Sensors::iterator it;
256     for (it = sensors.begin(); it != sensors.end(); ++it)
257         delete it->second;
258     }
259
260 tables.erase(tables.begin(), tables.end());
261 sensors.erase(sensors.begin(), sensors.end());
262
263 close(sock);
264
265 if (!stopped)
266     {
267     running = true;
268     return -1;
269     }
270
271 printfd(__FILE__, "SMUX::Stop() - After\n");
272 return 0;
273 }
274
275 int SMUX::Reload()
276 {
277 if (Stop())
278     return -1;
279 if (Start())
280     return -1;
281 if (!needReconnect)
282     {
283     printfd(__FILE__, "SMUX reconnected succesfully.\n");
284     logger("Reconnected successfully.");
285     }
286 return 0;
287 }
288
289 void * SMUX::Runner(void * d)
290 {
291 sigset_t signalSet;
292 sigfillset(&signalSet);
293 pthread_sigmask(SIG_BLOCK, &signalSet, NULL);
294
295 SMUX * smux = static_cast<SMUX *>(d);
296
297 smux->Run();
298
299 return NULL;
300 }
301
302 void SMUX::Run()
303 {
304 stopped = true;
305 if (!SendOpenPDU(sock))
306     needReconnect = true;
307 if (!SendRReqPDU(sock))
308     needReconnect = true;
309 running = true;
310 stopped = false;
311
312 while(running)
313     {
314     if (WaitPackets(sock) && !needReconnect)
315         {
316         SMUX_PDUs_t * pdus = RecvSMUXPDUs(sock);
317         if (pdus)
318             {
319             DispatchPDUs(pdus);
320             ASN_STRUCT_FREE(asn_DEF_SMUX_PDUs, pdus);
321             }
322         else if (running)
323             Reconnect();
324         }
325     else if (running && needReconnect)
326         Reconnect();
327     if (!running)
328         break;
329     }
330 SendClosePDU(sock);
331 stopped = true;
332 }
333
334 bool SMUX::PrepareNet()
335 {
336 sock = socket(AF_INET, SOCK_STREAM, 0);
337
338 if (sock < 0)
339     {
340     errorStr = "Cannot create socket.";
341     logger("Cannot create a socket: %s", strerror(errno));
342     printfd(__FILE__, "Cannot create socket\n");
343     return true;
344     }
345
346 struct sockaddr_in addr;
347
348 addr.sin_family = AF_INET;
349 addr.sin_port = htons(smuxSettings.GetPort());
350 addr.sin_addr.s_addr = smuxSettings.GetIP();
351
352 if (connect(sock, reinterpret_cast<struct sockaddr *>(&addr), sizeof(addr)))
353     {
354     errorStr = "Cannot connect.";
355     logger("Cannot connect the socket: %s", strerror(errno));
356     printfd(__FILE__, "Cannot connect. Message: '%s'\n", strerror(errno));
357     return true;
358     }
359
360 return false;
361 }
362
363 bool SMUX::Reconnect()
364 {
365 if (needReconnect && difftime(time(NULL), lastReconnectTry) < reconnectTimeout)
366     return true;
367
368 time(&lastReconnectTry);
369 SendClosePDU(sock);
370 close(sock);
371 if (!PrepareNet())
372     if (SendOpenPDU(sock))
373         if (SendRReqPDU(sock))
374             {
375             reconnectTimeout = 1;
376             needReconnect = false;
377             logger("Connected successfully");
378             printfd(__FILE__, "Connected successfully\n");
379             return false;
380             }
381
382 if (needReconnect)
383     if (reconnectTimeout < 60)
384         reconnectTimeout *= 2;
385
386 needReconnect = true;
387 return true;
388 }
389
390 bool SMUX::DispatchPDUs(const SMUX_PDUs_t * pdus)
391 {
392 SMUXHandlers::iterator it(smuxHandlers.find(pdus->present));
393 if (it != smuxHandlers.end())
394     {
395     return (this->*(it->second))(pdus);
396     }
397 #ifdef SMUX_DEBUG
398 else
399     {
400     switch (pdus->present)
401         {
402         case SMUX_PDUs_PR_NOTHING:
403             printfd(__FILE__, "PDUs: nothing\n");
404             break;
405         case SMUX_PDUs_PR_open:
406             printfd(__FILE__, "PDUs: open\n");
407             break;
408         case SMUX_PDUs_PR_registerRequest:
409             printfd(__FILE__, "PDUs: registerRequest\n");
410             break;
411         default:
412             printfd(__FILE__, "PDUs: undefined\n");
413         }
414     asn_fprint(stderr, &asn_DEF_SMUX_PDUs, pdus);
415     }
416 #endif
417 return false;
418 }
419
420 bool SMUX::UpdateTables()
421 {
422 Sensors newSensors;
423 bool done = true;
424 Tables::iterator it(tables.begin());
425 while (it != tables.end())
426     {
427     try
428         {
429         it->second->UpdateSensors(newSensors);
430         }
431     catch (const std::runtime_error & ex)
432         {
433         printfd(__FILE__,
434                 "SMUX::UpdateTables - failed to update table '%s': '%s'\n",
435                 it->first.c_str(), ex.what());
436         done = false;
437         break;
438         }
439     ++it;
440     }
441 if (!done)
442     {
443     Sensors::iterator it(newSensors.begin());
444     while (it != newSensors.end())
445         {
446         delete it->second;
447         ++it;
448         }
449     return false;
450     }
451
452 it = tables.begin();
453 while (it != tables.end())
454     {
455     std::pair<Sensors::iterator, Sensors::iterator> res;
456     res = std::equal_range(sensors.begin(),
457                            sensors.end(),
458                            std::pair<OID, Sensor *>(OID(it->first), NULL),
459                            SPrefixLess);
460     Sensors::iterator sit(res.first);
461     while (sit != res.second)
462         {
463         delete sit->second;
464         ++sit;
465         }
466     sensors.erase(res.first, res.second);
467     ++it;
468     }
469
470 sensors.insert(newSensors.begin(), newSensors.end());
471
472 return true;
473 }
474
475 void SMUX::SetNotifier(USER_PTR userPtr)
476 {
477 notifiers.push_back(CHG_AFTER_NOTIFIER(*this, userPtr));
478 userPtr->GetProperty().tariffName.AddAfterNotifier(&notifiers.back());
479 }
480
481 void SMUX::UnsetNotifier(USER_PTR userPtr)
482 {
483 std::list<CHG_AFTER_NOTIFIER>::iterator it = notifiers.begin();
484 while (it != notifiers.end())
485     {
486     if (it->GetUserPtr() == userPtr)
487         {
488         userPtr->GetProperty().tariffName.DelAfterNotifier(&(*it));
489         notifiers.erase(it);
490         break;
491         }
492     ++it;
493     }
494 }
495
496 void SMUX::SetNotifiers()
497 {
498 int h = users->OpenSearch();
499 assert(h && "USERS::OpenSearch is always correct");
500
501 USER_PTR u;
502 while (!users->SearchNext(h, &u))
503     SetNotifier(u);
504
505 users->CloseSearch(h);
506
507 users->AddNotifierUserAdd(&addUserNotifier);
508 users->AddNotifierUserDel(&delUserNotifier);
509
510 tariffs->AddNotifierAdd(&addDelTariffNotifier);
511 tariffs->AddNotifierDel(&addDelTariffNotifier);
512 }
513
514 void SMUX::ResetNotifiers()
515 {
516 tariffs->DelNotifierDel(&addDelTariffNotifier);
517 tariffs->DelNotifierAdd(&addDelTariffNotifier);
518
519 users->DelNotifierUserDel(&delUserNotifier);
520 users->DelNotifierUserAdd(&addUserNotifier);
521
522 std::list<CHG_AFTER_NOTIFIER>::iterator it(notifiers.begin());
523 while (it != notifiers.end())
524     {
525     it->GetUserPtr()->GetProperty().tariffName.DelAfterNotifier(&(*it));
526     ++it;
527     }
528 notifiers.clear();
529 }