root/src/stratcon_datastore.c

Revision 21b0c6c1011f78327925b8a1914d98cdd5cd43d5, 17.3 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

first whack at feeding actual data. refs #71

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_log.h"
9 #include "utils/noit_b64.h"
10 #include "stratcon_datastore.h"
11 #include "stratcon_realtime_http.h"
12 #include "noit_conf.h"
13 #include "noit_check.h"
14 #include <unistd.h>
15 #include <netinet/in.h>
16 #include <sys/un.h>
17 #include <arpa/inet.h>
18 #include <libpq-fe.h>
19 #include <zlib.h>
20
/*
 * SQL statement text, loaded lazily from the configuration by GET_QUERY().
 * Each statement pointer `foo` is paired with a `foo_conf` configuration
 * path; GET_QUERY derives the second name from the first by token pasting,
 * so every pair must keep exactly this naming scheme.
 * NOTE(review): these are filled in on first use from whichever job-queue
 * thread gets there first (one thread per conn_q), without locking —
 * confirm that is safe.
 */
21 static char *check_find = NULL;
22 static const char *check_find_conf = "/stratcon/database/statements/findcheck";
23 static char *check_insert = NULL;
24 static const char *check_insert_conf = "/stratcon/database/statements/check";
25 static char *status_insert = NULL;
26 static const char *status_insert_conf = "/stratcon/database/statements/status";
27 static char *metric_insert_numeric = NULL;
28 static const char *metric_insert_numeric_conf = "/stratcon/database/statements/metric_numeric";
29 static char *metric_insert_text = NULL;
30 static const char *metric_insert_text_conf = "/stratcon/database/statements/metric_text";
31 static char *config_insert = NULL;
32 static const char *config_insert_conf = "/stratcon/database/statements/config";
33
/*
 * GET_QUERY(a): make sure the statement pointer `a` is loaded, reading it
 * from the configuration path stored in `a_conf` the first time it is used.
 * Jumps to the caller's `bad_row` label when that path is not configured.
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
39
/*
 * Upper bound on bind parameters for a single statement.  The widest log
 * records ('C' and 'S') bind six.
 */
40 #define MAX_PARAMS 8
/*
 * Postgres bookkeeping shared by every unit of database work:
 *   res          result of the last PQexecParams() issued through PG_EXEC
 *   rv           PQresultStatus() of that result
 *   nparams      number of bind parameters declared so far
 *   metric_type  type character taken from a metric ('M') log line
 *   paramValues / paramLengths / paramFormats
 *                arrays handed to PQexecParams(); a NULL paramValues[i]
 *                binds SQL NULL
 *   paramAllocd  non-zero when paramValues[i] is heap-allocated and must be
 *                released by free_params()
 * It is expanded as the first members of both ds_single_detail and
 * ds_job_detail; the queue code casts a ds_job_detail * to a
 * ds_single_detail * (see __remove_until) and relies on that common prefix.
 */
41 #define POSTGRES_PARTS \
42   PGresult *res; \
43   int rv; \
44   int nparams; \
45   int metric_type; \
46   char *paramValues[MAX_PARAMS]; \
47   int paramLengths[MAX_PARAMS]; \
48   int paramFormats[MAX_PARAMS]; \
49   int paramAllocd[MAX_PARAMS];
50
/* A standalone unit of database work (used by stratcon_datastore_saveconfig):
 * only the shared Postgres bookkeeping. */
51 typedef struct ds_single_detail {
52   POSTGRES_PARTS
53 } ds_single_detail;
/*
 * One queued datastore operation.  stratcon_datastore_push sets exactly one
 * of data (insert), rt (lookup) or completion_event (checkpoint).
 * POSTGRES_PARTS must stay first so the struct can be used as a
 * ds_single_detail.
 */
