root/src/stratcon_datastore.c

Revision 8fd52de5ff1b33c4e303bd9c73fcd27898353757, 21.9 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

fixes #134

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "stratcon_datastore.h"
38 #include "stratcon_realtime_http.h"
39 #include "stratcon_iep.h"
40 #include "noit_conf.h"
41 #include "noit_check.h"
42 #include <unistd.h>
43 #include <netinet/in.h>
44 #include <sys/un.h>
45 #include <arpa/inet.h>
46 #include <libpq-fe.h>
47 #include <zlib.h>
48
49 static char *check_loadall = NULL;
50 static const char *check_loadall_conf = "/stratcon/database/statements/allchecks";
51 static char *check_find = NULL;
52 static const char *check_find_conf = "/stratcon/database/statements/findcheck";
53 static char *check_insert = NULL;
54 static const char *check_insert_conf = "/stratcon/database/statements/check";
55 static char *status_insert = NULL;
56 static const char *status_insert_conf = "/stratcon/database/statements/status";
57 static char *metric_insert_numeric = NULL;
58 static const char *metric_insert_numeric_conf = "/stratcon/database/statements/metric_numeric";
59 static char *metric_insert_text = NULL;
60 static const char *metric_insert_text_conf = "/stratcon/database/statements/metric_text";
61 static char *config_insert = NULL;
62 static const char *config_insert_conf = "/stratcon/database/statements/config";
63
64 static struct datastore_onlooker_list {
65   void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *, void *);
66   struct datastore_onlooker_list *next;
67 } *onlookers = NULL;
68
69 #define GET_QUERY(a) do { \
70   if(a == NULL) \
71     if(!noit_conf_get_string(NULL, a ## _conf, &(a))) \
72       goto bad_row; \
73 } while(0)
74
75 #define MAX_PARAMS 8
76 #define POSTGRES_PARTS \
77   PGresult *res; \
78   int rv; \
79   int nparams; \
80   int metric_type; \
81   char *paramValues[MAX_PARAMS]; \
82   int paramLengths[MAX_PARAMS]; \
83   int paramFormats[MAX_PARAMS]; \
84   int paramAllocd[MAX_PARAMS];
85
86 typedef struct ds_single_detail {
87   POSTGRES_PARTS
88 } ds_single_detail;
89 typedef struct ds_job_detail {
90   /* Postgres specific stuff */
91   POSTGRES_PARTS
92
93   char *data;  /* The raw string, NULL means the stream is done -- commit. */
94   struct realtime_tracker *rt;
95
96   int problematic;
97   eventer_t completion_event; /* This event should be registered if non NULL */
98   struct ds_job_detail *next;
99 } ds_job_detail;
100
101 typedef struct {
102   struct sockaddr *remote;
103   eventer_jobq_t  *jobq;
104   /* Postgres specific stuff */
105   PGconn          *dbh;
106   ds_job_detail   *head;
107   ds_job_detail   *tail;
108 } conn_q;
109
110 static int stratcon_database_connect(conn_q *cq);
111
112 static void
113 free_params(ds_single_detail *d) {
114   int i;
115   for(i=0; i<d->nparams; i++)
116     if(d->paramAllocd[i] && d->paramValues[i])
117       free(d->paramValues[i]);
118 }
119
120 static void
121 __append(conn_q *q, ds_job_detail *d) {
122   d->next = NULL;
123   if(!q->head) q->head = q->tail = d;
124   else {
125     q->tail->next = d;
126     q->tail = d;
127   }
128 }
129 static void
130 __remove_until(conn_q *q, ds_job_detail *d) {
131   ds_job_detail *next;
132   while(q->head && q->head != d) {
133     next = q->head;
134     q->head = q->head->next;
135     free_params((ds_single_detail *)next);
136     if(next->data) free(next->data);
137     free(next);
138   }
139   if(!q->head) q->tail = NULL;
140 }
141
142 noit_hash_table ds_conns;
143
144 conn_q *
145 __get_conn_q_for_remote(struct sockaddr *remote) {
146   void *vcq;
147   conn_q *cq;
148   char queue_name[128] = "datastore_";
149   static const char __zeros[4] = { 0 };
150   int len = 0;
151   if(remote) {
152     switch(remote->sa_family) {
153       case AF_INET:
154         len = sizeof(struct sockaddr_in);
155         inet_ntop(remote->sa_family, &((struct sockaddr_in *)remote)->sin_addr,
156                   queue_name + strlen("datastore_"), len);
157         break;
158       case AF_INET6:
159        len = sizeof(struct sockaddr_in6);
160         inet_ntop(remote->sa_family, &((struct sockaddr_in6 *)remote)->sin6_addr,
161                   queue_name + strlen("datastore_"), len);
162        break;
163       case AF_UNIX:
164         len = SUN_LEN(((struct sockaddr_un *)remote));
165         snprintf(queue_name, sizeof(queue_name), "datastore_%s", ((struct sockaddr_un *)remote)->sun_path);
166         break;
167       default: return NULL;
168     }
169   }
170   else {
171     /* This is a dummy connection */
172     remote = (struct sockaddr *)__zeros;
173     snprintf(queue_name, sizeof(queue_name), "datastore_default");
174     len = 4;
175   }
176   if(noit_hash_retrieve(&ds_conns, (const char *)remote, len, &vcq))
177     return vcq;
178   cq = calloc(1, sizeof(*cq));
179   cq->remote = malloc(len);
180   memcpy(cq->remote, remote, len);
181   cq->jobq = calloc(1, sizeof(*cq->jobq));
182   eventer_jobq_init(cq->jobq, queue_name);
183   cq->jobq->backq = eventer_default_backq();
184   /* Add one thread */
185   eventer_jobq_increase_concurrency(cq->jobq);
186   noit_hash_store(&ds_conns, (const char *)cq->remote, len, cq);
187   return cq;
188 }
189
190 typedef enum {
191   DS_EXEC_SUCCESS = 0,
192   DS_EXEC_ROW_FAILED = 1,
193   DS_EXEC_TXN_FAILED = 2,
194 } execute_outcome_t;
195
196 static char *
197 __noit__strndup(const char *src, int len) {
198   int slen;
199   char *dst;
200   for(slen = 0; slen < len; slen++)
201     if(src[slen] == '\0') break;
202   dst = malloc(slen + 1);
203   memcpy(dst, src, slen);
204   dst[slen] = '\0';
205   return dst;
206 }
207 #define DECLARE_PARAM_STR(str, len) do { \
208   d->paramValues[d->nparams] = __noit__strndup(str, len); \
209   d->paramLengths[d->nparams] = len; \
210   d->paramFormats[d->nparams] = 0; \
211   d->paramAllocd[d->nparams] = 1; \
212   if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \
213     free(d->paramValues[d->nparams]); \
214     d->paramValues[d->nparams] = NULL; \
215     d->paramLengths[d->nparams] = 0; \
216     d->paramAllocd[d->nparams] = 0; \
217   } \
218   d->nparams++; \
219 } while(0)
220 #define DECLARE_PARAM_INT(i) do { \
221   int buffer__len; \
222   char buffer__[32]; \
223   snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
224   buffer__len = strlen(buffer__); \
225   DECLARE_PARAM_STR(buffer__, buffer__len); \
226 } while(0)
227
228 #define PG_GET_STR_COL(dest, row, name) do { \
229   int colnum = PQfnumber(d->res, name); \
230   dest = NULL; \
231   if (colnum >= 0) \
232     dest = PQgetisnull(d->res, row, colnum) \
233          ? NULL : PQgetvalue(d->res, row, colnum); \
234 } while(0)
235
236 #define PG_EXEC(cmd) do { \
237   d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
238                         (const char * const *)d->paramValues, \
239                         d->paramLengths, d->paramFormats, 0); \
240   d->rv = PQresultStatus(d->res); \
241   if(d->rv != PGRES_COMMAND_OK && \
242      d->rv != PGRES_TUPLES_OK) { \
243     noitL(noit_error, "stratcon datasource bad (%d): %s\n", \
244           d->rv, PQresultErrorMessage(d->res)); \
245     PQclear(d->res); \
246     goto bad_row; \
247   } \
248 } while(0)
249
250 static int
251 stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure,
252                                     struct timeval *now) {
253   conn_q *cq = closure;
254   ds_job_detail *d;
255   int i, row_count = 0, good = 0;
256   char buff[1024];
257
258   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
259   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
260
261   stratcon_database_connect(cq);
262   d = calloc(1, sizeof(*d));
263   GET_QUERY(check_loadall);
264   PG_EXEC(check_loadall);
265   row_count = PQntuples(d->res);
266  
267   for(i=0; i<row_count; i++) {
268     char *remote, *id, *target, *module, *name;
269     PG_GET_STR_COL(remote, i, "remote_address");
270     PG_GET_STR_COL(id, i, "id");
271     PG_GET_STR_COL(target, i, "target");
272     PG_GET_STR_COL(module, i, "module");
273     PG_GET_STR_COL(name, i, "name");
274     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
275     stratcon_iep_line_processor(DS_OP_INSERT, NULL, buff);
276     good++;
277   }
278   noitL(noit_error, "Staged %d/%d remembered checks into IEP\n", good, row_count);
279  bad_row:
280   PQclear(d->res);
281   free(d);
282   return 0;
283 }
284 void
285 stratcon_datastore_iep_check_preload() {
286   eventer_t e;
287   conn_q *cq;
288   cq = __get_conn_q_for_remote(NULL);
289
290   e = eventer_alloc();
291   e->mask = EVENTER_ASYNCH;
292   e->callback = stratcon_datastore_asynch_drive_iep;
293   e->closure = cq;
294   eventer_add_asynch(cq->jobq, e);
295 }
296 execute_outcome_t
297 stratcon_datastore_find(conn_q *cq, ds_job_detail *d) {
298   char *val;
299   int row_count;
300
301   if(!d->nparams) DECLARE_PARAM_INT(d->rt->sid);
302   GET_QUERY(check_find);
303   PG_EXEC(check_find);
304   row_count = PQntuples(d->res);
305   if(row_count != 1) goto bad_row;
306
307   /* Get the check uuid */
308   PG_GET_STR_COL(val, 0, "id");
309   if(!val) goto bad_row;
310   if(uuid_parse(val, d->rt->checkid)) goto bad_row;
311
312   /* Get the remote_address (which noit owns this) */
313   PG_GET_STR_COL(val, 0, "remote_address");
314   if(!val) goto bad_row;
315   d->rt->noit = strdup(val);
316
317   PQclear(d->res);
318   return DS_EXEC_SUCCESS;
319  bad_row:
320   return DS_EXEC_ROW_FAILED;
321 }
322 execute_outcome_t
323 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) {
324   int type, len;
325   char *final_buff;
326   uLong final_len, actual_final_len;;
327   char *token;
328
329   type = d->data[0];
330
331   /* Parse the log line, but only if we haven't already */
332   if(!d->nparams) {
333     char raddr[128];
334     char *scp, *ecp;
335
336     /* setup our remote address */
337     raddr[0] = '\0';
338     switch(r->sa_family) {
339       case AF_INET:
340         inet_ntop(AF_INET, &(((struct sockaddr_in *)r)->sin_addr),
341                   raddr, sizeof(raddr));
342         break;
343       case AF_INET6:
344         inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)r)->sin6_addr),
345                   raddr, sizeof(raddr));
346         break;
347       default:
348         noitL(noit_error, "remote address of family %d\n", r->sa_family);
349     }
350  
351     scp = d->data;
352 #define PROCESS_NEXT_FIELD(t,l) do { \
353   if(!*scp) goto bad_row; \
354   ecp = strchr(scp, '\t'); \
355   if(!ecp) goto bad_row; \
356   token = scp; \
357   len = (ecp-scp); \
358   scp = ecp + 1; \
359 } while(0)
360 #define PROCESS_LAST_FIELD(t,l) do { \
361   if(!*scp) ecp = scp; \
362   else { \
363     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
364     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
365   } \
366   t = scp; \
367   l = (ecp-scp); \
368 } while(0)
369
370     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
371     switch(type) {
372       /* See noit_check_log.c for log description */
373       case 'n':
374         DECLARE_PARAM_STR(raddr, strlen(raddr));
375         DECLARE_PARAM_STR("noitd",5); /* node_type */
376         PROCESS_NEXT_FIELD(token,len);
377         DECLARE_PARAM_STR(token,len); /* timestamp */
378
379         /* This is the expected uncompressed len */
380         PROCESS_NEXT_FIELD(token,len);
381         final_len = atoi(token);
382         final_buff = malloc(final_len);
383         if(!final_buff) goto bad_row;
384  
385         /* The last token is b64 endoded and compressed.
386          * we need to decode it, declare it and then free it.
387          */
388         PROCESS_LAST_FIELD(token, len);
389         /* We can in-place decode this */
390         len = noit_b64_decode((char *)token, len,
391                               (unsigned char *)token, len);
392         if(len <= 0) {
393           noitL(noit_error, "noitd config base64 decoding error.\n");
394           free(final_buff);
395           goto bad_row;
396         }
397         actual_final_len = final_len;
398         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
399                               (unsigned char *)token, len)) {
400           noitL(noit_error, "noitd config decompression failure.\n");
401           free(final_buff);
402           goto bad_row;
403         }
404         if(final_len != actual_final_len) {
405           noitL(noit_error, "noitd config decompression error.\n");
406           free(final_buff);
407           goto bad_row;
408         }
409         DECLARE_PARAM_STR(final_buff, final_len);
410         free(final_buff);
411         break;
412       case 'C':
413         DECLARE_PARAM_STR(raddr, strlen(raddr));
414         PROCESS_NEXT_FIELD(token,len);
415         DECLARE_PARAM_STR(token,len); /* timestamp */
416         PROCESS_NEXT_FIELD(token, len);
417         DECLARE_PARAM_STR(token,len); /* uuid */
418         PROCESS_NEXT_FIELD(token, len);
419         DECLARE_PARAM_STR(token,len); /* target */
420         PROCESS_NEXT_FIELD(token, len);
421         DECLARE_PARAM_STR(token,len); /* module */
422         PROCESS_LAST_FIELD(token, len);
423         DECLARE_PARAM_STR(token,len); /* name */
424         break;
425       case 'M':
426         PROCESS_NEXT_FIELD(token,len);
427         DECLARE_PARAM_STR(token,len); /* timestamp */
428         PROCESS_NEXT_FIELD(token, len);
429         DECLARE_PARAM_STR(token,len); /* uuid */
430         PROCESS_NEXT_FIELD(token, len);
431         DECLARE_PARAM_STR(token,len); /* name */
432         PROCESS_NEXT_FIELD(token,len);
433         d->metric_type = *token;
434         PROCESS_LAST_FIELD(token,len);
435         DECLARE_PARAM_STR(token,len); /* value */
436         break;
437       case 'S':
438         PROCESS_NEXT_FIELD(token,len);
439         DECLARE_PARAM_STR(token,len); /* timestamp */
440         PROCESS_NEXT_FIELD(token, len);
441         DECLARE_PARAM_STR(token,len); /* uuid */
442         PROCESS_NEXT_FIELD(token, len);
443         DECLARE_PARAM_STR(token,len); /* state */
444         PROCESS_NEXT_FIELD(token, len);
445         DECLARE_PARAM_STR(token,len); /* availability */
446         PROCESS_NEXT_FIELD(token, len);
447         DECLARE_PARAM_STR(token,len); /* duration */
448         PROCESS_LAST_FIELD(token,len);
449         DECLARE_PARAM_STR(token,len); /* status */
450         break;
451       default:
452         goto bad_row;
453     }
454
455   }
456
457   /* Now execute the query */
458   switch(type) {
459     case 'n':
460       GET_QUERY(config_insert);
461       PG_EXEC(config_insert);
462       PQclear(d->res);
463       break;
464     case 'C':
465       GET_QUERY(check_insert);
466       PG_EXEC(check_insert);
467       PQclear(d->res);
468       break;
469     case 'S':
470       GET_QUERY(status_insert);
471       PG_EXEC(status_insert);
472       PQclear(d->res);
473       break;
474     case 'M':
475       switch(d->metric_type) {
476         case METRIC_INT32:
477         case METRIC_UINT32:
478         case METRIC_INT64:
479         case METRIC_UINT64:
480         case METRIC_DOUBLE:
481           GET_QUERY(metric_insert_numeric);
482           PG_EXEC(metric_insert_numeric);
483           PQclear(d->res);
484           break;
485         case METRIC_STRING:
486           GET_QUERY(metric_insert_text);
487           PG_EXEC(metric_insert_text);
488           PQclear(d->res);
489           break;
490         default:
491           goto bad_row;
492       }
493       break;
494     default:
495       /* should never get here */
496       goto bad_row;
497   }
498   return DS_EXEC_SUCCESS;
499  bad_row:
500   return DS_EXEC_ROW_FAILED;
501 }
502 static int
503 stratcon_database_connect(conn_q *cq) {
504   char dsn[512];
505   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
506   const char *k, *v;
507   int klen;
508   noit_hash_table *t;
509
510   dsn[0] = '\0';
511   t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
512   while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
513     if(dsn[0]) strlcat(dsn, " ", sizeof(dsn));
514     strlcat(dsn, k, sizeof(dsn));
515     strlcat(dsn, "=", sizeof(dsn));
516     strlcat(dsn, v, sizeof(dsn));
517   }
518   noit_hash_destroy(t, free, free);
519   free(t);
520
521   if(cq->dbh) {
522     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
523     PQreset(cq->dbh);
524     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
525     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
526           dsn, PQerrorMessage(cq->dbh));
527     return -1;
528   }
529
530   cq->dbh = PQconnectdb(dsn);
531   if(!cq->dbh) return -1;
532   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
533   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
534         dsn, PQerrorMessage(cq->dbh));
535   return -1;
536 }
537 static int
538 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
539                                 const char *name) {
540   int rv = -1;
541   PGresult *res;
542   char cmd[128];
543   strlcpy(cmd, p, sizeof(cmd));
544   strlcat(cmd, name, sizeof(cmd));
545   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
546   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
547   PQclear(res);
548   return rv;
549 }
550 static int
551 stratcon_datastore_do(conn_q *cq, const char *cmd) {
552   PGresult *res;
553   int rv = -1;
554   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
555   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
556   PQclear(res);
557   return rv;
558 }
559 #define BUSTED(cq) do { \
560   PQfinish((cq)->dbh); \
561   (cq)->dbh = NULL; \
562   goto full_monty; \
563 } while(0)
564 #define SAVEPOINT(name) do { \
565   if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
566   last_sp = current; \
567 } while(0)
568 #define ROLLBACK_TO_SAVEPOINT(name) do { \
569   if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
570     BUSTED(cq); \
571   last_sp = NULL; \
572 } while(0)
573 #define RELEASE_SAVEPOINT(name) do { \
574   if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
575     BUSTED(cq); \
576   last_sp = NULL; \
577 } while(0)
578 int
579 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
580                                  struct timeval *now) {
581   conn_q *cq = closure;
582   ds_job_detail *current, *next;
583   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
584
585   if(!cq->head) return 0;
586
587   stratcon_database_connect(cq);
588
589   current = cq->head;
590   while(current) {
591     if(current->rt) {
592       next = current->next;
593       stratcon_datastore_find(cq, current);
594       current = next;
595     }
596     else if(current->completion_event) {
597       next = current->next;
598       eventer_add(current->completion_event);
599       current = next;
600       __remove_until(cq, current);
601     }
602     else current = current->next;
603   }
604   return 0;
605 }
606 int
607 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
608                                   struct timeval *now) {
609   int i;
610   conn_q *cq = closure;
611   ds_job_detail *current, *last_sp;
612   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
613   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
614   if(!cq->head) return 0;
615
616  full_monty:
617   /* Make sure we have a connection */
618   i = 1;
619   while(stratcon_database_connect(cq)) {
620     noitL(noit_error, "Error connecting to database\n");
621     sleep(i);
622     i *= 2;
623     i = MIN(i, 16);
624   }
625
626   current = cq->head;
627   last_sp = NULL;
628   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
629   while(current) {
630     execute_outcome_t rv;
631     if(current->data) {
632       if(!last_sp) SAVEPOINT("batch");
633  
634       if(current->problematic) {
635         noitL(noit_error, "[%s] Failed noit line: %s", cq->jobq->queue_name, current->data);
636         RELEASE_SAVEPOINT("batch");
637         current = current->next;
638         continue;
639       }
640       rv = stratcon_datastore_execute(cq, cq->remote, current);
641       switch(rv) {
642         case DS_EXEC_SUCCESS:
643           current = current->next;
644           break;
645         case DS_EXEC_ROW_FAILED:
646           /* rollback to savepoint, mark this record as bad and start again */
647           current->problematic = 1;
648           current = last_sp;
649           ROLLBACK_TO_SAVEPOINT("batch");
650           break;
651         case DS_EXEC_TXN_FAILED:
652           BUSTED(cq);
653       }
654     }
655     if(current->completion_event) {
656       if(last_sp) RELEASE_SAVEPOINT("batch");
657       if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq);
658       eventer_add(current->completion_event);
659       current = current->next;
660       __remove_until(cq, current);
661     }
662   }
663   return 0;
664 }
665 void
666 stratcon_datastore_push(stratcon_datastore_op_t op,
667                         struct sockaddr *remote, void *operand) {
668   conn_q *cq;
669   eventer_t e;
670   ds_job_detail *dsjd;
671   struct datastore_onlooker_list *nnode;
672
673   for(nnode = onlookers; nnode; nnode = nnode->next)
674     nnode->dispatch(op,remote,operand);
675
676   cq = __get_conn_q_for_remote(remote);
677   dsjd = calloc(1, sizeof(*dsjd));
678   switch(op) {
679     case DS_OP_FIND:
680       dsjd->rt = operand;
681       __append(cq, dsjd);
682       break;
683     case DS_OP_INSERT:
684       dsjd->data = operand;
685       __append(cq, dsjd);
686       break;
687     case DS_OP_FIND_COMPLETE:
688     case DS_OP_CHKPT:
689       dsjd->completion_event = operand;
690       __append(cq,dsjd);
691       e = eventer_alloc();
692       e->mask = EVENTER_ASYNCH;
693       if(op == DS_OP_FIND_COMPLETE)
694         e->callback = stratcon_datastore_asynch_lookup;
695       else if(op == DS_OP_CHKPT)
696         e->callback = stratcon_datastore_asynch_execute;
697       e->closure = cq;
698       eventer_add_asynch(cq->jobq, e);
699       break;
700   }
701 }
702
703 int
704 stratcon_datastore_saveconfig(void *unused) {
705   int rv = -1;
706   conn_q _cq = { 0 }, *cq = &_cq;
707   char *buff;
708   ds_single_detail _d = { 0 }, *d = &_d;
709
710   if(stratcon_database_connect(cq) == 0) {
711     char time_as_str[20];
712     size_t len;
713     buff = noit_conf_xml_in_mem(&len);
714     if(!buff) goto bad_row;
715
716     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
717     DECLARE_PARAM_STR("0.0.0.0", 7);
718     DECLARE_PARAM_STR("stratcond", 9);
719     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
720     DECLARE_PARAM_STR(buff, len);
721     free(buff);
722
723     GET_QUERY(config_insert);
724     PG_EXEC(config_insert);
725     PQclear(d->res);
726     rv = 0;
727
728     bad_row:
729       free_params(d);
730   }
731   if(cq->dbh) PQfinish(cq->dbh);
732   return rv;
733 }
734
735 void
736 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
737                                                struct sockaddr *, void *)) {
738   struct datastore_onlooker_list *nnode;
739   nnode = calloc(1, sizeof(*nnode));
740   nnode->dispatch = f;
741   nnode->next = onlookers;
742   while(noit_atomic_casptr((void **)&onlookers, nnode, nnode->next) != (void *)nnode->next)
743     nnode->next = onlookers;
744 }
Note: See TracBrowser for help on using the browser.