root/src/stratcon_datastore.c

Revision dbaf64bc41991ad1a538a17c4d9cfc5bab31e0a8, 17.9 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 10 years ago)

refs #78, cleanup

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_log.h"
9 #include "utils/noit_b64.h"
10 #include "stratcon_datastore.h"
11 #include "stratcon_realtime_http.h"
12 #include "noit_conf.h"
13 #include "noit_check.h"
14 #include <unistd.h>
15 #include <netinet/in.h>
16 #include <sys/un.h>
17 #include <arpa/inet.h>
18 #include <libpq-fe.h>
19 #include <zlib.h>
20
/*
 * SQL statements used by this file.  For every statement there is a pair:
 *   <name>_conf  the configuration path the statement text is read from
 *   <name>       the cached statement text (NULL until GET_QUERY loads it)
 */

/* Executed by stratcon_datastore_find. */
static const char *check_find_conf = "/stratcon/database/statements/findcheck";
static char *check_find = NULL;

/* Executed for 'C' log lines. */
static const char *check_insert_conf = "/stratcon/database/statements/check";
static char *check_insert = NULL;

/* Executed for 'S' log lines. */
static const char *status_insert_conf = "/stratcon/database/statements/status";
static char *status_insert = NULL;

/* Executed for 'M' log lines carrying a numeric metric. */
static const char *metric_insert_numeric_conf = "/stratcon/database/statements/metric_numeric";
static char *metric_insert_numeric = NULL;

/* Executed for 'M' log lines carrying a string metric. */
static const char *metric_insert_text_conf = "/stratcon/database/statements/metric_text";
static char *metric_insert_text = NULL;

/* Executed for 'n' log lines and by stratcon_datastore_saveconfig. */
static const char *config_insert_conf = "/stratcon/database/statements/config";
static char *config_insert = NULL;
33
/*
 * GET_QUERY(a): make sure the statement text cached in `a` is available.
 * While `a` is still NULL it is fetched with noit_conf_get_string() from the
 * path held in a##_conf; when that call returns 0 control jumps to the
 * enclosing function's `bad_row` label.
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && \
     !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
39
/* Capacity of the per-statement parameter arrays below. */
#define MAX_PARAMS 8

/*
 * Members shared by every detail structure that runs a statement through
 * PG_EXEC: the result and its status, the parsed metric type, and the
 * parallel arrays describing the first `nparams` statement parameters
 * (value, length, format, and whether the value must be free()d).
 */
#define POSTGRES_PARTS \
  PGresult *res; \
  int rv; \
  int nparams; \
  int metric_type; \
  char *paramValues[MAX_PARAMS]; \
  int paramLengths[MAX_PARAMS]; \
  int paramFormats[MAX_PARAMS]; \
  int paramAllocd[MAX_PARAMS];