54 typedef struct ds_job_detail {
55   /* Postgres specific stuff */
56   POSTGRES_PARTS
57
58   char *data;  /* Raw log line to insert (freed by __remove_until); NULL for lookup and checkpoint entries. */
59   struct realtime_tracker *rt; /* Lookup target; stratcon_datastore_find fills checkid and noit. */
60
61   int problematic; /* Set once executing this row failed; it is then logged and skipped. */
62   eventer_t completion_event; /* This event should be registered if non NULL */
63   struct ds_job_detail *next;
64 } ds_job_detail;
65
/*
 * Per-remote work queue.  remote is a private copy of the peer address (also
 * the ds_conns hash key), jobq runs this queue's asynch callbacks, dbh is the
 * lazily opened connection, and head/tail delimit the FIFO of pending jobs.
 */
66 typedef struct {
67   struct sockaddr *remote;
68   eventer_jobq_t  *jobq;
69   /* Postgres specific stuff */
70   PGconn          *dbh;
71   ds_job_detail   *head;
72   ds_job_detail   *tail;
73 } conn_q;
74
75 static void
76 free_params(ds_single_detail *d) {
77   int i;
78   for(i=0; i<d->nparams; i++)
79     if(d->paramAllocd[i] && d->paramValues[i])
80       free(d->paramValues[i]);
81 }
82
83 static void
84 __append(conn_q *q, ds_job_detail *d) {
85   d->next = NULL;
86   if(!q->head) q->head = q->tail = d;
87   else {
88     q->tail->next = d;
89     q->tail = d;
90   }
91 }
92 static void
93 __remove_until(conn_q *q, ds_job_detail *d) {
94   ds_job_detail *next;
95   while(q->head && q->head != d) {
96     next = q->head;
97     q->head = q->head->next;
98     free_params((ds_single_detail *)next);
99     if(next->data) free(next->data);
100     free(next);
101   }
102   if(!q->head) q->tail = NULL;
103 }
104
/*
 * Registry of per-remote conn_q objects, keyed by the raw bytes of the
 * remote sockaddr (see __get_conn_q_for_remote).
 * NOTE(review): this has external linkage and relies on static zero
 * initialization; make it static if no other file refers to it.
 */
105 noit_hash_table ds_conns;
106
107 conn_q *
108 __get_conn_q_for_remote(struct sockaddr *remote) {
109   conn_q *cq;
110   static const char __zeros[4] = { 0 };
111   int len = 0;
112   if(remote) {
113     switch(remote->sa_family) {
114       case AF_INET: len = sizeof(struct sockaddr_in); break;
115       case AF_INET6: len = sizeof(struct sockaddr_in6); break;
116       case AF_UNIX: len = SUN_LEN(((struct sockaddr_un *)remote)); break;
117       default: return NULL;
118     }
119   }
120   else {
121     /* This is a dummy connection */
122     remote = (struct sockaddr *)__zeros;
123     len = 4;
124   }
125   if(noit_hash_retrieve(&ds_conns, (const char *)remote, len, (void **)&cq))
126     return cq;
127   cq = calloc(1, sizeof(*cq));
128   cq->remote = malloc(len);
129   memcpy(cq->remote, remote, len);
130   cq->jobq = calloc(1, sizeof(*cq->jobq));
131   eventer_jobq_init(cq->jobq);
132   cq->jobq->backq = eventer_default_backq();
133   /* Add one thread */
134   eventer_jobq_increase_concurrency(cq->jobq);
135   noit_hash_store(&ds_conns, (const char *)cq->remote, len, cq);
136   return cq;
137 }
138
/* Outcome of applying one queued entry to the database. */
139 typedef enum {
140   DS_EXEC_SUCCESS = 0,      /* entry applied */
141   DS_EXEC_ROW_FAILED = 1,   /* this entry failed; asynch_execute rolls back to the batch savepoint and skips it */
142   DS_EXEC_TXN_FAILED = 2,   /* transaction unusable; asynch_execute reconnects and restarts (not returned by the execute/find functions today) */
143 } execute_outcome_t;
144
/*
 * Copy at most `len` bytes of `src` into a new NUL-terminated heap string,
 * stopping early at the first NUL in `src` (like POSIX strndup).  A negative
 * `len` is treated as 0.  The caller owns the result and must free() it.
 * Returns NULL if the allocation fails.
 */
