root/src/stratcon_datastore.c

Revision 4201a420ec590bb2be00011f3d52f670320141e6, 23.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

fixes #183

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "stratcon_datastore.h"
38 #include "stratcon_realtime_http.h"
39 #include "stratcon_iep.h"
40 #include "noit_conf.h"
41 #include "noit_check.h"
42 #include <unistd.h>
43 #include <netinet/in.h>
44 #include <sys/un.h>
45 #include <arpa/inet.h>
46 #include <libpq-fe.h>
47 #include <zlib.h>
48 #include <assert.h>
49
/*
 * SQL statements used by this file.  Each statement string starts out NULL
 * and is filled in by GET_QUERY() from the configuration path held in the
 * matching *_conf variable the first time it is needed.
 */

/* Run by stratcon_datastore_asynch_drive_iep(). */
static const char *check_loadall_conf = "/stratcon/database/statements/allchecks";
static char *check_loadall = NULL;

/* Run by stratcon_datastore_find(). */
static const char *check_find_conf = "/stratcon/database/statements/findcheck";
static char *check_find = NULL;

/* Run for 'C' log lines. */
static const char *check_insert_conf = "/stratcon/database/statements/check";
static char *check_insert = NULL;

/* Run for 'S' log lines. */
static const char *status_insert_conf = "/stratcon/database/statements/status";
static char *status_insert = NULL;

/* Run for 'M' log lines carrying a numeric metric. */
static const char *metric_insert_numeric_conf = "/stratcon/database/statements/metric_numeric";
static char *metric_insert_numeric = NULL;

/* Run for 'M' log lines carrying a string metric. */
static const char *metric_insert_text_conf = "/stratcon/database/statements/metric_text";
static char *metric_insert_text = NULL;