50
51 typedef struct ds_single_detail {
52   POSTGRES_PARTS
53 } ds_single_detail;
54 typedef struct ds_job_detail {
55   /* Postgres specific stuff */
56   POSTGRES_PARTS
57
58   char *data;  /* The raw string, NULL means the stream is done -- commit. */
59   struct realtime_tracker *rt;
60
61   int problematic;
62   eventer_t completion_event; /* This event should be registered if non NULL */
63   struct ds_job_detail *next;
64 } ds_job_detail;
65
66 typedef struct {
67   struct sockaddr *remote;
68   eventer_jobq_t  *jobq;
69   /* Postgres specific stuff */
70   PGconn          *dbh;
71   ds_job_detail   *head;
72   ds_job_detail   *tail;
73 } conn_q;
74
75 static void
76 free_params(ds_single_detail *d) {
77   int i;
78   for(i=0; i<d->nparams; i++)
79     if(d->paramAllocd[i] && d->paramValues[i])
80       free(d->paramValues[i]);
81 }
82
83 static void
84 __append(conn_q *q, ds_job_detail *d) {
85   d->next = NULL;
86   if(!q->head) q->head = q->tail = d;
87   else {
88     q->tail->next = d;
89     q->tail = d;
90   }
91 }
92 static void
93 __remove_until(conn_q *q, ds_job_detail *d) {
94   ds_job_detail *next;
95   while(q->head && q->head != d) {
96     next = q->head;
97     q->head = q->head->next;
98     free_params((ds_single_detail *)next);
99     if(next->data) free(next->data);
100     free(next);
101   }
102   if(!q->head) q->tail = NULL;
103 }
104
105 noit_hash_table ds_conns;
106
107 conn_q *
108 __get_conn_q_for_remote(struct sockaddr *remote) {
109   conn_q *cq;
110   char queue_name[128] = "datastore_";
111   static const char __zeros[4] = { 0 };
112   int len = 0;
113   if(remote) {
114     switch(remote->sa_family) {
115       case AF_INET:
116         len = sizeof(struct sockaddr_in);
117         inet_ntop(remote->sa_family, &((struct sockaddr_in *)remote)->sin_addr,
118                   queue_name + strlen("datastore_"), len);
119         break;
120       case AF_INET6:
121        len = sizeof(struct sockaddr_in6);
122         inet_ntop(remote->sa_family, &((struct sockaddr_in6 *)remote)->sin6_addr,
123                   queue_name + strlen("datastore_"), len);
124        break;
125       case AF_UNIX:
126         len = SUN_LEN(((struct sockaddr_un *)remote));
127         snprintf(queue_name, sizeof(queue_name), "datastore_%s", ((struct sockaddr_un *)remote)->sun_path);
128         break;
129       default: return NULL;
130     }
131   }
132   else {
133     /* This is a dummy connection */
134     remote = (struct sockaddr *)__zeros;
135     snprintf(queue_name, sizeof(queue_name), "datastore_default");
136     len = 4;
137   }
138   if(noit_hash_retrieve(&ds_conns, (const char *)remote, len, (void **)&cq))
139     return cq;
140   cq = calloc(1, sizeof(*cq));
141   cq->remote = malloc(len);
142   memcpy(cq->remote, remote, len);
143   cq->jobq = calloc(1, sizeof(*cq->jobq));
144   eventer_jobq_init(cq->jobq, queue_name);
145   cq->jobq->backq = eventer_default_backq();
146   /* Add one thread */
147   eventer_jobq_increase_concurrency(cq->jobq);
148   noit_hash_store(&ds_conns, (const char *)cq->remote, len, cq);
149   return cq;
150 }
151
/* Result of running one queued entry against the database. */
typedef enum {
  DS_EXEC_SUCCESS    = 0, /* statement ran */
  DS_EXEC_ROW_FAILED = 1, /* this entry failed */
  DS_EXEC_TXN_FAILED = 2  /* asynch_execute drops the connection and restarts */
} execute_outcome_t;
157
/*
 * Duplicate at most len bytes of src into a freshly malloc()ed,
 * NUL-terminated string.  Copying stops early at the first NUL in src.
 * The caller owns (and must free) the result.
 */
static char *
__noit__strndup(const char *src, int len) {
  int n = 0;
  char *copy;

  while(n < len && src[n] != '\0') n++;
  copy = malloc(n + 1);
  memcpy(copy, src, n);
  copy[n] = '\0';
  return copy;
}
/*
 * DECLARE_PARAM_STR(str, len): append a text parameter to the detail `d`.
 * The value is a private copy of at most `len` bytes of `str`.  The literal
 * "[[null]]" is turned into a NULL parameter value (length 0, not allocated).
 * Expects a variable named `d` in the calling scope.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  int ps_slot__ = d->nparams; \
  char *ps_copy__ = __noit__strndup(str, len); \
  if(strcmp(ps_copy__, "[[null]]") == 0) { \
    free(ps_copy__); \
    d->paramValues[ps_slot__] = NULL; \
    d->paramLengths[ps_slot__] = 0; \
    d->paramAllocd[ps_slot__] = 0; \
  } \
  else { \
    d->paramValues[ps_slot__] = ps_copy__; \
    d->paramLengths[ps_slot__] = len; \
    d->paramAllocd[ps_slot__] = 1; \
  } \
  d->paramFormats[ps_slot__] = 0; \
  d->nparams = ps_slot__ + 1; \
} while(0)
/*
 * DECLARE_PARAM_INT(i): append the decimal text form of the int `i` as a
 * parameter (via DECLARE_PARAM_STR).
 */