static char *
__noit__strndup(const char *src, int len) {
  int slen;
  char *dst;
  /* Bounded scan rather than strlen(): callers pass tab-delimited tokens and
   * raw buffers that are not NUL-terminated within `len` bytes. */
  for(slen = 0; slen < len; slen++)
    if(src[slen] == '\0') break;
  dst = malloc(slen + 1);
  if(!dst) return NULL;
  memcpy(dst, src, slen);
  dst[slen] = '\0';
  return dst;
}
/*
 * DECLARE_PARAM_STR(str, len): append a text-format bind parameter to `d`
 * holding a private copy of at most `len` bytes of `str`.  The sentinel text
 * "[[null]]" is bound as SQL NULL instead.  `len` is evaluated exactly once.
 * Jumps to the caller's `bad_row` label rather than overflowing the parameter
 * arrays when MAX_PARAMS parameters are already declared, or when the copy
 * cannot be allocated.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  int param__len = (int)(len); \
  char *param__val; \
  if(d->nparams >= MAX_PARAMS) goto bad_row; \
  param__val = __noit__strndup((str), param__len); \
  if(!param__val) goto bad_row; \
  if(!strcmp(param__val, "[[null]]")) { \
    free(param__val); \
    param__val = NULL; \
    param__len = 0; \
  } \
  d->paramValues[d->nparams] = param__val; \
  d->paramLengths[d->nparams] = param__len; \
  d->paramFormats[d->nparams] = 0; \
  d->paramAllocd[d->nparams] = (param__val != NULL); \
  d->nparams++; \
} while(0)
/* DECLARE_PARAM_INT(i): bind the decimal text of int `i` (see above). */
#define DECLARE_PARAM_INT(i) do { \
  char buffer__[32]; \
  snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
  DECLARE_PARAM_STR(buffer__, strlen(buffer__)); \
} while(0)
176
/*
 * PG_GET_STR_COL(dest, row, name): point `dest` at the text value of column
 * `name` in `row` of d->res, or set it to NULL when the column does not exist
 * or the value is SQL NULL.  `dest` borrows the result's storage and is only
 * valid until d->res is cleared.
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int col__ = PQfnumber(d->res, (name)); \
  if(col__ < 0 || PQgetisnull(d->res, (row), col__)) \
    (dest) = NULL; \
  else \
    (dest) = PQgetvalue(d->res, (row), col__); \
} while(0)
184
/*
 * PG_EXEC(cmd): run statement text `cmd` on cq->dbh with d's bind parameters
 * (parameter types inferred by the server, results requested as text).  On
 * success d->res holds the result and the caller must PQclear() it.  On
 * failure the error is logged, the result is cleared, d->res is reset to
 * NULL so no dangling pointer survives in `d`, and control jumps to the
 * caller's `bad_row` label.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, (cmd), d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    noitL(noit_error, "stratcon datasource bad (%d): %s\n", \
          d->rv, PQresultErrorMessage(d->res)); \
    PQclear(d->res); \
    d->res = NULL; \
    goto bad_row; \
  } \
} while(0)
198
199 execute_outcome_t
200 stratcon_datastore_find(conn_q *cq, ds_job_detail *d) {
201   char *val;
202   int row_count;
203
204   if(!d->nparams) DECLARE_PARAM_INT(d->rt->sid);
205   GET_QUERY(check_find);
206   PG_EXEC(check_find);
207   row_count = PQntuples(d->res);
208   if(row_count != 1) goto bad_row;
209
210   /* Get the check uuid */
211   PG_GET_STR_COL(val, 0, "id");
212   if(!val) goto bad_row;
213   if(uuid_parse(val, d->rt->checkid)) goto bad_row;
214
215   /* Get the remote_address (which noit owns this) */
216   PG_GET_STR_COL(val, 0, "remote_address");
217   if(!val) goto bad_row;
218   d->rt->noit = strdup(val);
219
220   PQclear(d->res);
221   return DS_EXEC_SUCCESS;
222  bad_row:
223   return DS_EXEC_ROW_FAILED;
224 }
/*
 * Apply one queued log line (d->data) to the database.
 *
 * The first character of d->data selects the record type.  The remaining
 * tab-separated fields are bound as text parameters; see noit_check_log.c
 * for the line format.
 *   'n'  config:  printable address of `r`, "noitd", timestamp, then the
 *                 configuration, sent as <uncompressed length> TAB
 *                 <base64 of the zlib-compressed data>
 *   'C'  check:   printable address of `r`, timestamp, uuid, target, module,
 *                 name
 *   'M'  metric:  timestamp, uuid, name, value; the metric type field picks
 *                 the numeric or the text insert statement
 *   'S'  status:  timestamp, uuid, state, availability, duration, status
 * A field whose text is "[[null]]" is bound as SQL NULL (DECLARE_PARAM_STR).
 *
 * Parsing only happens while d->nparams is 0.  On a replay, the parameters
 * bound by an earlier call are reused.  Parsing writes into d->data: the 'n'
 * payload is base64-decoded in place.
 *
 * Returns DS_EXEC_SUCCESS.  Returns DS_EXEC_ROW_FAILED for:
 *   - a malformed line;
 *   - an unknown record or metric type;
 *   - an unconfigured statement;
 *   - a failed statement.
 * NOTE(review): a parse failure part-way through leaves the already-bound
 * parameters in d.  The caller (stratcon_datastore_asynch_execute) marks such
 * a row problematic, so it is not executed again.
 */