/* Run for 'n' log lines and by stratcon_datastore_saveconfig(). */
static const char *config_insert_conf = "/stratcon/database/statements/config";
static char *config_insert = NULL;
64
65 static struct datastore_onlooker_list {
66   void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *, void *);
67   struct datastore_onlooker_list *next;
68 } *onlookers = NULL;
69
/*
 * Make sure the statement variable `a` is populated, reading it from the
 * configuration path `a_conf` when it is still NULL.  Jumps to the caller's
 * `bad_row` label when the configuration lookup fails.
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
75
76 #define MAX_PARAMS 8
77 #define POSTGRES_PARTS \
78   PGresult *res; \
79   int rv; \
80   time_t whence; \
81   int nparams; \
82   int metric_type; \
83   char *paramValues[MAX_PARAMS]; \
84   int paramLengths[MAX_PARAMS]; \
85   int paramFormats[MAX_PARAMS]; \
86   int paramAllocd[MAX_PARAMS];
87
88 typedef struct ds_single_detail {
89   POSTGRES_PARTS
90 } ds_single_detail;
91 typedef struct ds_job_detail {
92   /* Postgres specific stuff */
93   POSTGRES_PARTS
94
95   char *data;  /* The raw string, NULL means the stream is done -- commit. */
96   struct realtime_tracker *rt;
97
98   int problematic;
99   eventer_t completion_event; /* This event should be registered if non NULL */
100   struct ds_job_detail *next;
101 } ds_job_detail;
102
103 typedef struct {
104   struct sockaddr *remote;
105   eventer_jobq_t  *jobq;
106   /* Postgres specific stuff */
107   PGconn          *dbh;
108   ds_job_detail   *head;
109   ds_job_detail   *tail;
110 } conn_q;
111
112 static int stratcon_database_connect(conn_q *cq);
113
114 static void
115 free_params(ds_single_detail *d) {
116   int i;
117   for(i=0; i<d->nparams; i++)
118     if(d->paramAllocd[i] && d->paramValues[i])
119       free(d->paramValues[i]);
120 }
121
122 static void
123 __append(conn_q *q, ds_job_detail *d) {
124   d->next = NULL;
125   if(!q->head) q->head = q->tail = d;
126   else {
127     q->tail->next = d;
128     q->tail = d;
129   }
130 }
131 static void
132 __remove_until(conn_q *q, ds_job_detail *d) {
133   ds_job_detail *next;
134   while(q->head && q->head != d) {
135     next = q->head;
136     q->head = q->head->next;
137     free_params((ds_single_detail *)next);
138     if(next->data) free(next->data);
139     free(next);
140   }
141   if(!q->head) q->tail = NULL;
142 }
143
144 noit_hash_table ds_conns;
145
146 conn_q *
147 __get_conn_q_for_remote(struct sockaddr *remote) {
148   void *vcq;
149   conn_q *cq;
150   char queue_name[128] = "datastore_";
151   static const char __zeros[4] = { 0 };
152   int len = 0;
153   if(remote) {
154     switch(remote->sa_family) {
155       case AF_INET:
156         len = sizeof(struct sockaddr_in);
157         inet_ntop(remote->sa_family, &((struct sockaddr_in *)remote)->sin_addr,
158                   queue_name + strlen("datastore_"), len);
159         break;
160       case AF_INET6:
161        len = sizeof(struct sockaddr_in6);
162         inet_ntop(remote->sa_family, &((struct sockaddr_in6 *)remote)->sin6_addr,
163                   queue_name + strlen("datastore_"), len);
164        break;
165       case AF_UNIX:
166         len = SUN_LEN(((struct sockaddr_un *)remote));
167         snprintf(queue_name, sizeof(queue_name), "datastore_%s", ((struct sockaddr_un *)remote)->sun_path);
168         break;
169       default: return NULL;
170     }
171   }
172   else {
173     /* This is a dummy connection */
174     remote = (struct sockaddr *)__zeros;
175     snprintf(queue_name, sizeof(queue_name), "datastore_default");
176     len = 4;
177   }
178   if(noit_hash_retrieve(&ds_conns, (const char *)remote, len, &vcq))
179     return vcq;
180   cq = calloc(1, sizeof(*cq));
181   cq->remote = malloc(len);
182   memcpy(cq->remote, remote, len);
183   cq->jobq = calloc(1, sizeof(*cq->jobq));
184   eventer_jobq_init(cq->jobq, queue_name);
185   cq->jobq->backq = eventer_default_backq();
186   /* Add one thread */
187   eventer_jobq_increase_concurrency(cq->jobq);
188   noit_hash_store(&ds_conns, (const char *)cq->remote, len, cq);
189   return cq;
190 }
191
/* Result of running one queued operation against the database. */
typedef enum {
  DS_EXEC_SUCCESS    = 0,  /* the statement ran                               */
  DS_EXEC_ROW_FAILED = 1,  /* this line could not be parsed or executed       */
  DS_EXEC_TXN_FAILED = 2   /* caller drops the connection and starts over     */
} execute_outcome_t;
197
/*
 * Return a freshly malloc'd, NUL-terminated copy of at most `len` bytes of
 * `src`, stopping early at the first NUL byte.  A non-positive `len` yields
 * an empty string.  The caller frees the result.
 */
static char *
__noit__strndup(const char *src, int len) {
  const char *end = src;
  char *copy;
  int n;
  while((end - src) < len && *end != '\0') end++;
  n = (int)(end - src);
  copy = malloc(n + 1);
  memcpy(copy, src, n);
  copy[n] = '\0';
  return copy;
}
/*
 * Append a text parameter to `d` (which must be in scope at the call site).
 * The value is a private copy of at most `len` bytes of `str`.  The literal
 * text "[[null]]" is stored as a NULL parameter value with length 0.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  const int slot__ = d->nparams; \
  char *copy__ = __noit__strndup((str), (len)); \
  d->paramFormats[slot__] = 0; \
  if(strcmp(copy__, "[[null]]") == 0) { \
    free(copy__); \
    d->paramValues[slot__] = NULL; \
    d->paramLengths[slot__] = 0; \
    d->paramAllocd[slot__] = 0; \
  } else { \
    d->paramValues[slot__] = copy__; \
    d->paramLengths[slot__] = (len); \
    d->paramAllocd[slot__] = 1; \
  } \
  d->nparams = slot__ + 1; \
} while(0)
/*
 * Append an integer parameter to `d`, formatted as decimal text with "%d"
 * and stored through DECLARE_PARAM_STR.
 */