#define DECLARE_PARAM_INT(i) do { \
  char numbuf__[32]; \
  int numlen__ = snprintf(numbuf__, sizeof(numbuf__), "%d", (i)); \
  DECLARE_PARAM_STR(numbuf__, numlen__); \
} while(0)
189
/*
 * PG_GET_STR_COL(dest, row, name): point `dest` at the value of column
 * `name` in row `row` of d->res.  `dest` becomes NULL when the column does
 * not exist (PQfnumber < 0) or the value is SQL NULL.
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int pg_col__ = PQfnumber(d->res, name); \
  if(pg_col__ < 0 || PQgetisnull(d->res, row, pg_col__)) \
    dest = NULL; \
  else \
    dest = PQgetvalue(d->res, row, pg_col__); \
} while(0)
197
/*
 * PG_EXEC(cmd): run `cmd` on cq->dbh with the parameters collected in `d`.
 * The result is stored in d->res and its status in d->rv.  On any status
 * other than PGRES_COMMAND_OK / PGRES_TUPLES_OK the error is logged, the
 * result is cleared, d->res is reset to NULL (so no stale pointer is left
 * behind for later cleanup code) and control jumps to the caller's
 * `bad_row` label.  On success the caller owns d->res and must PQclear it.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    noitL(noit_error, "stratcon datasource bad (%d): %s\n", \
          d->rv, PQresultErrorMessage(d->res)); \
    PQclear(d->res); \
    d->res = NULL; \
    goto bad_row; \
  } \
} while(0)
211
212 execute_outcome_t
213 stratcon_datastore_find(conn_q *cq, ds_job_detail *d) {
214   char *val;
215   int row_count;
216
217   if(!d->nparams) DECLARE_PARAM_INT(d->rt->sid);
218   GET_QUERY(check_find);
219   PG_EXEC(check_find);
220   row_count = PQntuples(d->res);
221   if(row_count != 1) goto bad_row;
222
223   /* Get the check uuid */
224   PG_GET_STR_COL(val, 0, "id");
225   if(!val) goto bad_row;
226   if(uuid_parse(val, d->rt->checkid)) goto bad_row;
227
228   /* Get the remote_address (which noit owns this) */
229   PG_GET_STR_COL(val, 0, "remote_address");
230   if(!val) goto bad_row;
231   d->rt->noit = strdup(val);
232
233   PQclear(d->res);
234   return DS_EXEC_SUCCESS;
235  bad_row:
236   return DS_EXEC_ROW_FAILED;
237 }
238 execute_outcome_t
239 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) {
240   int type, len;
241   char *final_buff;
242   uLong final_len, actual_final_len;;
243   char *token;
244
245   type = d->data[0];
246
247   /* Parse the log line, but only if we haven't already */
248   if(!d->nparams) {
249     char raddr[128];
250     char *scp, *ecp;
251
252     /* setup our remote address */
253     raddr[0] = '\0';
254     switch(r->sa_family) {
255       case AF_INET:
256         inet_ntop(AF_INET, &(((struct sockaddr_in *)r)->sin_addr),
257                   raddr, sizeof(raddr));
258         break;
259       case AF_INET6:
260         inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)r)->sin6_addr),
261                   raddr, sizeof(raddr));
262         break;
263       default:
264         noitL(noit_error, "remote address of family %d\n", r->sa_family);
265     }
266  
267     scp = d->data;
268 #define PROCESS_NEXT_FIELD(t,l) do { \
269   if(!*scp) goto bad_row; \
270   ecp = strchr(scp, '\t'); \
271   if(!ecp) goto bad_row; \
272   token = scp; \
273   len = (ecp-scp); \
274   scp = ecp + 1; \
275 } while(0)
276 #define PROCESS_LAST_FIELD(t,l) do { \
277   if(!*scp) ecp = scp; \
278   else { \
279     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
280     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
281   } \
282   t = scp; \
283   l = (ecp-scp); \
284 } while(0)
285
286     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
287     switch(type) {
288       /* See noit_check_log.c for log description */
289       case 'n':
290         DECLARE_PARAM_STR(raddr, strlen(raddr));
291         DECLARE_PARAM_STR("noitd",5); /* node_type */
292         PROCESS_NEXT_FIELD(token,len);
293         DECLARE_PARAM_STR(token,len); /* timestamp */
294
295         /* This is the expected uncompressed len */
296         PROCESS_NEXT_FIELD(token,len);
297         final_len = atoi(token);
298         final_buff = malloc(final_len);
299         if(!final_buff) goto bad_row;
300  
301         /* The last token is b64 endoded and compressed.
302          * we need to decode it, declare it and then free it.
303          */
304         PROCESS_LAST_FIELD(token, len);
305         /* We can in-place decode this */
306         len = noit_b64_decode((char *)token, len,
307                               (unsigned char *)token, len);
308         if(len <= 0) {
309           noitL(noit_error, "noitd config base64 decoding error.\n");
310           free(final_buff);
311           goto bad_row;
312         }
313         actual_final_len = final_len;
314         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
315                               (unsigned char *)token, len)) {
316           noitL(noit_error, "noitd config decompression failure.\n");
317           free(final_buff);
318           goto bad_row;
319         }
320         if(final_len != actual_final_len) {
321           noitL(noit_error, "noitd config decompression error.\n");
322           free(final_buff);
323           goto bad_row;
324         }
325         DECLARE_PARAM_STR(final_buff, final_len);
326         free(final_buff);
327         break;
328       case 'C':
329         DECLARE_PARAM_STR(raddr, strlen(raddr));
330         PROCESS_NEXT_FIELD(token,len);
331         DECLARE_PARAM_STR(token,len); /* timestamp */
332         PROCESS_NEXT_FIELD(token, len);
333         DECLARE_PARAM_STR(token,len); /* uuid */
334         PROCESS_NEXT_FIELD(token, len);
335         DECLARE_PARAM_STR(token,len); /* target */
336         PROCESS_NEXT_FIELD(token, len);
337         DECLARE_PARAM_STR(token,len); /* module */
338         PROCESS_LAST_FIELD(token, len);
339         DECLARE_PARAM_STR(token,len); /* name */
340         break;
341       case 'M':
342         PROCESS_NEXT_FIELD(token,len);
343         DECLARE_PARAM_STR(token,len); /* timestamp */
344         PROCESS_NEXT_FIELD(token, len);
345         DECLARE_PARAM_STR(token,len); /* uuid */
346         PROCESS_NEXT_FIELD(token, len);
347         DECLARE_PARAM_STR(token,len); /* name */
348         PROCESS_NEXT_FIELD(token,len);
349         d->metric_type = *token;
350         PROCESS_LAST_FIELD(token,len);
351         DECLARE_PARAM_STR(token,len); /* value */
352         break;
353       case 'S':
354         PROCESS_NEXT_FIELD(token,len);
355         DECLARE_PARAM_STR(token,len); /* timestamp */
356         PROCESS_NEXT_FIELD(token, len);
357         DECLARE_PARAM_STR(token,len); /* uuid */
358         PROCESS_NEXT_FIELD(token, len);
359         DECLARE_PARAM_STR(token,len); /* state */
360         PROCESS_NEXT_FIELD(token, len);
361         DECLARE_PARAM_STR(token,len); /* availability */
362         PROCESS_NEXT_FIELD(token, len);
363         DECLARE_PARAM_STR(token,len); /* duration */
364         PROCESS_LAST_FIELD(token,len);
365         DECLARE_PARAM_STR(token,len); /* status */
366         break;
367       default:
368         goto bad_row;
369     }
370
371   }
372
373   /* Now execute the query */
374   switch(type) {
375     case 'n':
376       GET_QUERY(config_insert);
377       PG_EXEC(config_insert);
378       PQclear(d->res);
379       break;
380     case 'C':
381       GET_QUERY(check_insert);
382       PG_EXEC(check_insert);
383       PQclear(d->res);
384       break;
385     case 'S':
386       GET_QUERY(status_insert);
387       PG_EXEC(status_insert);
388       PQclear(d->res);
389       break;
390     case 'M':
391       switch(d->metric_type) {
392         case METRIC_INT32:
393         case METRIC_UINT32:
394         case METRIC_INT64:
395         case METRIC_UINT64:
396         case METRIC_DOUBLE:
397           GET_QUERY(metric_insert_numeric);
398           PG_EXEC(metric_insert_numeric);
399           PQclear(d->res);
400           break;
401         case METRIC_STRING:
402           GET_QUERY(metric_insert_text);
403           PG_EXEC(metric_insert_text);
404           PQclear(d->res);
405           break;
406         default:
407           goto bad_row;
408       }
409       break;
410     default:
411       /* should never get here */
412       goto bad_row;
413   }
414   return DS_EXEC_SUCCESS;
415  bad_row:
416   return DS_EXEC_ROW_FAILED;
417 }
418 static int
419 stratcon_database_connect(conn_q *cq) {
420   char dsn[512];
421   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
422   const char *k, *v;
423   int klen;
424   noit_hash_table *t;
425
426   if(cq->dbh) {
427     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
428     PQreset(cq->dbh);
429     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
430     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
431           dsn, PQerrorMessage(cq->dbh));
432     return -1;
433   }
434
435   dsn[0] = '\0';
436   t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
437   while(noit_hash_next(t, &iter, &k, &klen, (void **)&v)) {
438     if(dsn[0]) strlcat(dsn, " ", sizeof(dsn));
439     strlcat(dsn, k, sizeof(dsn));
440     strlcat(dsn, "=", sizeof(dsn));
441     strlcat(dsn, v, sizeof(dsn));
442   }
443   cq->dbh = PQconnectdb(dsn);
444   if(!cq->dbh) return -1;
445   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
446   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
447         dsn, PQerrorMessage(cq->dbh));
448   return -1;
449 }
450 static int
451 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
452                                 const char *name) {
453   int rv = -1;
454   PGresult *res;
455   char cmd[128];
456   strlcpy(cmd, p, sizeof(cmd));
457   strlcat(cmd, name, sizeof(cmd));
458   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
459   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
460   PQclear(res);
461   return rv;
462 }
463 static int
464 stratcon_datastore_do(conn_q *cq, const char *cmd) {
465   PGresult *res;
466   int rv = -1;
467   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
468   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
469   PQclear(res);
470   return rv;
471 }
/*
 * Helpers for stratcon_datastore_asynch_execute.  They rely on that
 * function's locals `cq`, `current` and `last_sp` and on its `full_monty`
 * label.
 */