225 execute_outcome_t
226 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) {
227   int type, len;
228   char *final_buff;
229   uLong final_len, actual_final_len;;
230   char *token;
231
232   type = d->data[0];
233
234   /* Parse the log line, but only if we haven't already */
235   if(!d->nparams) {
236     char raddr[128];
237     char *scp, *ecp;
238
239     /* setup our remote address */
      /* For any other family raddr stays empty and is bound as ''. */
240     raddr[0] = '\0';
241     switch(r->sa_family) {
242       case AF_INET:
243         inet_ntop(AF_INET, &(((struct sockaddr_in *)r)->sin_addr),
244                   raddr, sizeof(raddr));
245         break;
246       case AF_INET6:
247         inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)r)->sin6_addr),
248                   raddr, sizeof(raddr));
249         break;
250       default:
251         noitL(noit_error, "remote address of family %d\n", r->sa_family);
252     }
253 
254     scp = d->data;
/*
 * PROCESS_NEXT_FIELD: take the tab-terminated field at scp into token/len
 * and advance scp past the tab; a missing field or tab fails the row.
 * NOTE(review): the t/l arguments are ignored; it always writes token/len.
 */
255 #define PROCESS_NEXT_FIELD(t,l) do { \
256   if(!*scp) goto bad_row; \
257   ecp = strchr(scp, '\t'); \
258   if(!ecp) goto bad_row; \
259   token = scp; \
260   len = (ecp-scp); \
261   scp = ecp + 1; \
262 } while(0)
/*
 * PROCESS_LAST_FIELD: take the rest of the line (minus one trailing newline,
 * if present) into t/l.  An empty remainder yields a zero-length field.
 */
263 #define PROCESS_LAST_FIELD(t,l) do { \
264   if(!*scp) ecp = scp; \
265   else { \
266     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
267     if(*(ecp-1) == '\n') ecp--; /* We back up one letter if we ended in \n */ \
268   } \
269   t = scp; \
270   l = (ecp-scp); \
271 } while(0)
272
273     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
274     switch(type) {
275       /* See noit_check_log.c for log description */
276       case 'n':
277         DECLARE_PARAM_STR(raddr, strlen(raddr));
278         DECLARE_PARAM_STR("noitd",5); /* node_type */
279         PROCESS_NEXT_FIELD(token,len);
280         DECLARE_PARAM_STR(token,len); /* timestamp */
281
282         /* This is the expected uncompressed len */
            /* NOTE(review): atoi() does no validation.  A negative or garbage
             * length seems to be rejected only indirectly, by the malloc() and
             * uncompress() checks below. */
283         PROCESS_NEXT_FIELD(token,len);
284         final_len = atoi(token);
285         final_buff = malloc(final_len);
286         if(!final_buff) goto bad_row;
287 
288         /* The last token is b64 encoded and compressed.
289          * we need to decode it, declare it and then free it.
290          */
291         PROCESS_LAST_FIELD(token, len);
292         /* We can in-place decode this */
293         len = noit_b64_decode((char *)token, len,
294                               (unsigned char *)token, len);
295         if(len <= 0) {
296           noitL(noit_error, "noitd config base64 decoding error.\n");
297           free(final_buff);
298           goto bad_row;
299         }
300         actual_final_len = final_len;
301         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
302                               (unsigned char *)token, len)) {
303           noitL(noit_error, "noitd config decompression failure.\n");
304           free(final_buff);
305           goto bad_row;
306         }
            /* The advertised length must match what zlib actually produced. */
