root/src/stratcon_datastore.c

Revision f6540baac0ed25babb2a91d9b715988abb653bcb, 23.4 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

flag-day(stratcond,database) partition the loading docks, refs #140

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "stratcon_datastore.h"
38 #include "stratcon_realtime_http.h"
39 #include "stratcon_iep.h"
40 #include "noit_conf.h"
41 #include "noit_check.h"
42 #include <unistd.h>
43 #include <netinet/in.h>
44 #include <sys/un.h>
45 #include <arpa/inet.h>
46 #include <libpq-fe.h>
47 #include <zlib.h>
48 #include <assert.h>
49
50 static char *check_loadall = NULL;
51 static const char *check_loadall_conf = "/stratcon/database/statements/allchecks";
52 static char *check_find = NULL;
53 static const char *check_find_conf = "/stratcon/database/statements/findcheck";
54 static char *check_insert = NULL;
55 static const char *check_insert_conf = "/stratcon/database/statements/check";
56 static char *status_insert = NULL;
57 static const char *status_insert_conf = "/stratcon/database/statements/status";
58 static char *metric_insert_numeric = NULL;
59 static const char *metric_insert_numeric_conf = "/stratcon/database/statements/metric_numeric";
60 static char *metric_insert_text = NULL;
61 static const char *metric_insert_text_conf = "/stratcon/database/statements/metric_text";
62 static char *config_insert = NULL;
63 static const char *config_insert_conf = "/stratcon/database/statements/config";
64
65 static struct datastore_onlooker_list {
66   void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *, void *);
67   struct datastore_onlooker_list *next;
68 } *onlookers = NULL;
69
70 #define GET_QUERY(a) do { \
71   if(a == NULL) \
72     if(!noit_conf_get_string(NULL, a ## _conf, &(a))) \
73       goto bad_row; \
74 } while(0)
75
76 #define MAX_PARAMS 8
77 #define POSTGRES_PARTS \
78   PGresult *res; \
79   int rv; \
80   int nparams; \
81   int metric_type; \
82   char *paramValues[MAX_PARAMS]; \
83   int paramLengths[MAX_PARAMS]; \
84   int paramFormats[MAX_PARAMS]; \
85   int paramAllocd[MAX_PARAMS];
86
87 typedef struct ds_single_detail {
88   POSTGRES_PARTS
89 } ds_single_detail;
90 typedef struct ds_job_detail {
91   /* Postgres specific stuff */
92   POSTGRES_PARTS
93
94   char *data;  /* The raw string, NULL means the stream is done -- commit. */
95   struct realtime_tracker *rt;
96
97   int problematic;
98   eventer_t completion_event; /* This event should be registered if non NULL */
99   struct ds_job_detail *next;
100 } ds_job_detail;
101
102 typedef struct {
103   struct sockaddr *remote;
104   eventer_jobq_t  *jobq;
105   /* Postgres specific stuff */
106   PGconn          *dbh;
107   ds_job_detail   *head;
108   ds_job_detail   *tail;
109 } conn_q;
110
111 static int stratcon_database_connect(conn_q *cq);
112
113 static void
114 free_params(ds_single_detail *d) {
115   int i;
116   for(i=0; i<d->nparams; i++)
117     if(d->paramAllocd[i] && d->paramValues[i])
118       free(d->paramValues[i]);
119 }
120
121 static void
122 __append(conn_q *q, ds_job_detail *d) {
123   d->next = NULL;
124   if(!q->head) q->head = q->tail = d;
125   else {
126     q->tail->next = d;
127     q->tail = d;
128   }
129 }
130 static void
131 __remove_until(conn_q *q, ds_job_detail *d) {
132   ds_job_detail *next;
133   while(q->head && q->head != d) {
134     next = q->head;
135     q->head = q->head->next;
136     free_params((ds_single_detail *)next);
137     if(next->data) free(next->data);
138     free(next);
139   }
140   if(!q->head) q->tail = NULL;
141 }
142
143 noit_hash_table ds_conns;
144
145 conn_q *
146 __get_conn_q_for_remote(struct sockaddr *remote) {
147   void *vcq;
148   conn_q *cq;
149   char queue_name[128] = "datastore_";
150   static const char __zeros[4] = { 0 };
151   int len = 0;
152   if(remote) {
153     switch(remote->sa_family) {
154       case AF_INET:
155         len = sizeof(struct sockaddr_in);
156         inet_ntop(remote->sa_family, &((struct sockaddr_in *)remote)->sin_addr,
157                   queue_name + strlen("datastore_"), len);
158         break;
159       case AF_INET6:
160        len = sizeof(struct sockaddr_in6);
161         inet_ntop(remote->sa_family, &((struct sockaddr_in6 *)remote)->sin6_addr,
162                   queue_name + strlen("datastore_"), len);
163        break;
164       case AF_UNIX:
165         len = SUN_LEN(((struct sockaddr_un *)remote));
166         snprintf(queue_name, sizeof(queue_name), "datastore_%s", ((struct sockaddr_un *)remote)->sun_path);
167         break;
168       default: return NULL;
169     }
170   }
171   else {
172     /* This is a dummy connection */
173     remote = (struct sockaddr *)__zeros;
174     snprintf(queue_name, sizeof(queue_name), "datastore_default");
175     len = 4;
176   }
177   if(noit_hash_retrieve(&ds_conns, (const char *)remote, len, &vcq))
178     return vcq;
179   cq = calloc(1, sizeof(*cq));
180   cq->remote = malloc(len);
181   memcpy(cq->remote, remote, len);
182   cq->jobq = calloc(1, sizeof(*cq->jobq));
183   eventer_jobq_init(cq->jobq, queue_name);
184   cq->jobq->backq = eventer_default_backq();
185   /* Add one thread */
186   eventer_jobq_increase_concurrency(cq->jobq);
187   noit_hash_store(&ds_conns, (const char *)cq->remote, len, cq);
188   return cq;
189 }
190
191 typedef enum {
192   DS_EXEC_SUCCESS = 0,
193   DS_EXEC_ROW_FAILED = 1,
194   DS_EXEC_TXN_FAILED = 2,
195 } execute_outcome_t;
196
197 static char *
198 __noit__strndup(const char *src, int len) {
199   int slen;
200   char *dst;
201   for(slen = 0; slen < len; slen++)
202     if(src[slen] == '\0') break;
203   dst = malloc(slen + 1);
204   memcpy(dst, src, slen);
205   dst[slen] = '\0';
206   return dst;
207 }
208 #define DECLARE_PARAM_STR(str, len) do { \
209   d->paramValues[d->nparams] = __noit__strndup(str, len); \
210   d->paramLengths[d->nparams] = len; \
211   d->paramFormats[d->nparams] = 0; \
212   d->paramAllocd[d->nparams] = 1; \
213   if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \
214     free(d->paramValues[d->nparams]); \
215     d->paramValues[d->nparams] = NULL; \
216     d->paramLengths[d->nparams] = 0; \
217     d->paramAllocd[d->nparams] = 0; \
218   } \
219   d->nparams++; \
220 } while(0)
221 #define DECLARE_PARAM_INT(i) do { \
222   int buffer__len; \
223   char buffer__[32]; \
224   snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
225   buffer__len = strlen(buffer__); \
226   DECLARE_PARAM_STR(buffer__, buffer__len); \
227 } while(0)
228
229 #define PG_GET_STR_COL(dest, row, name) do { \
230   int colnum = PQfnumber(d->res, name); \
231   dest = NULL; \
232   if (colnum >= 0) \
233     dest = PQgetisnull(d->res, row, colnum) \
234          ? NULL : PQgetvalue(d->res, row, colnum); \
235 } while(0)
236
237 #define PG_EXEC(cmd) do { \
238   d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
239                         (const char * const *)d->paramValues, \
240                         d->paramLengths, d->paramFormats, 0); \
241   d->rv = PQresultStatus(d->res); \
242   if(d->rv != PGRES_COMMAND_OK && \
243      d->rv != PGRES_TUPLES_OK) { \
244     noitL(noit_error, "stratcon datasource bad (%d): %s\n", \
245           d->rv, PQresultErrorMessage(d->res)); \
246     PQclear(d->res); \
247     goto bad_row; \
248   } \
249 } while(0)
250
251 #define PG_TM_EXEC(cmd, whence) do { \
252   time_t __w = whence; \
253   char cmdbuf[4096]; \
254   struct tm tbuf, *tm; \
255   tm = gmtime_r(&__w, &tbuf); \
256   strftime(cmdbuf, sizeof(cmdbuf), cmd, tm); \
257   d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
258                         (const char * const *)d->paramValues, \
259                         d->paramLengths, d->paramFormats, 0); \
260   d->rv = PQresultStatus(d->res); \
261   if(d->rv != PGRES_COMMAND_OK && \
262      d->rv != PGRES_TUPLES_OK) { \
263     noitL(noit_error, "stratcon datasource bad (%d): %s\n", \
264           d->rv, PQresultErrorMessage(d->res)); \
265     PQclear(d->res); \
266     goto bad_row; \
267   } \
268 } while(0)
269
270 static int
271 stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure,
272                                     struct timeval *now) {
273   conn_q *cq = closure;
274   ds_job_detail *d;
275   int i, row_count = 0, good = 0;
276   char buff[1024];
277
278   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
279   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
280
281   stratcon_database_connect(cq);
282   d = calloc(1, sizeof(*d));
283   GET_QUERY(check_loadall);
284   PG_EXEC(check_loadall);
285   row_count = PQntuples(d->res);
286  
287   for(i=0; i<row_count; i++) {
288     int rv;
289     int8_t family;
290     struct sockaddr *sin;
291     struct sockaddr_in sin4 = { .sin_family = AF_INET };
292     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
293     char *remote, *id, *target, *module, *name;
294     PG_GET_STR_COL(remote, i, "remote_address");
295     PG_GET_STR_COL(id, i, "id");
296     PG_GET_STR_COL(target, i, "target");
297     PG_GET_STR_COL(module, i, "module");
298     PG_GET_STR_COL(name, i, "name");
299     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
300
301     family = AF_INET;
302     sin = (struct sockaddr *)&sin4;
303     rv = inet_pton(family, remote, &sin4.sin_addr);
304     if(rv != 1) {
305       family = AF_INET6;
306       sin = (struct sockaddr *)&sin6;
307       rv = inet_pton(family, remote, &sin6.sin6_addr);
308       if(rv != 1) {
309         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
310         sin = NULL;
311       }
312     }
313
314     /* stratcon_iep_line_processor takes an allocated operand and frees it */
315     stratcon_iep_line_processor(DS_OP_INSERT, sin, strdup(buff));
316     good++;
317   }
318   noitL(noit_error, "Staged %d/%d remembered checks into IEP\n", good, row_count);
319  bad_row:
320   PQclear(d->res);
321   free(d);
322   return 0;
323 }
324 void
325 stratcon_datastore_iep_check_preload() {
326   eventer_t e;
327   conn_q *cq;
328   cq = __get_conn_q_for_remote(NULL);
329
330   e = eventer_alloc();
331   e->mask = EVENTER_ASYNCH;
332   e->callback = stratcon_datastore_asynch_drive_iep;
333   e->closure = cq;
334   eventer_add_asynch(cq->jobq, e);
335 }
336 execute_outcome_t
337 stratcon_datastore_find(conn_q *cq, ds_job_detail *d) {
338   char *val;
339   int row_count;
340
341   if(!d->nparams) DECLARE_PARAM_INT(d->rt->sid);
342   GET_QUERY(check_find);
343   PG_EXEC(check_find);
344   row_count = PQntuples(d->res);
345   if(row_count != 1) goto bad_row;
346
347   /* Get the check uuid */
348   PG_GET_STR_COL(val, 0, "id");
349   if(!val) goto bad_row;
350   if(uuid_parse(val, d->rt->checkid)) goto bad_row;
351
352   /* Get the remote_address (which noit owns this) */
353   PG_GET_STR_COL(val, 0, "remote_address");
354   if(!val) goto bad_row;
355   d->rt->noit = strdup(val);
356
357   PQclear(d->res);
358   return DS_EXEC_SUCCESS;
359  bad_row:
360   return DS_EXEC_ROW_FAILED;
361 }
362 execute_outcome_t
363 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) {
364   time_t whence = 0;
365   int type, len;
366   char *final_buff;
367   uLong final_len, actual_final_len;;
368   char *token;
369
370   type = d->data[0];
371
372   /* Parse the log line, but only if we haven't already */
373   if(!d->nparams) {
374     char raddr[128];
375     char *scp, *ecp;
376
377     /* setup our remote address */
378     raddr[0] = '\0';
379     switch(r->sa_family) {
380       case AF_INET:
381         inet_ntop(AF_INET, &(((struct sockaddr_in *)r)->sin_addr),
382                   raddr, sizeof(raddr));
383         break;
384       case AF_INET6:
385         inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)r)->sin6_addr),
386                   raddr, sizeof(raddr));
387         break;
388       default:
389         noitL(noit_error, "remote address of family %d\n", r->sa_family);
390     }
391  
392     scp = d->data;
393 #define PROCESS_NEXT_FIELD(t,l) do { \
394   if(!*scp) goto bad_row; \
395   ecp = strchr(scp, '\t'); \
396   if(!ecp) goto bad_row; \
397   token = scp; \
398   len = (ecp-scp); \
399   scp = ecp + 1; \
400 } while(0)
401 #define PROCESS_LAST_FIELD(t,l) do { \
402   if(!*scp) ecp = scp; \
403   else { \
404     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
405     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
406   } \
407   t = scp; \
408   l = (ecp-scp); \
409 } while(0)
410
411     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
412     switch(type) {
413       /* See noit_check_log.c for log description */
414       case 'n':
415         DECLARE_PARAM_STR(raddr, strlen(raddr));
416         DECLARE_PARAM_STR("noitd",5); /* node_type */
417         PROCESS_NEXT_FIELD(token,len);
418         whence = (time_t)strtoul(token, NULL, 10);
419         DECLARE_PARAM_STR(token,len); /* timestamp */
420
421         /* This is the expected uncompressed len */
422         PROCESS_NEXT_FIELD(token,len);
423         final_len = atoi(token);
424         final_buff = malloc(final_len);
425         if(!final_buff) goto bad_row;
426  
427         /* The last token is b64 endoded and compressed.
428          * we need to decode it, declare it and then free it.
429          */
430         PROCESS_LAST_FIELD(token, len);
431         /* We can in-place decode this */
432         len = noit_b64_decode((char *)token, len,
433                               (unsigned char *)token, len);
434         if(len <= 0) {
435           noitL(noit_error, "noitd config base64 decoding error.\n");
436           free(final_buff);
437           goto bad_row;
438         }
439         actual_final_len = final_len;
440         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
441                               (unsigned char *)token, len)) {
442           noitL(noit_error, "noitd config decompression failure.\n");
443           free(final_buff);
444           goto bad_row;
445         }
446         if(final_len != actual_final_len) {
447           noitL(noit_error, "noitd config decompression error.\n");
448           free(final_buff);
449           goto bad_row;
450         }
451         DECLARE_PARAM_STR(final_buff, final_len);
452         free(final_buff);
453         break;
454       case 'C':
455         DECLARE_PARAM_STR(raddr, strlen(raddr));
456         PROCESS_NEXT_FIELD(token,len);
457         DECLARE_PARAM_STR(token,len); /* timestamp */
458         whence = (time_t)strtoul(token, NULL, 10);
459         PROCESS_NEXT_FIELD(token, len);
460         DECLARE_PARAM_STR(token,len); /* uuid */
461         PROCESS_NEXT_FIELD(token, len);
462         DECLARE_PARAM_STR(token,len); /* target */
463         PROCESS_NEXT_FIELD(token, len);
464         DECLARE_PARAM_STR(token,len); /* module */
465         PROCESS_LAST_FIELD(token, len);
466         DECLARE_PARAM_STR(token,len); /* name */
467         break;
468       case 'M':
469         PROCESS_NEXT_FIELD(token,len);
470         DECLARE_PARAM_STR(token,len); /* timestamp */
471         whence = (time_t)strtoul(token, NULL, 10);
472         PROCESS_NEXT_FIELD(token, len);
473         DECLARE_PARAM_STR(token,len); /* uuid */
474         PROCESS_NEXT_FIELD(token, len);
475         DECLARE_PARAM_STR(token,len); /* name */
476         PROCESS_NEXT_FIELD(token,len);
477         d->metric_type = *token;
478         PROCESS_LAST_FIELD(token,len);
479         DECLARE_PARAM_STR(token,len); /* value */
480         break;
481       case 'S':
482         PROCESS_NEXT_FIELD(token,len);
483         DECLARE_PARAM_STR(token,len); /* timestamp */
484         whence = (time_t)strtoul(token, NULL, 10);
485         PROCESS_NEXT_FIELD(token, len);
486         DECLARE_PARAM_STR(token,len); /* uuid */
487         PROCESS_NEXT_FIELD(token, len);
488         DECLARE_PARAM_STR(token,len); /* state */
489         PROCESS_NEXT_FIELD(token, len);
490         DECLARE_PARAM_STR(token,len); /* availability */
491         PROCESS_NEXT_FIELD(token, len);
492         DECLARE_PARAM_STR(token,len); /* duration */
493         PROCESS_LAST_FIELD(token,len);
494         DECLARE_PARAM_STR(token,len); /* status */
495         break;
496       default:
497         goto bad_row;
498     }
499
500   }
501
502   /* Now execute the query */
503   switch(type) {
504     case 'n':
505       GET_QUERY(config_insert);
506       PG_EXEC(config_insert);
507       PQclear(d->res);
508       break;
509     case 'C':
510       GET_QUERY(check_insert);
511       PG_EXEC(check_insert);
512       PQclear(d->res);
513       break;
514     case 'S':
515       GET_QUERY(status_insert);
516       PG_TM_EXEC(status_insert, whence);
517       PQclear(d->res);
518       break;
519     case 'M':
520       switch(d->metric_type) {
521         case METRIC_INT32:
522         case METRIC_UINT32:
523         case METRIC_INT64:
524         case METRIC_UINT64:
525         case METRIC_DOUBLE:
526           GET_QUERY(metric_insert_numeric);
527           PG_TM_EXEC(metric_insert_numeric, whence);
528           PQclear(d->res);
529           break;
530         case METRIC_STRING:
531           GET_QUERY(metric_insert_text);
532           PG_TM_EXEC(metric_insert_text, whence);
533           PQclear(d->res);
534           break;
535         default:
536           goto bad_row;
537       }
538       break;
539     default:
540       /* should never get here */
541       goto bad_row;
542   }
543   return DS_EXEC_SUCCESS;
544  bad_row:
545   return DS_EXEC_ROW_FAILED;
546 }
547 static int
548 stratcon_database_connect(conn_q *cq) {
549   char dsn[512];
550   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
551   const char *k, *v;
552   int klen;
553   noit_hash_table *t;
554
555   dsn[0] = '\0';
556   t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
557   while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
558     if(dsn[0]) strlcat(dsn, " ", sizeof(dsn));
559     strlcat(dsn, k, sizeof(dsn));
560     strlcat(dsn, "=", sizeof(dsn));
561     strlcat(dsn, v, sizeof(dsn));
562   }
563   noit_hash_destroy(t, free, free);
564   free(t);
565
566   if(cq->dbh) {
567     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
568     PQreset(cq->dbh);
569     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
570     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
571           dsn, PQerrorMessage(cq->dbh));
572     return -1;
573   }
574
575   cq->dbh = PQconnectdb(dsn);
576   if(!cq->dbh) return -1;
577   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
578   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
579         dsn, PQerrorMessage(cq->dbh));
580   return -1;
581 }
582 static int
583 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
584                                 const char *name) {
585   int rv = -1;
586   PGresult *res;
587   char cmd[128];
588   strlcpy(cmd, p, sizeof(cmd));
589   strlcat(cmd, name, sizeof(cmd));
590   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
591   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
592   PQclear(res);
593   return rv;
594 }
595 static int
596 stratcon_datastore_do(conn_q *cq, const char *cmd) {
597   PGresult *res;
598   int rv = -1;
599   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
600   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
601   PQclear(res);
602   return rv;
603 }
604 #define BUSTED(cq) do { \
605   PQfinish((cq)->dbh); \
606   (cq)->dbh = NULL; \
607   goto full_monty; \
608 } while(0)
609 #define SAVEPOINT(name) do { \
610   if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
611   last_sp = current; \
612 } while(0)
613 #define ROLLBACK_TO_SAVEPOINT(name) do { \
614   if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
615     BUSTED(cq); \
616   last_sp = NULL; \
617 } while(0)
618 #define RELEASE_SAVEPOINT(name) do { \
619   if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
620     BUSTED(cq); \
621   last_sp = NULL; \
622 } while(0)
623 int
624 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
625                                  struct timeval *now) {
626   conn_q *cq = closure;
627   ds_job_detail *current, *next;
628   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
629
630   if(!cq->head) return 0;
631
632   stratcon_database_connect(cq);
633
634   current = cq->head;
635   while(current) {
636     if(current->rt) {
637       next = current->next;
638       stratcon_datastore_find(cq, current);
639       current = next;
640     }
641     else if(current->completion_event) {
642       next = current->next;
643       eventer_add(current->completion_event);
644       current = next;
645       __remove_until(cq, current);
646     }
647     else current = current->next;
648   }
649   return 0;
650 }
651 int
652 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
653                                   struct timeval *now) {
654   int i;
655   conn_q *cq = closure;
656   ds_job_detail *current, *last_sp;
657   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
658   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
659   if(!cq->head) return 0;
660
661  full_monty:
662   /* Make sure we have a connection */
663   i = 1;
664   while(stratcon_database_connect(cq)) {
665     noitL(noit_error, "Error connecting to database\n");
666     sleep(i);
667     i *= 2;
668     i = MIN(i, 16);
669   }
670
671   current = cq->head;
672   last_sp = NULL;
673   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
674   while(current) {
675     execute_outcome_t rv;
676     if(current->data) {
677       if(!last_sp) SAVEPOINT("batch");
678  
679       if(current->problematic) {
680         noitL(noit_error, "[%s] Failed noit line: %s", cq->jobq->queue_name, current->data);
681         RELEASE_SAVEPOINT("batch");
682         current = current->next;
683         continue;
684       }
685       rv = stratcon_datastore_execute(cq, cq->remote, current);
686       switch(rv) {
687         case DS_EXEC_SUCCESS:
688           current = current->next;
689           break;
690         case DS_EXEC_ROW_FAILED:
691           /* rollback to savepoint, mark this record as bad and start again */
692           current->problematic = 1;
693           current = last_sp;
694           ROLLBACK_TO_SAVEPOINT("batch");
695           break;
696         case DS_EXEC_TXN_FAILED:
697           BUSTED(cq);
698       }
699     }
700     if(current->completion_event) {
701       if(last_sp) RELEASE_SAVEPOINT("batch");
702       if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq);
703       eventer_add(current->completion_event);
704       current = current->next;
705       __remove_until(cq, current);
706     }
707   }
708   return 0;
709 }
710 void
711 stratcon_datastore_push(stratcon_datastore_op_t op,
712                         struct sockaddr *remote, void *operand) {
713   conn_q *cq;
714   eventer_t e;
715   ds_job_detail *dsjd;
716   struct datastore_onlooker_list *nnode;
717
718   for(nnode = onlookers; nnode; nnode = nnode->next)
719     nnode->dispatch(op,remote,operand);
720
721   cq = __get_conn_q_for_remote(remote);
722   dsjd = calloc(1, sizeof(*dsjd));
723   switch(op) {
724     case DS_OP_FIND:
725       dsjd->rt = operand;
726       __append(cq, dsjd);
727       break;
728     case DS_OP_INSERT:
729       dsjd->data = operand;
730       __append(cq, dsjd);
731       break;
732     case DS_OP_FIND_COMPLETE:
733     case DS_OP_CHKPT:
734       dsjd->completion_event = operand;
735       __append(cq,dsjd);
736       e = eventer_alloc();
737       e->mask = EVENTER_ASYNCH;
738       if(op == DS_OP_FIND_COMPLETE)
739         e->callback = stratcon_datastore_asynch_lookup;
740       else if(op == DS_OP_CHKPT)
741         e->callback = stratcon_datastore_asynch_execute;
742       e->closure = cq;
743       eventer_add_asynch(cq->jobq, e);
744       break;
745   }
746 }
747
748 int
749 stratcon_datastore_saveconfig(void *unused) {
750   int rv = -1;
751   conn_q _cq = { 0 }, *cq = &_cq;
752   char *buff;
753   ds_single_detail _d = { 0 }, *d = &_d;
754
755   if(stratcon_database_connect(cq) == 0) {
756     char time_as_str[20];
757     size_t len;
758     buff = noit_conf_xml_in_mem(&len);
759     if(!buff) goto bad_row;
760
761     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
762     DECLARE_PARAM_STR("0.0.0.0", 7);
763     DECLARE_PARAM_STR("stratcond", 9);
764     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
765     DECLARE_PARAM_STR(buff, len);
766     free(buff);
767
768     GET_QUERY(config_insert);
769     PG_EXEC(config_insert);
770     PQclear(d->res);
771     rv = 0;
772
773     bad_row:
774       free_params(d);
775   }
776   if(cq->dbh) PQfinish(cq->dbh);
777   return rv;
778 }
779
780 void
781 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
782                                                struct sockaddr *, void *)) {
783   struct datastore_onlooker_list *nnode;
784   nnode = calloc(1, sizeof(*nnode));
785   nnode->dispatch = f;
786   nnode->next = onlookers;
787   while(noit_atomic_casptr((void **)&onlookers, nnode, nnode->next) != (void *)nnode->next)
788     nnode->next = onlookers;
789 }
Note: See TracBrowser for help on using the browser.