]> git.stg.codes - stg.git/blob - projects/rscriptd/listener.cpp
Ticket 37. st->Prepare(query) and st->Set(7, id) added in the
[stg.git] / projects / rscriptd / listener.cpp
1 /*
2  *    This program is free software; you can redistribute it and/or modify
3  *    it under the terms of the GNU General Public License as published by
4  *    the Free Software Foundation; either version 2 of the License, or
5  *    (at your option) any later version.
6  *
7  *    This program is distributed in the hope that it will be useful,
8  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
9  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  *    GNU General Public License for more details.
11  *
12  *    You should have received a copy of the GNU General Public License
13  *    along with this program; if not, write to the Free Software
14  *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
15  */
16
17 /*
18  *    Author : Boris Mikhailenko <stg34@stargazer.dp.ua>
19  *    Author : Maxim Mamontov <faust@stargazer.dp.ua>
20  */
21
22 #include <arpa/inet.h>
23 #include <sys/uio.h> // readv
24 #include <sys/types.h> // for historical versions of BSD
25 #include <sys/socket.h>
26 #include <netinet/in.h>
27 #include <unistd.h>
28
29 #include <csignal>
30 #include <cerrno>
31 #include <ctime>
32 #include <cstring>
33 #include <sstream>
34 #include <algorithm>
35
36 #include "stg/scriptexecuter.h"
37 #include "stg/locker.h"
38 #include "stg/common.h"
39 #include "listener.h"
40
41 void InitEncrypt(BLOWFISH_CTX * ctx, const std::string & password);
42 void Decrypt(BLOWFISH_CTX * ctx, char * dst, const char * src, int len8);
43
44 //-----------------------------------------------------------------------------
45 LISTENER::LISTENER()
46     : WriteServLog(GetStgLogger()),
47       port(0),
48       running(false),
49       receiverStopped(true),
50       processorStopped(true),
51       userTimeout(0),
52       listenSocket(0),
53       version("rscriptd listener v.1.2")
54 {
55 pthread_mutex_init(&mutex, NULL);
56 }
57 //-----------------------------------------------------------------------------
58 void LISTENER::SetPassword(const std::string & p)
59 {
60 password = p;
61 printfd(__FILE__, "Encryption initiated with password \'%s\'\n", password.c_str());
62 InitEncrypt(&ctxS, password);
63 }
64 //-----------------------------------------------------------------------------
65 bool LISTENER::Start()
66 {
67 printfd(__FILE__, "LISTENER::Start()\n");
68 running = true;
69
70 if (PrepareNet())
71     {
72     return true;
73     }
74
75 if (receiverStopped)
76     {
77     if (pthread_create(&receiverThread, NULL, Run, this))
78         {
79         errorStr = "Cannot create thread.";
80         return true;
81         }
82     }
83
84 if (processorStopped)
85     {
86     if (pthread_create(&processorThread, NULL, RunProcessor, this))
87         {
88         errorStr = "Cannot create thread.";
89         return true;
90         }
91     }
92
93 errorStr = "";
94
95 return false;
96 }
97 //-----------------------------------------------------------------------------
98 bool LISTENER::Stop()
99 {
100 running = false;
101
102 printfd(__FILE__, "LISTENER::Stop()\n");
103
104 struct timespec ts = {0, 500000000};
105 nanosleep(&ts, NULL);
106
107 if (!processorStopped)
108     {
109     //5 seconds to thread stops itself
110     for (int i = 0; i < 25 && !processorStopped; i++)
111         {
112         struct timespec ts = {0, 200000000};
113         nanosleep(&ts, NULL);
114         }
115
116     //after 5 seconds waiting thread still running. now killing it
117     if (!processorStopped)
118         {
119         //TODO pthread_cancel()
120         if (pthread_kill(processorThread, SIGINT))
121             {
122             errorStr = "Cannot kill thread.";
123             return true;
124             }
125         printfd(__FILE__, "LISTENER killed Timeouter\n");
126         }
127     }
128
129 if (!receiverStopped)
130     {
131     //5 seconds to thread stops itself
132     for (int i = 0; i < 25 && !receiverStopped; i++)
133         {
134         struct timespec ts = {0, 200000000};
135         nanosleep(&ts, NULL);
136         }
137
138     //after 5 seconds waiting thread still running. now killing it
139     if (!receiverStopped)
140         {
141         //TODO pthread_cancel()
142         if (pthread_kill(receiverThread, SIGINT))
143             {
144             errorStr = "Cannot kill thread.";
145             return true;
146             }
147         printfd(__FILE__, "LISTENER killed Run\n");
148         }
149     }
150
151 pthread_join(receiverThread, NULL);
152 pthread_join(processorThread, NULL);
153
154 pthread_mutex_destroy(&mutex);
155
156 FinalizeNet();
157
158 std::for_each(users.begin(), users.end(), DisconnectUser(*this));
159
160 printfd(__FILE__, "LISTENER::Stoped successfully.\n");
161
162 return false;
163 }
164 //-----------------------------------------------------------------------------
165 void * LISTENER::Run(void * d)
166 {
167 sigset_t signalSet;
168 sigfillset(&signalSet);
169 pthread_sigmask(SIG_BLOCK, &signalSet, NULL);
170
171 LISTENER * listener = static_cast<LISTENER *>(d);
172
173 listener->Runner();
174
175 return NULL;
176 }
177 //-----------------------------------------------------------------------------
178 void LISTENER::Runner()
179 {
180 receiverStopped = false;
181
182 while (running)
183     {
184     RecvPacket();
185     }
186
187 receiverStopped = true;
188 }
189 //-----------------------------------------------------------------------------
190 void * LISTENER::RunProcessor(void * d)
191 {
192 sigset_t signalSet;
193 sigfillset(&signalSet);
194 pthread_sigmask(SIG_BLOCK, &signalSet, NULL);
195
196 LISTENER * listener = static_cast<LISTENER *>(d);
197
198 listener->ProcessorRunner();
199
200 return NULL;
201 }
202 //-----------------------------------------------------------------------------
203 void LISTENER::ProcessorRunner()
204 {
205 processorStopped = false;
206
207 while (running)
208     {
209     struct timespec ts = {0, 500000000};
210     nanosleep(&ts, NULL);
211     if (!pending.empty())
212         ProcessPending();
213     ProcessTimeouts();
214     }
215
216 processorStopped = true;
217 }
218 //-----------------------------------------------------------------------------
219 bool LISTENER::PrepareNet()
220 {
221 listenSocket = socket(AF_INET, SOCK_DGRAM, 0);
222
223 if (listenSocket < 0)
224     {
225     errorStr = "Cannot create socket.";
226     return true;
227     }
228
229 struct sockaddr_in listenAddr;
230 listenAddr.sin_family = AF_INET;
231 listenAddr.sin_port = htons(port);
232 listenAddr.sin_addr.s_addr = inet_addr("0.0.0.0");
233
234 if (bind(listenSocket, (struct sockaddr*)&listenAddr, sizeof(listenAddr)) < 0)
235     {
236     errorStr = "LISTENER: Bind failed.";
237     return true;
238     }
239
240 printfd(__FILE__, "LISTENER::PrepareNet() >>>> Start successfull.\n");
241
242 return false;
243 }
244 //-----------------------------------------------------------------------------
245 bool LISTENER::FinalizeNet()
246 {
247 close(listenSocket);
248
249 return false;
250 }
251 //-----------------------------------------------------------------------------
252 bool LISTENER::RecvPacket()
253 {
254 struct iovec iov[2];
255
256 char buffer[RS_MAX_PACKET_LEN];
257 RS::PACKET_HEADER packetHead;
258
259 iov[0].iov_base = reinterpret_cast<char *>(&packetHead);
260 iov[0].iov_len = sizeof(packetHead);
261 iov[1].iov_base = buffer;
262 iov[1].iov_len = sizeof(buffer) - sizeof(packetHead);
263
264 size_t dataLen = 0;
265 while (dataLen < sizeof(buffer))
266     {
267     if (!WaitPackets(listenSocket))
268         {
269         if (!running)
270             return false;
271         continue;
272         }
273     int portion = readv(listenSocket, iov, 2);
274     if (portion < 0)
275         {
276         return true;
277         }
278     dataLen += portion;
279     }
280
281 if (CheckHeader(packetHead))
282     {
283     printfd(__FILE__, "Invalid packet or incorrect protocol version!\n");
284     return true;
285     }
286
287 std::string userLogin((char *)packetHead.login);
288 PendingData data;
289 data.login = userLogin;
290 data.ip = ntohl(packetHead.ip);
291 data.id = ntohl(packetHead.id);
292
293 if (packetHead.packetType == RS_ALIVE_PACKET)
294     {
295     data.type = PendingData::ALIVE;
296     }
297 else if (packetHead.packetType == RS_CONNECT_PACKET)
298     {
299     data.type = PendingData::CONNECT;
300     if (GetParams(buffer, data))
301         {
302         return true;
303         }
304     }
305 else if (packetHead.packetType == RS_DISCONNECT_PACKET)
306     {
307     data.type = PendingData::DISCONNECT;
308     if (GetParams(buffer, data))
309         {
310         return true;
311         }
312     }
313
314 STG_LOCKER lock(&mutex);
315 pending.push_back(data);
316
317 return false;
318 }
319 //-----------------------------------------------------------------------------
320 bool LISTENER::GetParams(char * buffer, UserData & data)
321 {
322 RS::PACKET_TAIL packetTail;
323
324 Decrypt(&ctxS, (char *)&packetTail, buffer, sizeof(packetTail) / 8);
325
326 if (strncmp((char *)packetTail.magic, RS_ID, RS_MAGIC_LEN))
327     {
328     printfd(__FILE__, "Invalid crypto magic\n");
329     return true;
330     }
331
332 std::ostringstream params;
333 params << "\"" << data.login << "\" "
334        << inet_ntostring(data.ip) << " "
335        << data.id << " "
336        << (char *)packetTail.params;
337
338 data.params = params.str();
339
340 return false;
341 }
342 //-----------------------------------------------------------------------------
343 void LISTENER::ProcessPending()
344 {
345 std::list<PendingData>::iterator it(pending.begin());
346 size_t count = 0;
347 printfd(__FILE__, "Pending: %d\n", pending.size());
348 while (it != pending.end() && count < 256)
349     {
350     std::vector<AliveData>::iterator uit(
351             std::lower_bound(
352                 users.begin(),
353                 users.end(),
354                 it->login)
355             );
356     if (it->type == PendingData::CONNECT)
357         {
358         printfd(__FILE__, "Connect packet\n");
359         if (uit == users.end() || uit->login != it->login)
360             {
361             printfd(__FILE__, "Connect new user '%s'\n", it->login.c_str());
362             // Add new user
363             Connect(*it);
364             users.insert(uit, AliveData(static_cast<UserData>(*it)));
365             }
366         else if (uit->login == it->login)
367             {
368             printfd(__FILE__, "Update existing user '%s'\n", it->login.c_str());
369             // Update already existing user
370             time(&uit->lastAlive);
371             uit->params = it->params;
372             }
373         else
374             {
375             printfd(__FILE__, "Hmmm... Strange connect for '%s'\n", it->login.c_str());
376             }
377         }
378     else if (it->type == PendingData::ALIVE)
379         {
380         printfd(__FILE__, "Alive packet\n");
381         if (uit != users.end() && uit->login == it->login)
382             {
383             printfd(__FILE__, "Alive user '%s'\n", it->login.c_str());
384             // Update existing user
385             time(&uit->lastAlive);
386             }
387         else
388             {
389             printfd(__FILE__, "Alive user '%s' is not found\n", it->login.c_str());
390             }
391         }
392     else if (it->type == PendingData::DISCONNECT)
393         {
394         printfd(__FILE__, "Disconnect packet\n");
395         if (uit != users.end() && uit->login == it->login.c_str())
396             {
397             printfd(__FILE__, "Disconnect user '%s'\n", it->login.c_str());
398             // Disconnect existing user
399             Disconnect(*uit);
400             users.erase(uit);
401             }
402         else
403             {
404             printfd(__FILE__, "Cannot find user '%s' for disconnect\n", it->login.c_str());
405             }
406         }
407     else
408         {
409         printfd(__FILE__, "Unknown packet type\n");
410         }
411     ++it;
412     ++count;
413     }
414 STG_LOCKER lock(&mutex);
415 pending.erase(pending.begin(), it);
416 }
417 //-----------------------------------------------------------------------------
418 void LISTENER::ProcessTimeouts()
419 {
420 const std::vector<AliveData>::iterator it(
421         std::stable_partition(
422             users.begin(),
423             users.end(),
424             IsNotTimedOut(userTimeout)
425         )
426     );
427
428 if (it != users.end())
429     {
430     printfd(__FILE__, "Total users: %d, users to disconnect: %d\n", users.size(), std::distance(it, users.end()));
431
432     std::for_each(
433             it,
434             users.end(),
435             DisconnectUser(*this)
436         );
437
438     users.erase(it, users.end());
439     }
440 }
441 //-----------------------------------------------------------------------------
442 bool LISTENER::Connect(const UserData & data) const
443 {
444 printfd(__FILE__, "Connect %s\n", data.login.c_str());
445 if (access(scriptOnConnect.c_str(), X_OK) == 0)
446     {
447     if (ScriptExec((scriptOnConnect + " " + data.params).c_str()))
448         {
449         WriteServLog("Script %s cannot be executed for an unknown reason.", scriptOnConnect.c_str());
450         return true;
451         }
452     }
453 else
454     {
455     WriteServLog("Script %s cannot be executed. File not found.", scriptOnConnect.c_str());
456     return true;
457     }
458 return false;
459 }
460 //-----------------------------------------------------------------------------
461 bool LISTENER::Disconnect(const UserData & data) const
462 {
463 printfd(__FILE__, "Disconnect %s\n", data.login.c_str());
464 if (access(scriptOnDisconnect.c_str(), X_OK) == 0)
465     {
466     if (ScriptExec((scriptOnDisconnect + " " + data.params).c_str()))
467         {
468         WriteServLog("Script %s cannot be executed for an unknown reson.", scriptOnDisconnect.c_str());
469         return true;
470         }
471     }
472 else
473     {
474     WriteServLog("Script %s cannot be executed. File not found.", scriptOnDisconnect.c_str());
475     return true;
476     }
477 return false;
478 }
479 //-----------------------------------------------------------------------------
480 bool LISTENER::CheckHeader(const RS::PACKET_HEADER & header) const
481 {
482 if (strncmp((char *)header.magic, RS_ID, RS_MAGIC_LEN))
483     {
484     return true;
485     }
486 if (strncmp((char *)header.protoVer, "02", RS_PROTO_VER_LEN))
487     {
488     return true;
489     }
490 return false;
491 }
492 //-----------------------------------------------------------------------------
493 inline
494 void InitEncrypt(BLOWFISH_CTX * ctx, const std::string & password)
495 {
496 unsigned char keyL[PASSWD_LEN];
497 memset(keyL, 0, PASSWD_LEN);
498 strncpy((char *)keyL, password.c_str(), PASSWD_LEN);
499 Blowfish_Init(ctx, keyL, PASSWD_LEN);
500 }
501 //-----------------------------------------------------------------------------
502 inline
503 void Decrypt(BLOWFISH_CTX * ctx, char * dst, const char * src, int len8)
504 {
505 if (dst != src)
506     memcpy(dst, src, len8 * 8);
507
508 for (int i = 0; i < len8; i++)
509     Blowfish_Decrypt(ctx, (uint32_t *)(dst + i * 8), (uint32_t *)(dst + i * 8 + 4));
510 }
511 //-----------------------------------------------------------------------------