307         if(final_len != actual_final_len) {
308           noitL(noit_error, "noitd config decompression error.\n");
309           free(final_buff);
310           goto bad_row;
311         }
            /* final_buff is not NUL-terminated; the param copy is bounded. */
312         DECLARE_PARAM_STR(final_buff, final_len);
313         free(final_buff);
314         break;
315       case 'C':
316         DECLARE_PARAM_STR(raddr, strlen(raddr));
317         PROCESS_NEXT_FIELD(token,len);
318         DECLARE_PARAM_STR(token,len); /* timestamp */
319         PROCESS_NEXT_FIELD(token, len);
320         DECLARE_PARAM_STR(token,len); /* uuid */
321         PROCESS_NEXT_FIELD(token, len);
322         DECLARE_PARAM_STR(token,len); /* target */
323         PROCESS_NEXT_FIELD(token, len);
324         DECLARE_PARAM_STR(token,len); /* module */
325         PROCESS_LAST_FIELD(token, len);
326         DECLARE_PARAM_STR(token,len); /* name */
327         break;
328       case 'M':
329         PROCESS_NEXT_FIELD(token,len);
330         DECLARE_PARAM_STR(token,len); /* timestamp */
331         PROCESS_NEXT_FIELD(token, len);
332         DECLARE_PARAM_STR(token,len); /* uuid */
333         PROCESS_NEXT_FIELD(token, len);
334         DECLARE_PARAM_STR(token,len); /* name */
            /* The type field is not bound; its first character is remembered
             * to choose the insert statement below. */