/* Drop the connection and restart processing from the reconnect step. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)

/* Run one savepoint command; on failure bail out through BUSTED, on success
 * record `sp` as the entry the open savepoint starts at. */
#define DS_SAVEPOINT_STEP(sql, name, sp) do { \
  if(stratcon_datastore_savepoint_op(cq, sql, name)) BUSTED(cq); \
  last_sp = (sp); \
} while(0)

/* Open a savepoint at the current entry. */
#define SAVEPOINT(name) \
  DS_SAVEPOINT_STEP("SAVEPOINT ", name, current)
/* Undo everything since the savepoint; no savepoint is tracked afterwards. */
#define ROLLBACK_TO_SAVEPOINT(name) \
  DS_SAVEPOINT_STEP("ROLLBACK TO SAVEPOINT ", name, NULL)
/* Keep the work done since the savepoint; none is tracked afterwards. */
#define RELEASE_SAVEPOINT(name) \
  DS_SAVEPOINT_STEP("RELEASE SAVEPOINT ", name, NULL)
491 int
492 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
493                                  struct timeval *now) {
494   conn_q *cq = closure;
495   ds_job_detail *current;
496   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
497
498   if(!cq->head) return 0;
499
500   stratcon_database_connect(cq);
501
502   current = cq->head;
503   while(current) {
504     if(current->rt) {
505       stratcon_datastore_find(cq, current);
506       current = current->next;
507     }
508     else if(current->completion_event) {
509       eventer_add(current->completion_event);
510       current = current->next;
511       __remove_until(cq, current);
512     }
513     else current = current->next;
514   }
515   return 0;
516 }
517 int
518 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
519                                   struct timeval *now) {
520   conn_q *cq = closure;
521   ds_job_detail *current, *last_sp;
522   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
523
524   if(!cq->head) return 0;
525
526  full_monty:
527   /* Make sure we have a connection */
528   while(stratcon_database_connect(cq)) {
529     noitL(noit_error, "Error connecting to database\n");
530     sleep(1);
531   }
532
533   current = cq->head;
534   last_sp = NULL;
535   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
536   while(current) {
537     execute_outcome_t rv;
538     if(current->data) {
539       if(!last_sp) SAVEPOINT("batch");
540  
541       if(current->problematic) {
542         noitL(noit_error, "Failed noit line: %s", current->data);
543         RELEASE_SAVEPOINT("batch");
544         current = current->next;
545         continue;
546       }
547       rv = stratcon_datastore_execute(cq, cq->remote, current);
548       switch(rv) {
549         case DS_EXEC_SUCCESS:
550           current = current->next;
551           break;
552         case DS_EXEC_ROW_FAILED:
553           /* rollback to savepoint, mark this record as bad and start again */
554           current->problematic = 1;
555           current = last_sp;
556           ROLLBACK_TO_SAVEPOINT("batch");
557           break;
558         case DS_EXEC_TXN_FAILED:
559           BUSTED(cq);
560       }
561     }
562     if(current->completion_event) {
563       if(last_sp) RELEASE_SAVEPOINT("batch");
564       if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq);
565       eventer_add(current->completion_event);
566       current = current->next;
567       __remove_until(cq, current);
568     }
569   }
570   return 0;
571 }
572 void
573 stratcon_datastore_push(stratcon_datastore_op_t op,
574                         struct sockaddr *remote, void *operand) {
575   conn_q *cq;
576   eventer_t e;
577   ds_job_detail *dsjd;
578
579   cq = __get_conn_q_for_remote(remote);
580   dsjd = calloc(1, sizeof(*dsjd));
581   switch(op) {
582     case DS_OP_FIND:
583       dsjd->rt = operand;
584       __append(cq, dsjd);
585       break;
586     case DS_OP_INSERT:
587       dsjd->data = operand;
588       __append(cq, dsjd);
589       break;
590     case DS_OP_FIND_COMPLETE:
591     case DS_OP_CHKPT:
592       dsjd->completion_event = operand;
593       __append(cq,dsjd);
594       e = eventer_alloc();
595       e->mask = EVENTER_ASYNCH;
596       if(op == DS_OP_FIND_COMPLETE)
597         e->callback = stratcon_datastore_asynch_lookup;
598       else if(op == DS_OP_CHKPT)
599         e->callback = stratcon_datastore_asynch_execute;
600       e->closure = cq;
601       eventer_add(e);
602       break;
603   }
604 }
605
606 int
607 stratcon_datastore_saveconfig(void *unused) {
608   int rv = -1;
609   conn_q _cq = { 0 }, *cq = &_cq;
610   char *buff;
611   ds_single_detail _d = { 0 }, *d = &_d;
612
613   if(stratcon_database_connect(cq) == 0) {
614     char time_as_str[20];
615     size_t len;
616     buff = noit_conf_xml_in_mem(&len);
617     if(!buff) goto bad_row;
618
619     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
620     DECLARE_PARAM_STR("0.0.0.0", 7);
621     DECLARE_PARAM_STR("stratcond", 9);
622     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
623     DECLARE_PARAM_STR(buff, len);
624     free(buff);
625
626     GET_QUERY(config_insert);
627     PG_EXEC(config_insert);
628     PQclear(d->res);
629     rv = 0;
630
631     bad_row:
632       free_params(d);
633   }
634   if(cq->dbh) PQfinish(cq->dbh);
635   return rv;
636 }
Note: See TracBrowser for help on using the browser.