]> git.stg.codes - stg.git/blob - projects/stargazer/plugins/other/smux/smux.cpp
Massive refactoring.
[stg.git] / projects / stargazer / plugins / other / smux / smux.cpp
1 #include <sys/types.h>
2 #include <sys/socket.h>
3 #include <arpa/inet.h>
4
5 #include <cstring>
6 #include <cerrno>
7 #include <ctime>
8 #include <csignal>
9 #include <cassert>
10
11 #include <vector>
12 #include <algorithm>
13 #include <iterator>
14 #include <stdexcept>
15 #include <utility>
16
17 #include "stg/common.h"
18 #include "stg/plugin_creator.h"
19
20 #include "smux.h"
21 #include "utils.h"
22
23 namespace
24 {
25 PLUGIN_CREATOR<SMUX> smc;
26
27 bool SPrefixLess(const Sensors::value_type & a,
28                  const Sensors::value_type & b);
29
30 bool SPrefixLess(const Sensors::value_type & a,
31                  const Sensors::value_type & b)
32 {
33 return a.first.PrefixLess(b.first);
34 }
35
36 }
37
38 extern "C" PLUGIN * GetPlugin();
39
40 PLUGIN * GetPlugin()
41 {
42 return smc.GetPlugin();
43 }
44
45 SMUX_SETTINGS::SMUX_SETTINGS()
46     : errorStr(),
47       ip(0),
48       port(0),
49       password()
50 {}
51
52 int SMUX_SETTINGS::ParseSettings(const MODULE_SETTINGS & s)
53 {
54 PARAM_VALUE pv;
55 std::vector<PARAM_VALUE>::const_iterator pvi;
56 int p;
57
58 pv.param = "Port";
59 pvi = std::find(s.moduleParams.begin(), s.moduleParams.end(), pv);
60 if (pvi == s.moduleParams.end())
61     {
62     errorStr = "Parameter \'Port\' not found.";
63     printfd(__FILE__, "Parameter 'Port' not found\n");
64     return -1;
65     }
66 if (ParseIntInRange(pvi->value[0], 2, 65535, &p))
67     {
68     errorStr = "Cannot parse parameter \'Port\': " + errorStr;
69     printfd(__FILE__, "Cannot parse parameter 'Port'\n");
70     return -1;
71     }
72 port = static_cast<uint16_t>(p);
73
74 pv.param = "Password";
75 pvi = std::find(s.moduleParams.begin(), s.moduleParams.end(), pv);
76 if (pvi == s.moduleParams.end())
77     {
78     errorStr = "Parameter \'Password\' not found.";
79     printfd(__FILE__, "Parameter 'Password' not found\n");
80     password = "";
81     }
82 else
83     {
84     password = pvi->value[0];
85     }
86
87 pv.param = "Server";
88 pvi = std::find(s.moduleParams.begin(), s.moduleParams.end(), pv);
89 if (pvi == s.moduleParams.end())
90     {
91     errorStr = "Parameter \'Server\' not found.";
92     printfd(__FILE__, "Parameter 'Server' not found\n");
93     return -1;
94     }
95 ip = inet_strington(pvi->value[0]);
96
97 return 0;
98 }
99
100 SMUX::SMUX()
101     : PLUGIN(),
102       users(NULL),
103       tariffs(NULL),
104       admins(NULL),
105       services(NULL),
106       corporations(NULL),
107       traffcounter(NULL),
108       errorStr(),
109       smuxSettings(),
110       settings(),
111       thread(),
112       mutex(),
113       running(false),
114       stopped(true),
115       needReconnect(false),
116       lastReconnectTry(0),
117       reconnectTimeout(1),
118       sock(-1),
119       smuxHandlers(),
120       pdusHandlers(),
121       sensors(),
122       tables(),
123       notifiers(),
124       addUserNotifier(*this),
125       delUserNotifier(*this),
126       addDelTariffNotifier(*this),
127       logger(GetPluginLogger(GetStgLogger(), "smux"))
128 {
129 pthread_mutex_init(&mutex, NULL);
130
131 smuxHandlers[SMUX_PDUs_PR_close] = &SMUX::CloseHandler;
132 smuxHandlers[SMUX_PDUs_PR_registerResponse] = &SMUX::RegisterResponseHandler;
133 smuxHandlers[SMUX_PDUs_PR_pdus] = &SMUX::PDUsRequestHandler;
134 smuxHandlers[SMUX_PDUs_PR_commitOrRollback] = &SMUX::CommitOrRollbackHandler;
135
136 pdusHandlers[PDUs_PR_get_request] = &SMUX::GetRequestHandler;
137 pdusHandlers[PDUs_PR_get_next_request] = &SMUX::GetNextRequestHandler;
138 pdusHandlers[PDUs_PR_set_request] = &SMUX::SetRequestHandler;
139 }
140
141 SMUX::~SMUX()
142 {
143     {
144     Sensors::iterator it;
145     for (it = sensors.begin(); it != sensors.end(); ++it)
146         delete it->second;
147     }
148     {
149     Tables::iterator it;
150     for (it = tables.begin(); it != tables.end(); ++it)
151         delete it->second;
152     }
153 printfd(__FILE__, "SMUX::~SMUX()\n");
154 pthread_mutex_destroy(&mutex);
155 }
156
157 int SMUX::ParseSettings()
158 {
159 return smuxSettings.ParseSettings(settings);
160 }
161
162 int SMUX::Start()
163 {
164 assert(users != NULL && "users must not be NULL");
165 assert(tariffs != NULL && "tariffs must not be NULL");
166 assert(admins != NULL && "admins must not be NULL");
167 assert(services != NULL && "services must not be NULL");
168 assert(corporations != NULL && "corporations must not be NULL");
169 assert(traffcounter != NULL && "traffcounter must not be NULL");
170
171 if (PrepareNet())
172     needReconnect = true;
173
174 // Users
175 sensors[OID(".1.3.6.1.4.1.38313.1.1.1")] = new TotalUsersSensor(*users);
176 sensors[OID(".1.3.6.1.4.1.38313.1.1.2")] = new ConnectedUsersSensor(*users);
177 sensors[OID(".1.3.6.1.4.1.38313.1.1.3")] = new AuthorizedUsersSensor(*users);
178 sensors[OID(".1.3.6.1.4.1.38313.1.1.4")] = new AlwaysOnlineUsersSensor(*users);
179 sensors[OID(".1.3.6.1.4.1.38313.1.1.5")] = new NoCashUsersSensor(*users);
180 sensors[OID(".1.3.6.1.4.1.38313.1.1.6")] = new DisabledDetailStatsUsersSensor(*users);
181 sensors[OID(".1.3.6.1.4.1.38313.1.1.7")] = new DisabledUsersSensor(*users);
182 sensors[OID(".1.3.6.1.4.1.38313.1.1.8")] = new PassiveUsersSensor(*users);
183 sensors[OID(".1.3.6.1.4.1.38313.1.1.9")] = new CreditUsersSensor(*users);
184 sensors[OID(".1.3.6.1.4.1.38313.1.1.10")] = new FreeMbUsersSensor(*users);
185 sensors[OID(".1.3.6.1.4.1.38313.1.1.11")] = new TariffChangeUsersSensor(*users);
186 sensors[OID(".1.3.6.1.4.1.38313.1.1.12")] = new ActiveUsersSensor(*users);
187 // Tariffs
188 sensors[OID(".1.3.6.1.4.1.38313.1.2.1")] = new TotalTariffsSensor(*tariffs);
189 // Admins
190 sensors[OID(".1.3.6.1.4.1.38313.1.3.1")] = new TotalAdminsSensor(*admins);
191 // Services
192 sensors[OID(".1.3.6.1.4.1.38313.1.4.1")] = new TotalServicesSensor(*services);
193 // Corporations
194 sensors[OID(".1.3.6.1.4.1.38313.1.5.1")] = new TotalCorporationsSensor(*corporations);
195 // Traffcounter
196 sensors[OID(".1.3.6.1.4.1.38313.1.6.1")] = new TotalRulesSensor(*traffcounter);
197
198 // Table data
199 tables[".1.3.6.1.4.1.38313.1.2.2"] = new TariffUsersTable(".1.3.6.1.4.1.38313.1.2.2", *tariffs, *users);
200
201 UpdateTables();
202 SetNotifiers();
203
204 #ifdef SMUX_DEBUG
205 Sensors::const_iterator it(sensors.begin());
206 while (it != sensors.end())
207     {
208     printfd(__FILE__, "%s = %s\n",
209             it->first.ToString().c_str(),
210             it->second->ToString().c_str());
211     ++it;
212     }
213 #endif
214
215 if (!running)
216     {
217     if (pthread_create(&thread, NULL, Runner, this))
218         {
219         errorStr = "Cannot create thread.";
220         logger("Cannot create thread.");
221         printfd(__FILE__, "Cannot create thread\n");
222         return -1;
223         }
224     }
225
226 return 0;
227 }
228
229 int SMUX::Stop()
230 {
231 printfd(__FILE__, "SMUX::Stop() - Before\n");
232 running = false;
233
234 if (!stopped)
235     {
236     //5 seconds to thread stops itself
237     for (int i = 0; i < 25 && !stopped; i++)
238         {
239         struct timespec ts = {0, 200000000};
240         nanosleep(&ts, NULL);
241         }
242     }
243
244 if (stopped)
245     pthread_join(thread, NULL);
246
247 ResetNotifiers();
248
249     {
250     Tables::iterator it;
251     for (it = tables.begin(); it != tables.end(); ++it)
252         delete it->second;
253     }
254     {
255     Sensors::iterator it;
256     for (it = sensors.begin(); it != sensors.end(); ++it)
257         delete it->second;
258     }
259
260 tables.erase(tables.begin(), tables.end());
261 sensors.erase(sensors.begin(), sensors.end());
262
263 close(sock);
264
265 if (!stopped)
266     {
267     running = true;
268     return -1;
269     }
270
271 printfd(__FILE__, "SMUX::Stop() - After\n");
272 return 0;
273 }
274
275 int SMUX::Reload()
276 {
277 if (Stop())
278     return -1;
279 if (Start())
280     return -1;
281 return 0;
282 }
283
284 void * SMUX::Runner(void * d)
285 {
286 sigset_t signalSet;
287 sigfillset(&signalSet);
288 pthread_sigmask(SIG_BLOCK, &signalSet, NULL);
289
290 SMUX * smux = static_cast<SMUX *>(d);
291
292 smux->Run();
293
294 return NULL;
295 }
296
297 void SMUX::Run()
298 {
299 stopped = true;
300 if (!SendOpenPDU(sock))
301     needReconnect = true;
302 if (!SendRReqPDU(sock))
303     needReconnect = true;
304 running = true;
305 stopped = false;
306
307 while(running)
308     {
309     if (WaitPackets(sock) && !needReconnect)
310         {
311         SMUX_PDUs_t * pdus = RecvSMUXPDUs(sock);
312         if (pdus)
313             {
314             DispatchPDUs(pdus);
315             ASN_STRUCT_FREE(asn_DEF_SMUX_PDUs, pdus);
316             }
317         else if (running)
318             Reconnect();
319         }
320     else if (running && needReconnect)
321         Reconnect();
322     if (!running)
323         break;
324     }
325 SendClosePDU(sock);
326 stopped = true;
327 }
328
329 bool SMUX::PrepareNet()
330 {
331 sock = socket(AF_INET, SOCK_STREAM, 0);
332
333 if (sock < 0)
334     {
335     errorStr = "Cannot create socket.";
336     logger("Cannot create a socket: %s", strerror(errno));
337     printfd(__FILE__, "Cannot create socket\n");
338     return true;
339     }
340
341 struct sockaddr_in addr;
342
343 addr.sin_family = AF_INET;
344 addr.sin_port = htons(smuxSettings.GetPort());
345 addr.sin_addr.s_addr = smuxSettings.GetIP();
346
347 if (connect(sock, reinterpret_cast<struct sockaddr *>(&addr), sizeof(addr)))
348     {
349     errorStr = "Cannot connect.";
350     logger("Cannot connect the socket: %s", strerror(errno));
351     printfd(__FILE__, "Cannot connect. Message: '%s'\n", strerror(errno));
352     return true;
353     }
354
355 return false;
356 }
357
358 bool SMUX::Reconnect()
359 {
360 if (needReconnect && difftime(time(NULL), lastReconnectTry) < reconnectTimeout)
361     return true;
362
363 time(&lastReconnectTry);
364 SendClosePDU(sock);
365 close(sock);
366 if (!PrepareNet())
367     if (SendOpenPDU(sock))
368         if (SendRReqPDU(sock))
369             {
370             reconnectTimeout = 1;
371             needReconnect = false;
372             logger("Connected successfully");
373             printfd(__FILE__, "Connected successfully\n");
374             return false;
375             }
376
377 if (needReconnect)
378     if (reconnectTimeout < 60)
379         reconnectTimeout *= 2;
380
381 needReconnect = true;
382 return true;
383 }
384
385 bool SMUX::DispatchPDUs(const SMUX_PDUs_t * pdus)
386 {
387 SMUXHandlers::iterator it(smuxHandlers.find(pdus->present));
388 if (it != smuxHandlers.end())
389     {
390     return (this->*(it->second))(pdus);
391     }
392 #ifdef SMUX_DEBUG
393 else
394     {
395     switch (pdus->present)
396         {
397         case SMUX_PDUs_PR_NOTHING:
398             printfd(__FILE__, "PDUs: nothing\n");
399             break;
400         case SMUX_PDUs_PR_open:
401             printfd(__FILE__, "PDUs: open\n");
402             break;
403         case SMUX_PDUs_PR_registerRequest:
404             printfd(__FILE__, "PDUs: registerRequest\n");
405             break;
406         default:
407             printfd(__FILE__, "PDUs: undefined\n");
408         }
409     asn_fprint(stderr, &asn_DEF_SMUX_PDUs, pdus);
410     }
411 #endif
412 return false;
413 }
414
415 bool SMUX::UpdateTables()
416 {
417 Sensors newSensors;
418 bool done = true;
419 Tables::iterator it(tables.begin());
420 while (it != tables.end())
421     {
422     try
423         {
424         it->second->UpdateSensors(newSensors);
425         }
426     catch (const std::runtime_error & ex)
427         {
428         printfd(__FILE__,
429                 "SMUX::UpdateTables - failed to update table '%s': '%s'\n",
430                 it->first.c_str(), ex.what());
431         done = false;
432         break;
433         }
434     ++it;
435     }
436 if (!done)
437     {
438     Sensors::iterator it(newSensors.begin());
439     while (it != newSensors.end())
440         {
441         delete it->second;
442         ++it;
443         }
444     return false;
445     }
446
447 it = tables.begin();
448 while (it != tables.end())
449     {
450     std::pair<Sensors::iterator, Sensors::iterator> res;
451     res = std::equal_range(sensors.begin(),
452                            sensors.end(),
453                            std::pair<OID, Sensor *>(OID(it->first), NULL),
454                            SPrefixLess);
455     Sensors::iterator sit(res.first);
456     while (sit != res.second)
457         {
458         delete sit->second;
459         ++sit;
460         }
461     sensors.erase(res.first, res.second);
462     ++it;
463     }
464
465 sensors.insert(newSensors.begin(), newSensors.end());
466
467 return true;
468 }
469
470 void SMUX::SetNotifier(USER_PTR userPtr)
471 {
472 notifiers.push_back(CHG_AFTER_NOTIFIER(*this, userPtr));
473 userPtr->GetProperty().tariffName.AddAfterNotifier(&notifiers.back());
474 }
475
476 void SMUX::UnsetNotifier(USER_PTR userPtr)
477 {
478 std::list<CHG_AFTER_NOTIFIER>::iterator it = notifiers.begin();
479 while (it != notifiers.end())
480     {
481     if (it->GetUserPtr() == userPtr)
482         {
483         userPtr->GetProperty().tariffName.DelAfterNotifier(&(*it));
484         notifiers.erase(it);
485         break;
486         }
487     ++it;
488     }
489 }
490
491 void SMUX::SetNotifiers()
492 {
493 int h = users->OpenSearch();
494 assert(h && "USERS::OpenSearch is always correct");
495
496 USER_PTR u;
497 while (!users->SearchNext(h, &u))
498     SetNotifier(u);
499
500 users->CloseSearch(h);
501
502 users->AddNotifierUserAdd(&addUserNotifier);
503 users->AddNotifierUserDel(&delUserNotifier);
504
505 tariffs->AddNotifierAdd(&addDelTariffNotifier);
506 tariffs->AddNotifierDel(&addDelTariffNotifier);
507 }
508
509 void SMUX::ResetNotifiers()
510 {
511 tariffs->DelNotifierDel(&addDelTariffNotifier);
512 tariffs->DelNotifierAdd(&addDelTariffNotifier);
513
514 users->DelNotifierUserDel(&delUserNotifier);
515 users->DelNotifierUserAdd(&addUserNotifier);
516
517 std::list<CHG_AFTER_NOTIFIER>::iterator it(notifiers.begin());
518 while (it != notifiers.end())
519     {
520     it->GetUserPtr()->GetProperty().tariffName.DelAfterNotifier(&(*it));
521     ++it;
522     }
523 notifiers.clear();
524 }