335         PROCESS_NEXT_FIELD(token,len);
336         d->metric_type = *token;
337         PROCESS_LAST_FIELD(token,len);
338         DECLARE_PARAM_STR(token,len); /* value */
339         break;
340       case 'S':
341         PROCESS_NEXT_FIELD(token,len);
342         DECLARE_PARAM_STR(token,len); /* timestamp */
343         PROCESS_NEXT_FIELD(token, len);
344         DECLARE_PARAM_STR(token,len); /* uuid */
345         PROCESS_NEXT_FIELD(token, len);
346         DECLARE_PARAM_STR(token,len); /* state */
347         PROCESS_NEXT_FIELD(token, len);
348         DECLARE_PARAM_STR(token,len); /* availability */
349         PROCESS_NEXT_FIELD(token, len);
350         DECLARE_PARAM_STR(token,len); /* duration */
351         PROCESS_LAST_FIELD(token,len);
352         DECLARE_PARAM_STR(token,len); /* status */
353         break;
354       default:
355         goto bad_row;
356     }
357
358   }
359
360   /* Now execute the query */
361   switch(type) {
362     case 'n':
363       GET_QUERY(config_insert);
364       PG_EXEC(config_insert);
365       PQclear(d->res);
366       break;
367     case 'C':
368       GET_QUERY(check_insert);
369       PG_EXEC(check_insert);
370       PQclear(d->res);
371       break;
372     case 'S':
373       GET_QUERY(status_insert);
374       PG_EXEC(status_insert);
375       PQclear(d->res);
376       break;
377     case 'M':
          /* NOTE(review): metric_type holds the raw type character; this
           * presumes the METRIC_* constants (noit_check.h) are those
           * character codes — confirm. */
378       switch(d->metric_type) {
379         case METRIC_INT32:
380         case METRIC_UINT32:
381         case METRIC_INT64:
382         case METRIC_UINT64:
383         case METRIC_DOUBLE:
384           GET_QUERY(metric_insert_numeric);
385           PG_EXEC(metric_insert_numeric);
386           PQclear(d->res);
387           break;
388         case METRIC_STRING:
389           GET_QUERY(metric_insert_text);
390           PG_EXEC(metric_insert_text);
391           PQclear(d->res);
392           break;
393         default:
394           goto bad_row;
395       }
396       break;
397     default:
398       /* should never get here */
399       goto bad_row;
400   }
401   return DS_EXEC_SUCCESS;
402  bad_row:
403   return DS_EXEC_ROW_FAILED;
404 }
405 static int
406 stratcon_database_connect(conn_q *cq) {
407   char dsn[512];
408   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
409   const char *k, *v;
410   int klen;
411   noit_hash_table *t;
412
413   if(cq->dbh) {
414     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
415     PQreset(cq->dbh);
416     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
417     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
418           dsn, PQerrorMessage(cq->dbh));
419     return -1;
420   }
421
422   dsn[0] = '\0';
423   t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
424   while(noit_hash_next(t, &iter, &k, &klen, (void **)&v)) {
425     if(dsn[0]) strlcat(dsn, " ", sizeof(dsn));
426     strlcat(dsn, k, sizeof(dsn));
427     strlcat(dsn, "=", sizeof(dsn));
428     strlcat(dsn, v, sizeof(dsn));
429   }
430   cq->dbh = PQconnectdb(dsn);
431   if(!cq->dbh) return -1;
432   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
433   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
434         dsn, PQerrorMessage(cq->dbh));
435   return -1;
436 }
437 static int
438 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
439                                 const char *name) {
440   int rv = -1;
441   PGresult *res;
442   char cmd[128];
443   strlcpy(cmd, p, sizeof(cmd));
444   strlcat(cmd, name, sizeof(cmd));
445   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
446   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
447   PQclear(res);
448   return rv;
449 }
450 static int
451 stratcon_datastore_do(conn_q *cq, const char *cmd) {
452   PGresult *res;
453   int rv = -1;
454   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
455   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
456   PQclear(res);
457   return rv;
458 }
/*
 * Transaction-control helpers for stratcon_datastore_asynch_execute.  They
 * expect `cq`, `current` and `last_sp` in scope and a `full_monty` label.
 *
 * BUSTED: close the connection and jump back to full_monty, which reconnects
 * and restarts processing from the head of the queue.
 */
459 #define BUSTED(cq) do { \
460   PQfinish((cq)->dbh); \
461   (cq)->dbh = NULL; \
462   goto full_monty; \
463 } while(0)
/* SAVEPOINT: open savepoint `name` and remember the row it precedes in
 * last_sp; any failure is treated as BUSTED. */
464 #define SAVEPOINT(name) do { \
465   if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
466   last_sp = current; \
467 } while(0)
/* ROLLBACK_TO_SAVEPOINT / RELEASE_SAVEPOINT: undo or keep the work done since
 * savepoint `name`.  Either way last_sp is cleared, so the next data row opens
 * a fresh savepoint.  Any failure is treated as BUSTED. */