#define DECLARE_PARAM_INT(i) do { \
  char numtxt__[32]; \
  /* 32 bytes always hold a "%d" value, so the return is the text length */ \
  int numtxt_len__ = snprintf(numtxt__, sizeof(numtxt__), "%d", (i)); \
  DECLARE_PARAM_STR(numtxt__, numtxt_len__); \
} while(0)
229
/*
 * Point `dest` at the text of column `name` in row `row` of d->res.
 * `dest` becomes NULL when the column does not exist or the value is NULL.
 * The text is the pointer returned by PQgetvalue(); it is not copied.
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  const int col__ = PQfnumber(d->res, name); \
  if(col__ < 0 || PQgetisnull(d->res, row, col__)) dest = NULL; \
  else dest = PQgetvalue(d->res, row, col__); \
} while(0)
237
/*
 * Run statement `cmd` on cq->dbh with the parameters collected in `d`
 * (both must be in scope at the call site).
 *
 * On success d->res holds the result and the caller must PQclear() it.
 * On failure the error is logged, the result is cleared, d->res is reset to
 * NULL so that no later cleanup can clear it a second time, and control
 * jumps to the caller's `bad_row` label.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    noitL(noit_error, "stratcon datasource bad (%d): %s\n'%s'\n", \
          d->rv, PQresultErrorMessage(d->res), cmd); \
    PQclear(d->res); \
    d->res = NULL; \
    goto bad_row; \
  } \
} while(0)
251
/*
 * Like PG_EXEC, but `cmd` is first passed through strftime() with the UTC
 * broken-down form of `whence`, so the statement text may contain strftime
 * conversion specifications.
 *
 * If the time cannot be converted, or the expanded statement is empty or
 * does not fit in the 4096-byte buffer, nothing is executed: the problem is
 * logged and control jumps to the caller's `bad_row` label.  Execution
 * failures are handled as in PG_EXEC, including resetting d->res to NULL.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t __w = whence; \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&__w, &tbuf); \
  /* strftime returns 0 on overflow and leaves cmdbuf indeterminate */ \
  if(tm == NULL || strftime(cmdbuf, sizeof(cmdbuf), cmd, tm) == 0) { \
    noitL(noit_error, "stratcon datasource bad: cannot expand time in\n'%s' time: %llu\n", \
          cmd, (long long unsigned)__w); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    noitL(noit_error, "stratcon datasource bad (%d): %s\n'%s' time: %llu\n", \
          d->rv, PQresultErrorMessage(d->res), cmdbuf, \
          (long long unsigned)whence); \
    PQclear(d->res); \
    d->res = NULL; \
    goto bad_row; \
  } \
} while(0)
271
272 static int
273 stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure,
274                                     struct timeval *now) {
275   conn_q *cq = closure;
276   ds_job_detail *d;
277   int i, row_count = 0, good = 0;
278   char buff[1024];
279
280   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
281   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
282
283   stratcon_database_connect(cq);
284   d = calloc(1, sizeof(*d));
285   GET_QUERY(check_loadall);
286   PG_EXEC(check_loadall);
287   row_count = PQntuples(d->res);
288  
289   for(i=0; i<row_count; i++) {
290     int rv;
291     int8_t family;
292     struct sockaddr *sin;
293     struct sockaddr_in sin4 = { .sin_family = AF_INET };
294     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
295     char *remote, *id, *target, *module, *name;
296     PG_GET_STR_COL(remote, i, "remote_address");
297     PG_GET_STR_COL(id, i, "id");
298     PG_GET_STR_COL(target, i, "target");
299     PG_GET_STR_COL(module, i, "module");
300     PG_GET_STR_COL(name, i, "name");
301     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
302
303     family = AF_INET;
304     sin = (struct sockaddr *)&sin4;
305     rv = inet_pton(family, remote, &sin4.sin_addr);
306     if(rv != 1) {
307       family = AF_INET6;
308       sin = (struct sockaddr *)&sin6;
309       rv = inet_pton(family, remote, &sin6.sin6_addr);
310       if(rv != 1) {
311         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
312         sin = NULL;
313       }
314     }
315
316     /* stratcon_iep_line_processor takes an allocated operand and frees it */
317     stratcon_iep_line_processor(DS_OP_INSERT, sin, strdup(buff));
318     good++;
319   }
320   noitL(noit_error, "Staged %d/%d remembered checks into IEP\n", good, row_count);
321  bad_row:
322   PQclear(d->res);
323   free(d);
324   return 0;
325 }
326 void
327 stratcon_datastore_iep_check_preload() {
328   eventer_t e;
329   conn_q *cq;
330   cq = __get_conn_q_for_remote(NULL);
331
332   e = eventer_alloc();
333   e->mask = EVENTER_ASYNCH;
334   e->callback = stratcon_datastore_asynch_drive_iep;
335   e->closure = cq;
336   eventer_add_asynch(cq->jobq, e);
337 }
338 execute_outcome_t
339 stratcon_datastore_find(conn_q *cq, ds_job_detail *d) {
340   char *val;
341   int row_count;
342
343   if(!d->nparams) DECLARE_PARAM_INT(d->rt->sid);
344   GET_QUERY(check_find);
345   PG_EXEC(check_find);
346   row_count = PQntuples(d->res);
347   if(row_count != 1) goto bad_row;
348
349   /* Get the check uuid */
350   PG_GET_STR_COL(val, 0, "id");
351   if(!val) goto bad_row;
352   if(uuid_parse(val, d->rt->checkid)) goto bad_row;
353
354   /* Get the remote_address (which noit owns this) */
355   PG_GET_STR_COL(val, 0, "remote_address");
356   if(!val) goto bad_row;
357   d->rt->noit = strdup(val);
358
359   PQclear(d->res);
360   return DS_EXEC_SUCCESS;
361  bad_row:
362   return DS_EXEC_ROW_FAILED;
363 }
364 execute_outcome_t
365 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) {
366   int type, len;
367   char *final_buff;
368   uLong final_len, actual_final_len;;
369   char *token;
370
371   type = d->data[0];
372
373   /* Parse the log line, but only if we haven't already */
374   if(!d->nparams) {
375     char raddr[128];
376     char *scp, *ecp;
377
378     /* setup our remote address */
379     raddr[0] = '\0';
380     switch(r->sa_family) {
381       case AF_INET:
382         inet_ntop(AF_INET, &(((struct sockaddr_in *)r)->sin_addr),
383                   raddr, sizeof(raddr));
384         break;
385       case AF_INET6:
386         inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)r)->sin6_addr),
387                   raddr, sizeof(raddr));
388         break;
389       default:
390         noitL(noit_error, "remote address of family %d\n", r->sa_family);
391     }
392  
393     scp = d->data;
394 #define PROCESS_NEXT_FIELD(t,l) do { \
395   if(!*scp) goto bad_row; \
396   ecp = strchr(scp, '\t'); \
397   if(!ecp) goto bad_row; \
398   token = scp; \
399   len = (ecp-scp); \
400   scp = ecp + 1; \
401 } while(0)
402 #define PROCESS_LAST_FIELD(t,l) do { \
403   if(!*scp) ecp = scp; \
404   else { \
405     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
406     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
407   } \
408   t = scp; \
409   l = (ecp-scp); \
410 } while(0)
411
412     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
413     switch(type) {
414       /* See noit_check_log.c for log description */
415       case 'n':
416         DECLARE_PARAM_STR(raddr, strlen(raddr));
417         DECLARE_PARAM_STR("noitd",5); /* node_type */
418         PROCESS_NEXT_FIELD(token,len);
419         d->whence = (time_t)strtoul(token, NULL, 10);
420         DECLARE_PARAM_STR(token,len); /* timestamp */
421
422         /* This is the expected uncompressed len */
423         PROCESS_NEXT_FIELD(token,len);
424         final_len = atoi(token);
425         final_buff = malloc(final_len);
426         if(!final_buff) goto bad_row;
427  
428         /* The last token is b64 endoded and compressed.
429          * we need to decode it, declare it and then free it.
430          */
431         PROCESS_LAST_FIELD(token, len);
432         /* We can in-place decode this */
433         len = noit_b64_decode((char *)token, len,
434                               (unsigned char *)token, len);
435         if(len <= 0) {
436           noitL(noit_error, "noitd config base64 decoding error.\n");
437           free(final_buff);
438           goto bad_row;
439         }
440         actual_final_len = final_len;
441         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
442                               (unsigned char *)token, len)) {
443           noitL(noit_error, "noitd config decompression failure.\n");
444           free(final_buff);
445           goto bad_row;
446         }
447         if(final_len != actual_final_len) {
448           noitL(noit_error, "noitd config decompression error.\n");
449           free(final_buff);
450           goto bad_row;
451         }
452         DECLARE_PARAM_STR(final_buff, final_len);
453         free(final_buff);
454         break;
455       case 'C':
456         DECLARE_PARAM_STR(raddr, strlen(raddr));
457         PROCESS_NEXT_FIELD(token,len);
458         DECLARE_PARAM_STR(token,len); /* timestamp */
459         d->whence = (time_t)strtoul(token, NULL, 10);
460         PROCESS_NEXT_FIELD(token, len);
461         DECLARE_PARAM_STR(token,len); /* uuid */
462         PROCESS_NEXT_FIELD(token, len);
463         DECLARE_PARAM_STR(token,len); /* target */
464         PROCESS_NEXT_FIELD(token, len);
465         DECLARE_PARAM_STR(token,len); /* module */
466         PROCESS_LAST_FIELD(token, len);
467         DECLARE_PARAM_STR(token,len); /* name */
468         break;
469       case 'M':
470         PROCESS_NEXT_FIELD(token,len);
471         DECLARE_PARAM_STR(token,len); /* timestamp */
472         d->whence = (time_t)strtoul(token, NULL, 10);
473         PROCESS_NEXT_FIELD(token, len);
474         DECLARE_PARAM_STR(token,len); /* uuid */
475         PROCESS_NEXT_FIELD(token, len);
476         DECLARE_PARAM_STR(token,len); /* name */
477         PROCESS_NEXT_FIELD(token,len);
478         d->metric_type = *token;
479         PROCESS_LAST_FIELD(token,len);
480         DECLARE_PARAM_STR(token,len); /* value */
481         break;
482       case 'S':
483         PROCESS_NEXT_FIELD(token,len);
484         DECLARE_PARAM_STR(token,len); /* timestamp */
485         d->whence = (time_t)strtoul(token, NULL, 10);
486         PROCESS_NEXT_FIELD(token, len);
487         DECLARE_PARAM_STR(token,len); /* uuid */
488         PROCESS_NEXT_FIELD(token, len);
489         DECLARE_PARAM_STR(token,len); /* state */
490         PROCESS_NEXT_FIELD(token, len);
491         DECLARE_PARAM_STR(token,len); /* availability */
492         PROCESS_NEXT_FIELD(token, len);
493         DECLARE_PARAM_STR(token,len); /* duration */
494         PROCESS_LAST_FIELD(token,len);
495         DECLARE_PARAM_STR(token,len); /* status */
496         break;
497       default:
498         goto bad_row;
499     }
500
501   }
502
503   /* Now execute the query */
504   switch(type) {
505     case 'n':
506       GET_QUERY(config_insert);
507       PG_EXEC(config_insert);
508       PQclear(d->res);
509       break;
510     case 'C':
511       GET_QUERY(check_insert);
512       PG_EXEC(check_insert);
513       PQclear(d->res);
514       break;
515     case 'S':
516       GET_QUERY(status_insert);
517       PG_TM_EXEC(status_insert, d->whence);
518       PQclear(d->res);
519       break;
520     case 'M':
521       switch(d->metric_type) {
522         case METRIC_INT32:
523         case METRIC_UINT32:
524         case METRIC_INT64:
525         case METRIC_UINT64:
526         case METRIC_DOUBLE:
527           GET_QUERY(metric_insert_numeric);
528           PG_TM_EXEC(metric_insert_numeric, d->whence);
529           PQclear(d->res);
530           break;
531         case METRIC_STRING:
532           GET_QUERY(metric_insert_text);
533           PG_TM_EXEC(metric_insert_text, d->whence);
534           PQclear(d->res);
535           break;
536         default:
537           goto bad_row;
538       }
539       break;
540     default:
541       /* should never get here */
542       goto bad_row;
543   }
544   return DS_EXEC_SUCCESS;
545  bad_row:
546   return DS_EXEC_ROW_FAILED;
547 }
548 static int
549 stratcon_database_connect(conn_q *cq) {
550   char dsn[512];
551   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
552   const char *k, *v;
553   int klen;
554   noit_hash_table *t;
555
556   dsn[0] = '\0';
557   t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
558   while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
559     if(dsn[0]) strlcat(dsn, " ", sizeof(dsn));
560     strlcat(dsn, k, sizeof(dsn));
561     strlcat(dsn, "=", sizeof(dsn));
562     strlcat(dsn, v, sizeof(dsn));
563   }
564   noit_hash_destroy(t, free, free);
565   free(t);
566
567   if(cq->dbh) {
568     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
569     PQreset(cq->dbh);
570     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
571     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
572           dsn, PQerrorMessage(cq->dbh));
573     return -1;
574   }
575
576   cq->dbh = PQconnectdb(dsn);
577   if(!cq->dbh) return -1;
578   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
579   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
580         dsn, PQerrorMessage(cq->dbh));
581   return -1;
582 }
583 static int
584 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
585                                 const char *name) {
586   int rv = -1;
587   PGresult *res;
588   char cmd[128];
589   strlcpy(cmd, p, sizeof(cmd));
590   strlcat(cmd, name, sizeof(cmd));
591   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
592   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
593   PQclear(res);
594   return rv;
595 }
596 static int
597 stratcon_datastore_do(conn_q *cq, const char *cmd) {
598   PGresult *res;
599   int rv = -1;
600   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
601   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
602   PQclear(res);
603   return rv;
604 }
/*
 * Drop the database connection and restart the caller from its
 * `full_monty` label.
 */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)

