]> git.stg.codes - stg.git/blob - projects/stargazer/plugins/other/smux/smux.cpp
c5d5132b6450c1dd4bd1f3fa8e786d2a93a7cad8
[stg.git] / projects / stargazer / plugins / other / smux / smux.cpp
1 #include <sys/types.h>
2 #include <sys/socket.h>
3 #include <arpa/inet.h>
4
5 #include <cstring>
6 #include <cerrno>
7 #include <ctime>
8 #include <csignal>
9 #include <cassert>
10
11 #include <vector>
12 #include <algorithm>
13 #include <iterator>
14 #include <stdexcept>
15 #include <utility>
16
17 #include "stg/common.h"
18 #include "stg/plugin_creator.h"
19
20 #include "smux.h"
21 #include "utils.h"
22
23 namespace
24 {
25 PLUGIN_CREATOR<SMUX> smc;
26
27 bool SPrefixLess(const Sensors::value_type & a,
28                  const Sensors::value_type & b);
29
30 bool SPrefixLess(const Sensors::value_type & a,
31                  const Sensors::value_type & b)
32 {
33 return a.first.PrefixLess(b.first);
34 }
35
36 }
37
38 extern "C" PLUGIN * GetPlugin();
39
40 PLUGIN * GetPlugin()
41 {
42 return smc.GetPlugin();
43 }
44
45 SMUX_SETTINGS::SMUX_SETTINGS()
46     : errorStr(),
47       ip(0),
48       port(0),
49       password()
50 {}
51
52 int SMUX_SETTINGS::ParseSettings(const MODULE_SETTINGS & s)
53 {
54 PARAM_VALUE pv;
55 std::vector<PARAM_VALUE>::const_iterator pvi;
56 int p;
57
58 pv.param = "Port";
59 pvi = std::find(s.moduleParams.begin(), s.moduleParams.end(), pv);
60 if (pvi == s.moduleParams.end())
61     {
62     errorStr = "Parameter \'Port\' not found.";
63     printfd(__FILE__, "Parameter 'Port' not found\n");
64     return -1;
65     }
66 if (ParseIntInRange(pvi->value[0], 2, 65535, &p))
67     {
68     errorStr = "Cannot parse parameter \'Port\': " + errorStr;
69     printfd(__FILE__, "Cannot parse parameter 'Port'\n");
70     return -1;
71     }
72 port = static_cast<uint16_t>(p);
73
74 pv.param = "Password";
75 pvi = std::find(s.moduleParams.begin(), s.moduleParams.end(), pv);
76 if (pvi == s.moduleParams.end())
77     {
78     errorStr = "Parameter \'Password\' not found.";
79     printfd(__FILE__, "Parameter 'Password' not found\n");
80     password = "";
81     }
82 else
83     {
84     password = pvi->value[0];
85     }
86
87 pv.param = "Server";
88 pvi = std::find(s.moduleParams.begin(), s.moduleParams.end(), pv);
89 if (pvi == s.moduleParams.end())
90     {
91     errorStr = "Parameter \'Server\' not found.";
92     printfd(__FILE__, "Parameter 'Server' not found\n");
93     return -1;
94     }
95 ip = inet_strington(pvi->value[0]);
96
97 return 0;
98 }
99
100 SMUX::SMUX()
101     : PLUGIN(),
102       users(NULL),
103       tariffs(NULL),
104       admins(NULL),
105       services(NULL),
106       corporations(NULL),
107       traffcounter(NULL),
108       errorStr(),
109       smuxSettings(),
110       settings(),
111       thread(),
112       mutex(),
113       running(false),
114       stopped(true),
115       needReconnect(false),
116       lastReconnectTry(0),
117       reconnectTimeout(1),
118       sock(-1),
119       smuxHandlers(),
120       pdusHandlers(),
121       sensors(),
122       tables(),
123       notifiers(),
124       addUserNotifier(*this),
125       delUserNotifier(*this),
126       addDelTariffNotifier(*this),
127       logger(GetPluginLogger(GetStgLogger(), "smux"))
128 {
129 pthread_mutex_init(&mutex, NULL);
130
131 smuxHandlers[SMUX_PDUs_PR_close] = &SMUX::CloseHandler;
132 smuxHandlers[SMUX_PDUs_PR_registerResponse] = &SMUX::RegisterResponseHandler;
133 smuxHandlers[SMUX_PDUs_PR_pdus] = &SMUX::PDUsRequestHandler;
134 smuxHandlers[SMUX_PDUs_PR_commitOrRollback] = &SMUX::CommitOrRollbackHandler;
135
136 pdusHandlers[PDUs_PR_get_request] = &SMUX::GetRequestHandler;
137 pdusHandlers[PDUs_PR_get_next_request] = &SMUX::GetNextRequestHandler;
138 pdusHandlers[PDUs_PR_set_request] = &SMUX::SetRequestHandler;
139 }
140
141 SMUX::~SMUX()
142 {
143     {
144     Sensors::iterator it;
145     for (it = sensors.begin(); it != sensors.end(); ++it)
146         delete it->second;
147     }
148     {
149     Tables::iterator it;
150     for (it = tables.begin(); it != tables.end(); ++it)
151         delete it->second;
152     }
153 printfd(__FILE__, "SMUX::~SMUX()\n");
154 pthread_mutex_destroy(&mutex);
155 }
156
157 int SMUX::ParseSettings()
158 {
159 return smuxSettings.ParseSettings(settings);
160 }
161
162 int SMUX::Start()
163 {
164 assert(users != NULL && "users must not be NULL");
165 assert(tariffs != NULL && "tariffs must not be NULL");
166 assert(admins != NULL && "admins must not be NULL");
167 assert(services != NULL && "services must not be NULL");
168 assert(corporations != NULL && "corporations must not be NULL");
169 assert(traffcounter != NULL && "traffcounter must not be NULL");
170
171 if (PrepareNet())
172     needReconnect = true;
173
174 // Users
175 sensors[OID(".1.3.6.1.4.1.38313.1.1.1")] = new TotalUsersSensor(*users);
176 sensors[OID(".1.3.6.1.4.1.38313.1.1.2")] = new ConnectedUsersSensor(*users);
177 sensors[OID(".1.3.6.1.4.1.38313.1.1.3")] = new AuthorizedUsersSensor(*users);
178 sensors[OID(".1.3.6.1.4.1.38313.1.1.4")] = new AlwaysOnlineUsersSensor(*users);
179 sensors[OID(".1.3.6.1.4.1.38313.1.1.5")] = new NoCashUsersSensor(*users);
180 sensors[OID(".1.3.6.1.4.1.38313.1.1.6")] = new DisabledDetailStatsUsersSensor(*users);
181 sensors[OID(".1.3.6.1.4.1.38313.1.1.7")] = new DisabledUsersSensor(*users);
182 sensors[OID(".1.3.6.1.4.1.38313.1.1.8")] = new PassiveUsersSensor(*users);
183 sensors[OID(".1.3.6.1.4.1.38313.1.1.9")] = new CreditUsersSensor(*users);
184 sensors[OID(".1.3.6.1.4.1.38313.1.1.10")] = new FreeMbUsersSensor(*users);
185 sensors[OID(".1.3.6.1.4.1.38313.1.1.11")] = new TariffChangeUsersSensor(*users);
186 sensors[OID(".1.3.6.1.4.1.38313.1.1.12")] = new ActiveUsersSensor(*users);
187 // Tariffs
188 sensors[OID(".1.3.6.1.4.1.38313.1.2.1")] = new TotalTariffsSensor(*tariffs);
189 // Admins
190 sensors[OID(".1.3.6.1.4.1.38313.1.3.1")] = new TotalAdminsSensor(*admins);
191 // Services
192 sensors[OID(".1.3.6.1.4.1.38313.1.4.1")] = new TotalServicesSensor(*services);
193 // Corporations
194 sensors[OID(".1.3.6.1.4.1.38313.1.5.1")] = new TotalCorporationsSensor(*corporations);
195 // Traffcounter
196 sensors[OID(".1.3.6.1.4.1.38313.1.6.1")] = new TotalRulesSensor(*traffcounter);
197
198 // Table data
199 tables[".1.3.6.1.4.1.38313.1.2.2"] = new TariffUsersTable(".1.3.6.1.4.1.38313.1.2.2", *tariffs, *users);
200
201 UpdateTables();
202 SetNotifiers();
203
204 #ifdef SMUX_DEBUG
205 Sensors::const_iterator it(sensors.begin());
206 while (it != sensors.end())
207     {
208     printfd(__FILE__, "%s = %s\n",
209             it->first.ToString().c_str(),
210             it->second->ToString().c_str());
211     ++it;
212     }
213 #endif
214
215 if (!running)
216     {
217     if (pthread_create(&thread, NULL, Runner, this))
218         {
219         errorStr = "Cannot create thread.";
220         logger("Cannot create thread.");
221         printfd(__FILE__, "Cannot create thread\n");
222         return -1;
223         }
224     }
225
226 return 0;
227 }
228
229 int SMUX::Stop()
230 {
231 printfd(__FILE__, "SMUX::Stop() - Before\n");
232 running = false;
233
234 if (!stopped)
235     {
236     //5 seconds to thread stops itself
237     for (int i = 0; i < 25 && !stopped; i++)
238         {
239         struct timespec ts = {0, 200000000};
240         nanosleep(&ts, NULL);
241         }
242     }
243
244 if (stopped)
245     pthread_join(thread, NULL);
246
247 ResetNotifiers();
248
249     {
250     Tables::iterator it;
251     for (it = tables.begin(); it != tables.end(); ++it)
252         delete it->second;
253     }
254     {
255     Sensors::iterator it;
256     for (it = sensors.begin(); it != sensors.end(); ++it)
257         delete it->second;
258     }
259
260 tables.erase(tables.begin(), tables.end());
261 sensors.erase(sensors.begin(), sensors.end());
262
263 close(sock);
264
265 if (!stopped)
266     {
267     running = true;
268     return -1;
269     }
270
271 printfd(__FILE__, "SMUX::Stop() - After\n");
272 return 0;
273 }
274
275 int SMUX::Reload()
276 {
277 if (Stop())
278     return -1;
279 if (Start())
280     return -1;
281 logger("Reconnected successfully.");
282 return 0;
283 }
284
285 void * SMUX::Runner(void * d)
286 {
287 sigset_t signalSet;
288 sigfillset(&signalSet);
289 pthread_sigmask(SIG_BLOCK, &signalSet, NULL);
290
291 SMUX * smux = static_cast<SMUX *>(d);
292
293 smux->Run();
294
295 return NULL;
296 }
297
298 void SMUX::Run()
299 {
300 stopped = true;
301 if (!SendOpenPDU(sock))
302     needReconnect = true;
303 if (!SendRReqPDU(sock))
304     needReconnect = true;
305 running = true;
306 stopped = false;
307
308 while(running)
309     {
310     if (WaitPackets(sock) && !needReconnect)
311         {
312         SMUX_PDUs_t * pdus = RecvSMUXPDUs(sock);
313         if (pdus)
314             {
315             DispatchPDUs(pdus);
316             ASN_STRUCT_FREE(asn_DEF_SMUX_PDUs, pdus);
317             }
318         else if (running)
319             Reconnect();
320         }
321     else if (running && needReconnect)
322         Reconnect();
323     if (!running)
324         break;
325     }
326 SendClosePDU(sock);
327 stopped = true;
328 }
329
330 bool SMUX::PrepareNet()
331 {
332 sock = socket(AF_INET, SOCK_STREAM, 0);
333
334 if (sock < 0)
335     {
336     errorStr = "Cannot create socket.";
337     logger("Cannot create a socket: %s", strerror(errno));
338     printfd(__FILE__, "Cannot create socket\n");
339     return true;
340     }
341
342 struct sockaddr_in addr;
343
344 addr.sin_family = AF_INET;
345 addr.sin_port = htons(smuxSettings.GetPort());
346 addr.sin_addr.s_addr = smuxSettings.GetIP();
347
348 if (connect(sock, reinterpret_cast<struct sockaddr *>(&addr), sizeof(addr)))
349     {
350     errorStr = "Cannot connect.";
351     logger("Cannot connect the socket: %s", strerror(errno));
352     printfd(__FILE__, "Cannot connect. Message: '%s'\n", strerror(errno));
353     return true;
354     }
355
356 return false;
357 }
358
359 bool SMUX::Reconnect()
360 {
361 if (needReconnect && difftime(time(NULL), lastReconnectTry) < reconnectTimeout)
362     return true;
363
364 time(&lastReconnectTry);
365 SendClosePDU(sock);
366 close(sock);
367 if (!PrepareNet())
368     if (SendOpenPDU(sock))
369         if (SendRReqPDU(sock))
370             {
371             reconnectTimeout = 1;
372             needReconnect = false;
373             logger("Connected successfully");
374             printfd(__FILE__, "Connected successfully\n");
375             return false;
376             }
377
378 if (needReconnect)
379     if (reconnectTimeout < 60)
380         reconnectTimeout *= 2;
381
382 needReconnect = true;
383 return true;
384 }
385
386 bool SMUX::DispatchPDUs(const SMUX_PDUs_t * pdus)
387 {
388 SMUXHandlers::iterator it(smuxHandlers.find(pdus->present));
389 if (it != smuxHandlers.end())
390     {
391     return (this->*(it->second))(pdus);
392     }
393 #ifdef SMUX_DEBUG
394 else
395     {
396     switch (pdus->present)
397         {
398         case SMUX_PDUs_PR_NOTHING:
399             printfd(__FILE__, "PDUs: nothing\n");
400             break;
401         case SMUX_PDUs_PR_open:
402             printfd(__FILE__, "PDUs: open\n");
403             break;
404         case SMUX_PDUs_PR_registerRequest:
405             printfd(__FILE__, "PDUs: registerRequest\n");
406             break;
407         default:
408             printfd(__FILE__, "PDUs: undefined\n");
409         }
410     asn_fprint(stderr, &asn_DEF_SMUX_PDUs, pdus);
411     }
412 #endif
413 return false;
414 }
415
416 bool SMUX::UpdateTables()
417 {
418 Sensors newSensors;
419 bool done = true;
420 Tables::iterator it(tables.begin());
421 while (it != tables.end())
422     {
423     try
424         {
425         it->second->UpdateSensors(newSensors);
426         }
427     catch (const std::runtime_error & ex)
428         {
429         printfd(__FILE__,
430                 "SMUX::UpdateTables - failed to update table '%s': '%s'\n",
431                 it->first.c_str(), ex.what());
432         done = false;
433         break;
434         }
435     ++it;
436     }
437 if (!done)
438     {
439     Sensors::iterator it(newSensors.begin());
440     while (it != newSensors.end())
441         {
442         delete it->second;
443         ++it;
444         }
445     return false;
446     }
447
448 it = tables.begin();
449 while (it != tables.end())
450     {
451     std::pair<Sensors::iterator, Sensors::iterator> res;
452     res = std::equal_range(sensors.begin(),
453                            sensors.end(),
454                            std::pair<OID, Sensor *>(OID(it->first), NULL),
455                            SPrefixLess);
456     Sensors::iterator sit(res.first);
457     while (sit != res.second)
458         {
459         delete sit->second;
460         ++sit;
461         }
462     sensors.erase(res.first, res.second);
463     ++it;
464     }
465
466 sensors.insert(newSensors.begin(), newSensors.end());
467
468 return true;
469 }
470
471 void SMUX::SetNotifier(USER_PTR userPtr)
472 {
473 notifiers.push_back(CHG_AFTER_NOTIFIER(*this, userPtr));
474 userPtr->GetProperty().tariffName.AddAfterNotifier(&notifiers.back());
475 }
476
477 void SMUX::UnsetNotifier(USER_PTR userPtr)
478 {
479 std::list<CHG_AFTER_NOTIFIER>::iterator it = notifiers.begin();
480 while (it != notifiers.end())
481     {
482     if (it->GetUserPtr() == userPtr)
483         {
484         userPtr->GetProperty().tariffName.DelAfterNotifier(&(*it));
485         notifiers.erase(it);
486         break;
487         }
488     ++it;
489     }
490 }
491
492 void SMUX::SetNotifiers()
493 {
494 int h = users->OpenSearch();
495 assert(h && "USERS::OpenSearch is always correct");
496
497 USER_PTR u;
498 while (!users->SearchNext(h, &u))
499     SetNotifier(u);
500
501 users->CloseSearch(h);
502
503 users->AddNotifierUserAdd(&addUserNotifier);
504 users->AddNotifierUserDel(&delUserNotifier);
505
506 tariffs->AddNotifierAdd(&addDelTariffNotifier);
507 tariffs->AddNotifierDel(&addDelTariffNotifier);
508 }
509
510 void SMUX::ResetNotifiers()
511 {
512 tariffs->DelNotifierDel(&addDelTariffNotifier);
513 tariffs->DelNotifierAdd(&addDelTariffNotifier);
514
515 users->DelNotifierUserDel(&delUserNotifier);
516 users->DelNotifierUserAdd(&addUserNotifier);
517
518 std::list<CHG_AFTER_NOTIFIER>::iterator it(notifiers.begin());
519 while (it != notifiers.end())
520     {
521     it->GetUserPtr()->GetProperty().tariffName.DelAfterNotifier(&(*it));
522     ++it;
523     }
524 notifiers.clear();
525 }