468 #define ROLLBACK_TO_SAVEPOINT(name) do { \
469   if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
470     BUSTED(cq); \
471   last_sp = NULL; \
472 } while(0)
473 #define RELEASE_SAVEPOINT(name) do { \
474   if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
475     BUSTED(cq); \
476   last_sp = NULL; \
477 } while(0)
478 int
479 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
480                                  struct timeval *now) {
481   conn_q *cq = closure;
482   ds_job_detail *current;
483   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
484
485   if(!cq->head) return 0;
486
487   stratcon_database_connect(cq);
488
489   current = cq->head;
490   while(current) {
491     if(current->rt) {
492       stratcon_datastore_find(cq, current);
493       current = current->next;
494     }
495     else if(current->completion_event) {
496       eventer_add(current->completion_event);
497       current = current->next;
498       __remove_until(cq, current);
499     }
500     else current = current->next;
501   }
502   return 0;
503 }
504 int
505 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
506                                   struct timeval *now) {
507   conn_q *cq = closure;
508   ds_job_detail *current, *last_sp;
509   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
510
511   if(!cq->head) return 0;
512
513  full_monty:
514   /* Make sure we have a connection */
515   while(stratcon_database_connect(cq)) {
516     noitL(noit_error, "Error connecting to database\n");
517     sleep(1);
518   }
519
520   current = cq->head;
521   last_sp = NULL;
522   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
523   while(current) {
524     execute_outcome_t rv;
525     if(current->data) {
526       if(!last_sp) SAVEPOINT("batch");
527  
528       if(current->problematic) {
529         noitL(noit_error, "Failed noit line: %s", current->data);
530         RELEASE_SAVEPOINT("batch");
531         current = current->next;
532         continue;
533       }
534       rv = stratcon_datastore_execute(cq, cq->remote, current);
535       switch(rv) {
536         case DS_EXEC_SUCCESS:
537           current = current->next;
538           break;
539         case DS_EXEC_ROW_FAILED:
540           /* rollback to savepoint, mark this record as bad and start again */
541           current->problematic = 1;
542           current = last_sp;
543           ROLLBACK_TO_SAVEPOINT("batch");
544           break;
545         case DS_EXEC_TXN_FAILED:
546           BUSTED(cq);
547       }
548     }
549     if(current->completion_event) {
550       if(last_sp) RELEASE_SAVEPOINT("batch");
551       if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq);
552       eventer_add(current->completion_event);
553       current = current->next;
554       __remove_until(cq, current);
555     }
556   }
557   return 0;
558 }
559 void
560 stratcon_datastore_push(stratcon_datastore_op_t op,
561                         struct sockaddr *remote, void *operand) {
562   conn_q *cq;
563   eventer_t e;
564   ds_job_detail *dsjd;
565
566   cq = __get_conn_q_for_remote(remote);
567   dsjd = calloc(1, sizeof(*dsjd));
568   switch(op) {
569     case DS_OP_FIND:
570       dsjd->rt = operand;
571       __append(cq, dsjd);
572       break;
573     case DS_OP_INSERT:
574       dsjd->data = operand;
575       __append(cq, dsjd);
576       break;
577     case DS_OP_FIND_COMPLETE:
578     case DS_OP_CHKPT:
579       dsjd->completion_event = operand;
580       __append(cq,dsjd);
581       e = eventer_alloc();
582       e->mask = EVENTER_ASYNCH;
583       if(op == DS_OP_FIND_COMPLETE)
584         e->callback = stratcon_datastore_asynch_lookup;
585       else if(op == DS_OP_CHKPT)
586         e->callback = stratcon_datastore_asynch_execute;
587       e->closure = cq;
588       eventer_add(e);
589       break;
590   }
591 }
592
593 int
594 stratcon_datastore_saveconfig(void *unused) {
595   int rv = -1;
596   conn_q _cq = { 0 }, *cq = &_cq;
597   char *buff;
598   ds_single_detail _d = { 0 }, *d = &_d;
599
600   if(stratcon_database_connect(cq) == 0) {
601     char time_as_str[20];
602     size_t len;
603     buff = noit_conf_xml_in_mem(&len);
604     if(!buff) goto bad_row;
605
606     snprintf(time_as_str, sizeof(time_as_str), "%lu", time(NULL));
607     DECLARE_PARAM_STR("0.0.0.0", 7);
608     DECLARE_PARAM_STR("stratcond", 9);
609     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
610     DECLARE_PARAM_STR(buff, len);
611     free(buff);
612
613     GET_QUERY(config_insert);
614     PG_EXEC(config_insert);
615     PQclear(d->res);
616     rv = 0;
617
618     bad_row:
619       free_params(d);
620   }
621   if(cq->dbh) PQfinish(cq->dbh);
622   return rv;
623 }
Note: See TracBrowser for help on using the browser.