/*
 * Shared body of the three savepoint macros: run "<verb><name>" and, on
 * success, record `mark` in the caller's `last_sp`.  Any failure is treated
 * as a broken connection.  Expects `cq`, `current` and `last_sp` in scope.
 */
#define SAVEPOINT_STEP(verb, name, mark) do { \
  if(stratcon_datastore_savepoint_op(cq, verb, name) != 0) BUSTED(cq); \
  last_sp = (mark); \
} while(0)

/* Open a savepoint and remember the row it was opened at. */
#define SAVEPOINT(name) \
  SAVEPOINT_STEP("SAVEPOINT ", name, current)
/* Undo everything since the savepoint; no savepoint is tracked afterwards. */
#define ROLLBACK_TO_SAVEPOINT(name) \
  SAVEPOINT_STEP("ROLLBACK TO SAVEPOINT ", name, NULL)
/* Release the savepoint; no savepoint is tracked afterwards. */
#define RELEASE_SAVEPOINT(name) \
  SAVEPOINT_STEP("RELEASE SAVEPOINT ", name, NULL)
624 int
625 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
626                                  struct timeval *now) {
627   conn_q *cq = closure;
628   ds_job_detail *current, *next;
629   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
630
631   if(!cq->head) return 0;
632
633   stratcon_database_connect(cq);
634
635   current = cq->head;
636   while(current) {
637     if(current->rt) {
638       next = current->next;
639       stratcon_datastore_find(cq, current);
640       current = next;
641     }
642     else if(current->completion_event) {
643       next = current->next;
644       eventer_add(current->completion_event);
645       current = next;
646       __remove_until(cq, current);
647     }
648     else current = current->next;
649   }
650   return 0;
651 }
652 int
653 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
654                                   struct timeval *now) {
655   int i;
656   conn_q *cq = closure;
657   ds_job_detail *current, *last_sp;
658   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
659   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
660   if(!cq->head) return 0;
661
662  full_monty:
663   /* Make sure we have a connection */
664   i = 1;
665   while(stratcon_database_connect(cq)) {
666     noitL(noit_error, "Error connecting to database\n");
667     sleep(i);
668     i *= 2;
669     i = MIN(i, 16);
670   }
671
672   current = cq->head;
673   last_sp = NULL;
674   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
675   while(current) {
676     execute_outcome_t rv;
677     if(current->data) {
678       if(!last_sp) SAVEPOINT("batch");
679  
680       if(current->problematic) {
681         noitL(noit_error, "[%s] Failed noit line: %s", cq->jobq->queue_name, current->data);
682         RELEASE_SAVEPOINT("batch");
683         current = current->next;
684         continue;
685       }
686       rv = stratcon_datastore_execute(cq, cq->remote, current);
687       switch(rv) {
688         case DS_EXEC_SUCCESS:
689           current = current->next;
690           break;
691         case DS_EXEC_ROW_FAILED:
692           /* rollback to savepoint, mark this record as bad and start again */
693           current->problematic = 1;
694           current = last_sp;
695           ROLLBACK_TO_SAVEPOINT("batch");
696           break;
697         case DS_EXEC_TXN_FAILED:
698           BUSTED(cq);
699       }
700     }
701     if(current->completion_event) {
702       if(last_sp) RELEASE_SAVEPOINT("batch");
703       if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq);
704       eventer_add(current->completion_event);
705       current = current->next;
706       __remove_until(cq, current);
707     }
708   }
709   return 0;
710 }
711 void
712 stratcon_datastore_push(stratcon_datastore_op_t op,
713                         struct sockaddr *remote, void *operand) {
714   conn_q *cq;
715   eventer_t e;
716   ds_job_detail *dsjd;
717   struct datastore_onlooker_list *nnode;
718
719   for(nnode = onlookers; nnode; nnode = nnode->next)
720     nnode->dispatch(op,remote,operand);
721
722   cq = __get_conn_q_for_remote(remote);
723   dsjd = calloc(1, sizeof(*dsjd));
724   switch(op) {
725     case DS_OP_FIND:
726       dsjd->rt = operand;
727       __append(cq, dsjd);
728       break;
729     case DS_OP_INSERT:
730       dsjd->data = operand;
731       __append(cq, dsjd);
732       break;
733     case DS_OP_FIND_COMPLETE:
734     case DS_OP_CHKPT:
735       dsjd->completion_event = operand;
736       __append(cq,dsjd);
737       e = eventer_alloc();
738       e->mask = EVENTER_ASYNCH;
739       if(op == DS_OP_FIND_COMPLETE)
740         e->callback = stratcon_datastore_asynch_lookup;
741       else if(op == DS_OP_CHKPT)
742         e->callback = stratcon_datastore_asynch_execute;
743       e->closure = cq;
744       eventer_add_asynch(cq->jobq, e);
745       break;
746   }
747 }
748
749 int
750 stratcon_datastore_saveconfig(void *unused) {
751   int rv = -1;
752   conn_q _cq = { 0 }, *cq = &_cq;
753   char *buff;
754   ds_single_detail _d = { 0 }, *d = &_d;
755
756   if(stratcon_database_connect(cq) == 0) {
757     char time_as_str[20];
758     size_t len;
759     buff = noit_conf_xml_in_mem(&len);
760     if(!buff) goto bad_row;
761
762     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
763     DECLARE_PARAM_STR("0.0.0.0", 7);
764     DECLARE_PARAM_STR("stratcond", 9);
765     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
766     DECLARE_PARAM_STR(buff, len);
767     free(buff);
768
769     GET_QUERY(config_insert);
770     PG_EXEC(config_insert);
771     PQclear(d->res);
772     rv = 0;
773
774     bad_row:
775       free_params(d);
776   }
777   if(cq->dbh) PQfinish(cq->dbh);
778   return rv;
779 }
780
781 void
782 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
783                                                struct sockaddr *, void *)) {
784   struct datastore_onlooker_list *nnode;
785   nnode = calloc(1, sizeof(*nnode));
786   nnode->dispatch = f;
787   nnode->next = onlookers;
788   while(noit_atomic_casptr((void **)&onlookers, nnode, nnode->next) != (void *)nnode->next)
789     nnode->next = onlookers;
790 }
Note: See TracBrowser for help on using the browser.