]> git.stg.codes - stg.git/blob - stargazer/plugins/store/postgresql/postgresql_store_messages.cpp
RawPacket-related changes.
[stg.git] / stargazer / plugins / store / postgresql / postgresql_store_messages.cpp
1 /*
2  *    This program is free software; you can redistribute it and/or modify
3  *    it under the terms of the GNU General Public License as published by
4  *    the Free Software Foundation; either version 2 of the License, or
5  *    (at your option) any later version.
6  *
7  *    This program is distributed in the hope that it will be useful,
8  *    but WITHOUT ANY WARRANTY; without even the implied warranty of
9  *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10  *    GNU General Public License for more details.
11  *
12  *    You should have received a copy of the GNU General Public License
13  *    along with this program; if not, write to the Free Software
14  *    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
15  */
16
17 /*
18  *    Author : Maxim Mamontov <faust@stargazer.dp.ua>
19  */
20
21
22 /*
23  *  Messages manipualtion methods
24  *
25  *  $Revision: 1.6 $
26  *  $Date: 2009/07/15 11:19:42 $
27  *
28  */
29
30 #include "postgresql_store.h"
31
32 #include "stg/common.h"
33 #include "stg/locker.h"
34 #include "stg/message.h"
35
36 #include <string>
37 #include <vector>
38 #include <sstream>
39
40 #include <libpq-fe.h>
41
42 //-----------------------------------------------------------------------------
43 int POSTGRESQL_STORE::AddMessage(STG::Message * msg, const std::string & login) const
44 {
45 STG_LOCKER lock(&mutex);
46
47 if (PQstatus(connection) != CONNECTION_OK)
48     {
49     printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Connection lost. Trying to reconnect...'\n", strError.c_str());
50     if (Reset())
51         {
52         strError = "Connection lost";
53         printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): '%s'\n", strError.c_str());
54         return -1;
55         }
56     }
57
58 PGresult * result;
59
60 if (StartTransaction())
61     {
62     printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Failed to start transaction'\n");
63     return -1;
64     }
65
66 std::string elogin = login;
67 std::string etext = msg->text;
68
69 if (EscapeString(elogin))
70     {
71     printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Failed to escape login'\n");
72     if (RollbackTransaction())
73         {
74         printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Failed to rollback transaction'\n");
75         }
76     return -1;
77     }
78
79 if (EscapeString(etext))
80     {
81     printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Failed to escape message text'\n");
82     if (RollbackTransaction())
83         {
84         printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Failed to rollback transaction'\n");
85         }
86     return -1;
87     }
88
89 std::ostringstream query;
90 query << "SELECT sp_add_message("
91       << "'" << elogin << "', "
92       << "CAST(1 AS SMALLINT), " // Here need to be a version, but, it's uninitiated actually
93       << "CAST(" << msg->header.type << " AS SMALLINT), "
94       << "CAST('" << formatTime(msg->header.lastSendTime) << "' AS TIMESTAMP), "
95       << "CAST('" << formatTime(msg->header.creationTime) << "' AS TIMESTAMP), "
96       << msg->header.showTime << ", "
97       << "CAST(" << msg->header.repeat << " AS SMALLINT), "
98       << msg->header.repeatPeriod << ", "
99       << "'" << etext << "')";
100
101 result = PQexec(connection, query.str().c_str());
102
103 if (PQresultStatus(result) != PGRES_TUPLES_OK)
104     {
105     strError = PQresultErrorMessage(result);
106     PQclear(result);
107     printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): '%s'\n", strError.c_str());
108     if (RollbackTransaction())
109         {
110         printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Failed to rollback transaction'\n");
111         }
112     return -1;
113     }
114
115 int tuples = PQntuples(result);
116
117 if (tuples != 1)
118     {
119     strError = "Failed to fetch newlly added message ID";
120     printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Invalid number of tuples. Wanted 1, actulally %d'\n", tuples);
121     PQclear(result);
122     if (RollbackTransaction())
123         {
124         printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Failed to rollback transaction'\n");
125         }
126     return -1;
127     }
128
129 std::stringstream tuple;
130 tuple << PQgetvalue(result, 0, 0);
131
132 PQclear(result);
133
134 tuple >> msg->header.id;
135
136 if (CommitTransaction())
137     {
138     printfd(__FILE__, "POSTGRESQL_STORE::AddMessage(): 'Failed to commit transaction'\n");
139     return -1;
140     }
141
142 return 0;
143 }
144 //-----------------------------------------------------------------------------
145 int POSTGRESQL_STORE::EditMessage(const STG::Message & msg,
146                                   const std::string & login) const
147 {
148 STG_LOCKER lock(&mutex);
149
150 if (PQstatus(connection) != CONNECTION_OK)
151     {
152     printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): 'Connection lost. Trying to reconnect...'\n", strError.c_str());
153     if (Reset())
154         {
155         strError = "Connection lost";
156         printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): '%s'\n", strError.c_str());
157         return -1;
158         }
159     }
160
161 PGresult * result;
162
163 if (StartTransaction())
164     {
165     printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): 'Failed to start transaction'\n");
166     return -1;
167     }
168
169 std::string elogin = login;
170 std::string etext = msg.text;
171
172 if (EscapeString(elogin))
173     {
174     printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): 'Failed to escape login'\n");
175     if (RollbackTransaction())
176         {
177         printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): 'Failed to rollback transaction'\n");
178         }
179     return -1;
180     }
181
182 if (EscapeString(etext))
183     {
184     printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): 'Failed to escape message text'\n");
185     if (RollbackTransaction())
186         {
187         printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): 'Failed to rollback transaction'\n");
188         }
189     return -1;
190     }
191
192 std::ostringstream query;
193 query << "UPDATE tb_messages SET "
194           << "fk_user = (SELECT pk_user FROM tb_users WHERE name = '" << elogin << "'), "
195           << "ver = " << msg.header.ver << ", "
196           << "msg_type = " << msg.header.type << ", "
197           << "last_send_time = CAST('" << formatTime(msg.header.lastSendTime) << "' AS TIMESTAMP), "
198           << "creation_time = CAST('" << formatTime(msg.header.creationTime) << "' AS TIMESTAMP), "
199           << "show_time = " << msg.header.showTime << ", "
200           << "repeat = " << msg.header.repeat << ", "
201           << "repeat_period = " << msg.header.repeatPeriod << ", "
202           << "msg_text = '" << etext << "' "
203       << "WHERE pk_message = " << msg.header.id;
204
205 result = PQexec(connection, query.str().c_str());
206
207 if (PQresultStatus(result) != PGRES_COMMAND_OK)
208     {
209     strError = PQresultErrorMessage(result);
210     PQclear(result);
211     printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): '%s'\n", strError.c_str());
212     if (RollbackTransaction())
213         {
214         printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): 'Failed to rollback transaction'\n");
215         }
216     return -1;
217     }
218
219 PQclear(result);
220
221 if (CommitTransaction())
222     {
223     printfd(__FILE__, "POSTGRESQL_STORE::EditMessage(): 'Failed to commit transaction'\n");
224     return -1;
225     }
226
227 return 0;
228 }
229 //-----------------------------------------------------------------------------
230 int POSTGRESQL_STORE::GetMessage(uint64_t id,
231                                  STG::Message * msg,
232                                  const std::string &) const
233 {
234 STG_LOCKER lock(&mutex);
235
236 if (PQstatus(connection) != CONNECTION_OK)
237     {
238     printfd(__FILE__, "POSTGRESQL_STORE::GetMessage(): 'Connection lost. Trying to reconnect...'\n", strError.c_str());
239     if (Reset())
240         {
241         strError = "Connection lost";
242         printfd(__FILE__, "POSTGRESQL_STORE::GetMessage(): '%s'\n", strError.c_str());
243         return -1;
244         }
245     }
246
247 PGresult * result;
248
249 if (StartTransaction())
250     {
251     printfd(__FILE__, "POSTGRESQL_STORE::GetMessage(): 'Failed to start transaction'\n");
252     return -1;
253     }
254
255 std::ostringstream query;
256 query << "SELECT ver, msg_type, last_send_time, \
257                  creation_time, show_time, repeat, \
258                  repeat_period, msg_text \
259           FROM tb_messages \
260           WHERE pk_message = " << id;
261
262 result = PQexec(connection, query.str().c_str());
263
264 if (PQresultStatus(result) != PGRES_TUPLES_OK)
265     {
266     strError = PQresultErrorMessage(result);
267     PQclear(result);
268     printfd(__FILE__, "POSTGRESQL_STORE::GetMessage(): '%s'\n", strError.c_str());
269     if (RollbackTransaction())
270         {
271         printfd(__FILE__, "POSTGRESQL_STORE::GetMessage(): 'Failed to rollback transaction'\n");
272         }
273     return -1;
274     }
275
276 int tuples = PQntuples(result);
277
278 if (tuples != 1)
279     {
280     strError = "Failed to fetch message data";
281     printfd(__FILE__, "POSTGRESQL_STORE::GetMessage(): 'Invalid number of tuples. Wanted 1, actulally %d'\n", tuples);
282     PQclear(result);
283     if (RollbackTransaction())
284         {
285         printfd(__FILE__, "POSTGRESQL_STORE::GetMessage(): 'Failed to rollback transaction'\n");
286         }
287     return -1;
288     }
289
290 str2x(PQgetvalue(result, 0, 0), msg->header.ver);
291 str2x(PQgetvalue(result, 0, 1), msg->header.type);
292 msg->header.lastSendTime = static_cast<unsigned int>(readTime(PQgetvalue(result, 0, 2)));
293 msg->header.creationTime = static_cast<unsigned int>(readTime(PQgetvalue(result, 0, 3)));
294 str2x(PQgetvalue(result, 0, 4), msg->header.showTime);
295 str2x(PQgetvalue(result, 0, 5), msg->header.repeat);
296 str2x(PQgetvalue(result, 0, 6), msg->header.repeatPeriod);
297 msg->text = PQgetvalue(result, 0, 7);
298
299 PQclear(result);
300
301 if (CommitTransaction())
302     {
303     printfd(__FILE__, "POSTGRESQL_STORE::GetMessage(): 'Failed to commit transaction'\n");
304     return -1;
305     }
306
307 return 0;
308 }
309 //-----------------------------------------------------------------------------
310 int POSTGRESQL_STORE::DelMessage(uint64_t id, const std::string &) const
311 {
312 STG_LOCKER lock(&mutex);
313
314 if (PQstatus(connection) != CONNECTION_OK)
315     {
316     printfd(__FILE__, "POSTGRESQL_STORE::DelMessage(): 'Connection lost. Trying to reconnect...'\n", strError.c_str());
317     if (Reset())
318         {
319         strError = "Connection lost";
320         printfd(__FILE__, "POSTGRESQL_STORE::DelMessage(): '%s'\n", strError.c_str());
321         return -1;
322         }
323     }
324
325 PGresult * result;
326
327 if (StartTransaction())
328     {
329     printfd(__FILE__, "POSTGRESQL_STORE::DelMessage(): 'Failed to start transaction'\n");
330     return -1;
331     }
332
333 std::ostringstream query;
334 query << "DELETE FROM tb_messages WHERE pk_message = " << id;
335
336 result = PQexec(connection, query.str().c_str());
337
338 if (PQresultStatus(result) != PGRES_COMMAND_OK)
339     {
340     strError = PQresultErrorMessage(result);
341     PQclear(result);
342     printfd(__FILE__, "POSTGRESQL_STORE::DelMessage(): '%s'\n", strError.c_str());
343     if (RollbackTransaction())
344         {
345         printfd(__FILE__, "POSTGRESQL_STORE::DelMessage(): 'Failed to rollback transaction'\n");
346         }
347     return -1;
348     }
349
350 PQclear(result);
351
352 if (CommitTransaction())
353     {
354     printfd(__FILE__, "POSTGRESQL_STORE::DelMessage(): 'Failed to commit transaction'\n");
355     return -1;
356     }
357
358 return 0;
359 }
360 //-----------------------------------------------------------------------------
361 int POSTGRESQL_STORE::GetMessageHdrs(std::vector<STG::Message::Header> * hdrsList,
362                                    const std::string & login) const
363 {
364 STG_LOCKER lock(&mutex);
365
366 if (PQstatus(connection) != CONNECTION_OK)
367     {
368     printfd(__FILE__, "POSTGRESQL_STORE::GetMessageHdrs(): 'Connection lost. Trying to reconnect...'\n", strError.c_str());
369     if (Reset())
370         {
371         strError = "Connection lost";
372         printfd(__FILE__, "POSTGRESQL_STORE::GetMessageHdrs(): '%s'\n", strError.c_str());
373         return -1;
374         }
375     }
376
377 PGresult * result;
378
379 if (StartTransaction())
380     {
381     printfd(__FILE__, "POSTGRESQL_STORE::GetMessageHdrs(): 'Failed to start transaction'\n");
382     return -1;
383     }
384
385 std::string elogin = login;
386
387 if (EscapeString(elogin))
388     {
389     printfd(__FILE__, "POSTGRESQL_STORE::GetMessageHdrs(): 'Failed to escape login'\n");
390     if (RollbackTransaction())
391         {
392         printfd(__FILE__, "POSTGRESQL_STORE::GetMessageHdrs(): 'Failed to rollback transaction'\n");
393         }
394     return -1;
395     }
396
397 std::ostringstream query;
398 query << "SELECT pk_message, ver, msg_type, \
399                  last_send_time, creation_time, show_time, \
400                  repeat, repeat_period \
401           FROM tb_messages \
402           WHERE fk_user IN \
403                 (SELECT pk_user FROM tb_users \
404           WHERE name = '" << elogin << "')";
405
406 result = PQexec(connection, query.str().c_str());
407
408 if (PQresultStatus(result) != PGRES_TUPLES_OK)
409     {
410     strError = PQresultErrorMessage(result);
411     PQclear(result);
412     printfd(__FILE__, "POSTGRESQL_STORE::GetMessageHdrs(): '%s'\n", strError.c_str());
413     if (RollbackTransaction())
414         {
415         printfd(__FILE__, "POSTGRESQL_STORE::GetMessageHdrs(): 'Failed to rollback transaction'\n");
416         }
417     return -1;
418     }
419
420 int tuples = PQntuples(result);
421
422 for (int i = 0; i < tuples; ++i)
423     {
424     std::stringstream tuple;
425     STG::Message::Header header;
426     tuple << PQgetvalue(result, i, 0) << " ";
427     tuple << PQgetvalue(result, i, 1) << " ";
428     tuple << PQgetvalue(result, i, 2) << " ";
429     header.lastSendTime = static_cast<unsigned int>(readTime(PQgetvalue(result, i, 3)));
430     header.creationTime = static_cast<unsigned int>(readTime(PQgetvalue(result, i, 4)));
431     tuple << PQgetvalue(result, i, 5) << " ";
432     tuple << PQgetvalue(result, i, 6) << " ";
433     tuple << PQgetvalue(result, i, 7) << " ";
434
435     tuple >> header.id;
436     tuple >> header.ver;
437     tuple >> header.type;
438     tuple >> header.showTime;
439     tuple >> header.repeat;
440     tuple >> header.repeatPeriod;
441     hdrsList->push_back(header);
442     }
443
444 PQclear(result);
445
446 if (CommitTransaction())
447     {
448     printfd(__FILE__, "POSTGRESQL_STORE::GetMessageHdrs(): 'Failed to commit transaction'\n");
449     return -1;
450     }
451
452 return 0;
453 }
454 //-----------------------------------------------------------------------------
455