]> git.stg.codes - stg.git/blob - projects/stargazer/plugins/store/postgresql/postgresql_store_messages.cpp
Added SMUX reconnect. Fixes #18.
[stg.git] / projects / stargazer / plugins / store / postgresql / postgresql_store_messages.cpp
1 /*
2  *    This program is free software; you can redistribute it and/or modify
3  *    it under the terms of the GNU General Public License as published by
4  *    the Free Software Foundation; either version 2 of the License, or
5  *    (at your option) any later version.
6  *
7  *    This program is distributed in the hope that it will be useful,
8  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
9  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  *    GNU General Public License for more details.
11  *
12  *    You should have received a copy of the GNU General Public License
13  *    along with this program; if not, write to the Free Software
14  *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
15  */
16
17 /*
18  *    Author : Maxim Mamontov <faust@stargazer.dp.ua>
19  */
20
21
22 /*
23  *  Messages manipualtion methods
24  *
25  *  $Revision: 1.6 $
26  *  $Date: 2009/07/15 11:19:42 $
27  *
28  */
29
30 #include <string>
31 #include <vector>
32 #include <sstream>
33
34 #include <libpq-fe.h>
35
36 #include "postgresql_store.h"
37 #include "stg/locker.h"
38 #include "stg/message.h"
39
40 //-----------------------------------------------------------------------------
41 int POSTGRESQL_STORE::AddMessage(STG_MSG * msg, const string & login) const
42 {
43 STG_LOCKER lock(&mutex, __FILE__, __LINE__);
44
45 if (PQstatus(connection) != CONNECTION_OK)
46     {
47     printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Connection lost. Trying to reconnect...'\n", strError.c_str());
48     if (Reset())
49         {
50         strError = "Connection lost";
51         printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): '%s'\n", strError.c_str());
52         return -1;
53         }
54     }
55
56 PGresult * result;
57
58 if (StartTransaction())
59     {
60     printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Failed to start transaction'\n");
61     return -1;
62     }
63
64 std::string elogin = login;
65 std::string etext = msg->text;
66
67 if (EscapeString(elogin))
68     {
69     printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Failed to escape login'\n");
70     if (RollbackTransaction())
71         {
72         printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Failed to rollback transaction'\n");
73         }
74     return -1;
75     }
76
77 if (EscapeString(etext))
78     {
79     printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Failed to escape message text'\n");
80     if (RollbackTransaction())
81         {
82         printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Failed to rollback transaction'\n");
83         }
84     return -1;
85     }
86
87 std::stringstream query;
88 query << "SELECT sp_add_message("
89       << "'" << elogin << "', "
90       << "CAST(1 AS SMALLINT), " // Here need to be a version, but, it's uninitiated actually
91       << "CAST(" << msg->header.type << " AS SMALLINT), "
92       << "CAST('" << Int2TS(msg->header.lastSendTime) << "' AS TIMESTAMP), "
93       << "CAST('" << Int2TS(msg->header.creationTime) << "' AS TIMESTAMP), "
94       << msg->header.showTime << ", "
95       << "CAST(" << msg->header.repeat << " AS SMALLINT), "
96       << msg->header.repeatPeriod << ", "
97       << "'" << etext << "')";
98
99 result = PQexec(connection, query.str().c_str());
100
101 if (PQresultStatus(result) != PGRES_TUPLES_OK)
102     {
103     strError = PQresultErrorMessage(result);
104     PQclear(result);
105     printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): '%s'\n", strError.c_str());
106     if (RollbackTransaction())
107         {
108         printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Failed to rollback transaction'\n");
109         }
110     return -1;
111     }
112
113 int tuples = PQntuples(result);
114
115 if (tuples != 1)
116     {
117     strError = "Failed to fetch newlly added message ID";
118     printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Invalid number of tuples. Wanted 1, actulally %d'\n", tuples);
119     PQclear(result);
120     if (RollbackTransaction())
121         {
122         printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Failed to rollback transaction'\n");
123         }
124     return -1;
125     }
126
127 std::stringstream tuple;
128 tuple << PQgetvalue(result, 0, 0);
129
130 PQclear(result);
131
132 tuple >> msg->header.id;
133
134 if (CommitTransaction())
135     {
136     printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Failed to commit transaction'\n");
137     return -1;
138     }
139
140 return 0;
141 }
142 //-----------------------------------------------------------------------------
143 int POSTGRESQL_STORE::EditMessage(const STG_MSG & msg,
144                                   const string & login) const
145 {
146 STG_LOCKER lock(&mutex, __FILE__, __LINE__);
147
148 if (PQstatus(connection) != CONNECTION_OK)
149     {
150     printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): 'Connection lost. Trying to reconnect...'\n", strError.c_str());
151     if (Reset())
152         {
153         strError = "Connection lost";
154         printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): '%s'\n", strError.c_str());
155         return -1;
156         }
157     }
158
159 PGresult * result;
160
161 if (StartTransaction())
162     {
163     printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): 'Failed to start transaction'\n");
164     return -1;
165     }
166
167 std::string elogin = login;
168 std::string etext = msg.text;
169
170 if (EscapeString(elogin))
171     {
172     printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): 'Failed to escape login'\n");
173     if (RollbackTransaction())
174         {
175         printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): 'Failed to rollback transaction'\n");
176         }
177     return -1;
178     }
179
180 if (EscapeString(etext))
181     {
182     printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): 'Failed to escape message text'\n");
183     if (RollbackTransaction())
184         {
185         printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): 'Failed to rollback transaction'\n");
186         }
187     return -1;
188     }
189
190 std::stringstream query;
191 query << "UPDATE tb_messages SET "
192           << "fk_user = (SELECT pk_user FROM tb_users WHERE name = '" << elogin << "'), "
193           << "ver = " << msg.header.ver << ", "
194           << "msg_type = " << msg.header.type << ", "
195           << "last_send_time = CAST('" << Int2TS(msg.header.lastSendTime) << "' AS TIMESTAMP), "
196           << "creation_time = CAST('" << Int2TS(msg.header.creationTime) << "' AS TIMESTAMP), "
197           << "show_time = " << msg.header.showTime << ", "
198           << "repeat = " << msg.header.repeat << ", "
199           << "repeat_period = " << msg.header.repeatPeriod << ", "
200           << "msg_text = '" << etext << "' "
201       << "WHERE pk_message = " << msg.header.id;
202
203 result = PQexec(connection, query.str().c_str());
204
205 if (PQresultStatus(result) != PGRES_COMMAND_OK)
206     {
207     strError = PQresultErrorMessage(result);
208     PQclear(result);
209     printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): '%s'\n", strError.c_str());
210     if (RollbackTransaction())
211         {
212         printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): 'Failed to rollback transaction'\n");
213         }
214     return -1;
215     }
216
217 PQclear(result);
218
219 if (CommitTransaction())
220     {
221     printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): 'Failed to commit transaction'\n");
222     return -1;
223     }
224
225 return 0;
226 }
227 //-----------------------------------------------------------------------------
228 int POSTGRESQL_STORE::GetMessage(uint64_t id,
229                                STG_MSG * msg,
230                                const string &) const
231 {
232 STG_LOCKER lock(&mutex, __FILE__, __LINE__);
233
234 if (PQstatus(connection) != CONNECTION_OK)
235     {
236     printfd(__FILE__, "POSTGRESQL_STORE::GetMessage(): 'Connection lost. Trying to reconnect...'\n", strError.c_str());
237     if (Reset())
238         {
239         strError = "Connection lost";
240         printfd(__FILE__, "POSTGRESQL_STORE::GetMessage(): '%s'\n", strError.c_str());
241         return -1;
242         }
243     }
244
245 string login;
246 PGresult * result;
247
248 if (StartTransaction())
249     {
250     printfd(__FILE__, "POSTGRESQL_STORE::GetMessage(): 'Failed to start transaction'\n");
251     return -1;
252     }
253
254 std::stringstream query;
255 query << "SELECT ver, msg_type, last_send_time, \
256                  creation_time, show_time, repeat, \
257                  repeat_period, msg_text \
258           FROM tb_messages \
259           WHERE pk_message = " << id;
260
261 result = PQexec(connection, query.str().c_str());
262
263 if (PQresultStatus(result) != PGRES_TUPLES_OK)
264     {
265     strError = PQresultErrorMessage(result);
266     PQclear(result);
267     printfd(__FILE__, "POSTGRESQL_STORE::GetMessage(): '%s'\n", strError.c_str());
268     if (RollbackTransaction())
269         {
270         printfd(__FILE__, "POSTGRESQL_STORE::GetMessage(): 'Failed to rollback transaction'\n");
271         }
272     return -1;
273     }
274
275 int tuples = PQntuples(result);
276
277 if (tuples != 1)
278     {
279     strError = "Failed to fetch message data";
280     printfd(__FILE__, "POSTGRESQL_STORE::GetMessage(): 'Invalid number of tuples. Wanted 1, actulally %d'\n", tuples);
281     PQclear(result);
282     if (RollbackTransaction())
283         {
284         printfd(__FILE__, "POSTGRESQL_STORE::GetMessage(): 'Failed to rollback transaction'\n");
285         }
286     return -1;
287     }
288
289 /*std::stringstream tuple;
290
291 for (int i = 0; i < 8; ++i)
292     {
293     tuple << PQgetvalue(result, 0, i) << " ";
294     }*/
295
296 str2x(PQgetvalue(result, 0, 0), msg->header.ver);
297 str2x(PQgetvalue(result, 0, 1), msg->header.type);
298 msg->header.lastSendTime = TS2Int(PQgetvalue(result, 0, 2));
299 msg->header.creationTime = TS2Int(PQgetvalue(result, 0, 3));
300 str2x(PQgetvalue(result, 0, 4), msg->header.showTime);
301 str2x(PQgetvalue(result, 0, 5), msg->header.repeat);
302 str2x(PQgetvalue(result, 0, 6), msg->header.repeatPeriod);
303 msg->text = PQgetvalue(result, 0, 7);
304
305 PQclear(result);
306
307 /*tuple >> msg->header.ver;
308 tuple >> msg->header.type;
309 tuple >> msg->header.lastSendTime;
310 tuple >> msg->header.creationTime;
311 tuple >> msg->header.showTime;
312 tuple >> msg->header.repeat;
313 tuple >> msg->header.repeatPeriod;
314 tuple >> msg->text;*/
315
316 if (CommitTransaction())
317     {
318     printfd(__FILE__, "POSTGRESQL_STORE::GetMessage(): 'Failed to commit transaction'\n");
319     return -1;
320     }
321
322 return 0;
323 }
324 //-----------------------------------------------------------------------------
325 int POSTGRESQL_STORE::DelMessage(uint64_t id, const string &) const
326 {
327 STG_LOCKER lock(&mutex, __FILE__, __LINE__);
328
329 if (PQstatus(connection) != CONNECTION_OK)
330     {
331     printfd(__FILE__, "POSTGRESQL_STORE::DelMessage(): 'Connection lost. Trying to reconnect...'\n", strError.c_str());
332     if (Reset())
333         {
334         strError = "Connection lost";
335         printfd(__FILE__, "POSTGRESQL_STORE::DelMessage(): '%s'\n", strError.c_str());
336         return -1;
337         }
338     }
339
340 PGresult * result;
341
342 if (StartTransaction())
343     {
344     printfd(__FILE__, "POSTGRESQL_STORE::DelMessage(): 'Failed to start transaction'\n");
345     return -1;
346     }
347
348 std::stringstream query;
349 query << "DELETE FROM tb_messages WHERE pk_message = " << id;
350
351 result = PQexec(connection, query.str().c_str());
352
353 if (PQresultStatus(result) != PGRES_COMMAND_OK)
354     {
355     strError = PQresultErrorMessage(result);
356     PQclear(result);
357     printfd(__FILE__, "POSTGRESQL_STORE::DelMessage(): '%s'\n", strError.c_str());
358     if (RollbackTransaction())
359         {
360         printfd(__FILE__, "POSTGRESQL_STORE::DelMessage(): 'Failed to rollback transaction'\n");
361         }
362     return -1;
363     }
364
365 PQclear(result);
366
367 if (CommitTransaction())
368     {
369     printfd(__FILE__, "POSTGRESQL_STORE::DelMessage(): 'Failed to commit transaction'\n");
370     return -1;
371     }
372
373 return 0;
374 }
375 //-----------------------------------------------------------------------------
376 int POSTGRESQL_STORE::GetMessageHdrs(vector<STG_MSG_HDR> * hdrsList,
377                                    const string & login) const
378 {
379 STG_LOCKER lock(&mutex, __FILE__, __LINE__);
380
381 if (PQstatus(connection) != CONNECTION_OK)
382     {
383     printfd(__FILE__, "POSTGRESQL_STORE::GetMessageHdrs(): 'Connection lost. Trying to reconnect...'\n", strError.c_str());
384     if (Reset())
385         {
386         strError = "Connection lost";
387         printfd(__FILE__, "POSTGRESQL_STORE::GetMessageHdrs(): '%s'\n", strError.c_str());
388         return -1;
389         }
390     }
391
392 PGresult * result;
393
394 if (StartTransaction())
395     {
396     printfd(__FILE__, "POSTGRESQL_STORE::GetMessageHdrs(): 'Failed to start transaction'\n");
397     return -1;
398     }
399
400 std::string elogin = login;
401
402 if (EscapeString(elogin))
403     {
404     printfd(__FILE__, "POSTGRESQL_STORE::GetMessageHdrs(): 'Failed to escape login'\n");
405     if (RollbackTransaction())
406         {
407         printfd(__FILE__, "POSTGRESQL_STORE::GetMessageHdrs(): 'Failed to rollback transaction'\n");
408         }
409     return -1;
410     }
411
412 std::stringstream query;
413 query << "SELECT pk_message, ver, msg_type, \
414                  last_send_time, creation_time, show_time, \
415                  repeat, repeat_period \
416           FROM tb_messages \
417           WHERE fk_user IN \
418                 (SELECT pk_user FROM tb_users \
419           WHERE name = '" << elogin << "')";
420
421 result = PQexec(connection, query.str().c_str());
422
423 if (PQresultStatus(result) != PGRES_TUPLES_OK)
424     {
425     strError = PQresultErrorMessage(result);
426     PQclear(result);
427     printfd(__FILE__, "POSTGRESQL_STORE::GetMessageHdrs(): '%s'\n", strError.c_str());
428     if (RollbackTransaction())
429         {
430         printfd(__FILE__, "POSTGRESQL_STORE::GetMessageHdrs(): 'Failed to rollback transaction'\n");
431         }
432     return -1;
433     }
434
435 int tuples = PQntuples(result);
436
437 for (int i = 0; i < tuples; ++i)
438     {
439     std::stringstream tuple;
440     STG_MSG_HDR header;
441     tuple << PQgetvalue(result, i, 0) << " ";
442     tuple << PQgetvalue(result, i, 1) << " ";
443     tuple << PQgetvalue(result, i, 2) << " ";
444     header.lastSendTime = TS2Int(PQgetvalue(result, i, 3));
445     header.creationTime = TS2Int(PQgetvalue(result, i, 4));
446     tuple << PQgetvalue(result, i, 5) << " ";
447     tuple << PQgetvalue(result, i, 6) << " ";
448     tuple << PQgetvalue(result, i, 7) << " ";
449
450     tuple >> header.id;
451     tuple >> header.ver;
452     tuple >> header.type;
453     tuple >> header.showTime;
454     tuple >> header.repeat;
455     tuple >> header.repeatPeriod;
456     hdrsList->push_back(header);
457     }
458
459 PQclear(result);
460
461 if (CommitTransaction())
462     {
463     printfd(__FILE__, "POSTGRESQL_STORE::GetMessageHdrs(): 'Failed to commit transaction'\n");
464     return -1;
465     }
466
467 return 0;
468 }
469 //-----------------------------------------------------------------------------
470