root/src/stratcon_datastore.c

Revision 222a9737db8ea7fefdf3ee212cae3e505459add2, 14.9 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

this was reading freed memory, refs #51

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_log.h"
9 #include "utils/noit_b64.h"
10 #include "stratcon_datastore.h"
11 #include "noit_conf.h"
12 #include "noit_check.h"
13 #include <unistd.h>
14 #include <netinet/in.h>
15 #include <sys/un.h>
16 #include <arpa/inet.h>
17 #include <libpq-fe.h>
18 #include <zlib.h>
19
/*
 * Configuration paths holding the SQL text for each kind of record.
 * GET_QUERY(x) reads the path named x_conf, so every cache variable below
 * must keep a matching "<name>_conf" sibling.
 */
static const char *check_insert_conf = "/stratcon/database/statements/check";
static const char *status_insert_conf = "/stratcon/database/statements/status";
static const char *metric_insert_numeric_conf = "/stratcon/database/statements/metric_numeric";
static const char *metric_insert_text_conf = "/stratcon/database/statements/metric_text";
static const char *config_insert_conf = "/stratcon/database/statements/config";

/* Statement text caches; NULL until GET_QUERY fills them on first use. */
static char *check_insert = NULL;
static char *status_insert = NULL;
static char *metric_insert_numeric = NULL;
static char *metric_insert_text = NULL;
static char *config_insert = NULL;
30
/*
 * GET_QUERY(a): ensure the statement cache `a` is populated.
 * When `a` is still NULL it is looked up at the config path `a_conf`; if
 * noit_conf_get_string() reports failure (returns zero) control jumps to the
 * enclosing function's `bad_row` label.  `a` must be a plain identifier
 * because it is token-pasted.
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
36
/* Capacity of each per-statement bind parameter array below. */
#define MAX_PARAMS 8
/*
 * Field list pasted at the start of both ds_single_detail and ds_job_detail:
 *   nparams      - number of slots currently filled (DECLARE_PARAM_STR
 *                  increments it)
 *   metric_type  - first byte of the type field of an 'M' record
 *   paramValues  - parameter strings handed to PQexecParams (a slot may be NULL)
 *   paramLengths - length recorded for each parameter
 *   paramFormats - per-parameter format flag; DECLARE_PARAM_STR always stores 0
 *   paramAllocd  - non-zero when paramValues[i] is heap memory free_params
 *                  must release
 */
#define POSTGRES_PARTS \
  int nparams; \
  int metric_type; \
  char *paramValues[MAX_PARAMS]; \
  int paramLengths[MAX_PARAMS]; \
  int paramFormats[MAX_PARAMS]; \
  int paramAllocd[MAX_PARAMS];
45
/* Bind parameters for one stand-alone statement (no queue linkage). */
typedef struct ds_single_detail {
  POSTGRES_PARTS
} ds_single_detail;
/*
 * One queued unit of work.  POSTGRES_PARTS must stay the first member:
 * __remove_until() casts a ds_job_detail pointer to ds_single_detail so it
 * can hand it to free_params().
 */
typedef struct ds_job_detail {
  /* Postgres specific stuff */
  POSTGRES_PARTS

  char *data;  /* The raw string, NULL means the stream is done -- commit. */
  int problematic; /* set after the row failed once; such rows are logged and skipped */
  eventer_t completion_event; /* This event should be registered if non NULL */
  struct ds_job_detail *next; /* singly linked list, see conn_q head/tail */
} ds_job_detail;
58
/*
 * Per-remote work queue and database handle.  One instance is created per
 * distinct remote address by __get_conn_q_for_remote() and stored in ds_conns.
 */
typedef struct {
  struct sockaddr *remote;  /* private copy of the remote address; also the hash key */
  eventer_jobq_t  *jobq;    /* job queue created alongside this entry */
  /* Postgres specific stuff */
  PGconn          *dbh;     /* NULL until stratcon_database_connect() succeeds in creating it */
  ds_job_detail   *head;    /* oldest pending job */
  ds_job_detail   *tail;    /* newest pending job; NULL when the list is empty */
} conn_q;
67
68 static void
69 free_params(ds_single_detail *d) {
70   int i;
71   for(i=0; i<d->nparams; i++)
72     if(d->paramAllocd[i] && d->paramValues[i])
73       free(d->paramValues[i]);
74 }
75
76 static void
77 __append(conn_q *q, ds_job_detail *d) {
78   d->next = NULL;
79   if(!q->head) q->head = q->tail = d;
80   else {
81     q->tail->next = d;
82     q->tail = d;
83   }
84 }
85 static void
86 __remove_until(conn_q *q, ds_job_detail *d) {
87   ds_job_detail *next;
88   while(q->head && q->head != d) {
89     next = q->head;
90     q->head = q->head->next;
91     free_params((ds_single_detail *)next);
92     if(next->data) free(next->data);
93     free(next);
94   }
95   if(!q->head) q->tail = NULL;
96 }
97
/*
 * Table of conn_q entries, keyed by the raw bytes of the remote sockaddr
 * (see __get_conn_q_for_remote).
 */
noit_hash_table ds_conns;
99
100 conn_q *
101 __get_conn_q_for_remote(struct sockaddr *remote) {
102   conn_q *cq;
103   int len = 0;
104   switch(remote->sa_family) {
105     case AF_INET: len = sizeof(struct sockaddr_in); break;
106     case AF_INET6: len = sizeof(struct sockaddr_in6); break;
107     case AF_UNIX: len = SUN_LEN(((struct sockaddr_un *)remote)); break;
108     default: return NULL;
109   }
110   if(noit_hash_retrieve(&ds_conns, (const char *)remote, len, (void **)&cq))
111     return cq;
112   cq = calloc(1, sizeof(*cq));
113   cq->remote = malloc(len);
114   memcpy(cq->remote, remote, len);
115   cq->jobq = calloc(1, sizeof(*cq->jobq));
116   eventer_jobq_init(cq->jobq);
117   cq->jobq->backq = eventer_default_backq();
118   /* Add one thread */
119   eventer_jobq_increase_concurrency(cq->jobq);
120   noit_hash_store(&ds_conns, (const char *)cq->remote, len, cq);
121   return cq;
122 }
123
/* Result of stratcon_datastore_execute() for a single queued row. */
typedef enum {
  DS_EXEC_SUCCESS = 0,    /* the row's statement was executed */
  DS_EXEC_ROW_FAILED = 1, /* this row is bad; the caller rolls back to its savepoint and flags the row */
  DS_EXEC_TXN_FAILED = 2, /* the caller drops the connection and starts over (BUSTED) */
} execute_outcome_t;
129
/*
 * Duplicate at most `len` bytes of `src` into a freshly malloc'd,
 * NUL-terminated string.  Copying stops early at the first NUL in `src`.
 * A `len` of zero or less yields an empty string.
 *
 * Returns the new string (caller frees), or NULL when allocation fails.
 */
static char *
__noit__strndup(const char *src, int len) {
  int slen;
  char *dst;
  for(slen = 0; slen < len; slen++)
    if(src[slen] == '\0') break;
  dst = malloc(slen + 1);
  if(dst == NULL) return NULL; /* previously fell through and wrote to NULL */
  memcpy(dst, src, slen);
  dst[slen] = '\0';
  return dst;
}
/*
 * DECLARE_PARAM_STR(str, len): append one text parameter to `d`.
 *
 * Requires a variable `d` (pointer to a struct containing POSTGRES_PARTS)
 * and a `bad_row` label in the enclosing function.  Up to `len` bytes of
 * `str` are copied; the literal "[[null]]" is stored as a NULL parameter
 * with length 0 instead.
 *
 * Jumps to bad_row without adding anything when all MAX_PARAMS slots are in
 * use or the copy is NULL, rather than writing past the arrays or passing
 * NULL to strcmp.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  char *dps_val_; \
  if(d->nparams >= MAX_PARAMS) goto bad_row; \
  dps_val_ = __noit__strndup(str, len); \
  if(dps_val_ == NULL) goto bad_row; \
  d->paramFormats[d->nparams] = 0; \
  if(!strcmp(dps_val_, "[[null]]")) { \
    free(dps_val_); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } else { \
    d->paramValues[d->nparams] = dps_val_; \
    d->paramLengths[d->nparams] = len; \
    d->paramAllocd[d->nparams] = 1; \
  } \
  d->nparams++; \
} while(0)
154
155 execute_outcome_t
156 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) {
157   int type, len;
158   char *final_buff;
159   uLong final_len, actual_final_len;;
160   char *token;
161
162   type = d->data[0];
163
164   /* Parse the log line, but only if we haven't already */
165   if(!d->nparams) {
166     char raddr[128];
167     char *scp, *ecp;
168
169     /* setup our remote address */
170     raddr[0] = '\0';
171     switch(r->sa_family) {
172       case AF_INET:
173         inet_ntop(AF_INET, &(((struct sockaddr_in *)r)->sin_addr),
174                   raddr, sizeof(raddr));
175         break;
176       case AF_INET6:
177         inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)r)->sin6_addr),
178                   raddr, sizeof(raddr));
179         break;
180       default:
181         noitL(noit_error, "remote address of family %d\n", r->sa_family);
182     }
183  
184     scp = d->data;
185 #define PROCESS_NEXT_FIELD(t,l) do { \
186   if(!*scp) goto bad_row; \
187   ecp = strchr(scp, '\t'); \
188   if(!ecp) goto bad_row; \
189   token = scp; \
190   len = (ecp-scp); \
191   scp = ecp + 1; \
192 } while(0)
193 #define PROCESS_LAST_FIELD(t,l) do { \
194   if(!*scp) ecp = scp; \
195   else { \
196     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
197     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
198   } \
199   t = scp; \
200   l = (ecp-scp); \
201 } while(0)
202
203     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
204     switch(type) {
205       /* See noit_check_log.c for log description */
206       case 'n':
207         DECLARE_PARAM_STR(raddr, strlen(raddr));
208         DECLARE_PARAM_STR("noitd",5); /* node_type */
209         PROCESS_NEXT_FIELD(token,len);
210         DECLARE_PARAM_STR(token,len); /* timestamp */
211
212         /* This is the expected uncompressed len */
213         PROCESS_NEXT_FIELD(token,len);
214         final_len = atoi(token);
215         final_buff = malloc(final_len);
216         if(!final_buff) goto bad_row;
217  
218         /* The last token is b64 endoded and compressed.
219          * we need to decode it, declare it and then free it.
220          */
221         PROCESS_LAST_FIELD(token, len);
222         /* We can in-place decode this */
223         len = noit_b64_decode((char *)token, len,
224                               (unsigned char *)token, len);
225         if(len <= 0) {
226           noitL(noit_error, "noitd config base64 decoding error.\n");
227           free(final_buff);
228           goto bad_row;
229         }
230         actual_final_len = final_len;
231         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
232                               (unsigned char *)token, len)) {
233           noitL(noit_error, "noitd config decompression failure.\n");
234           free(final_buff);
235           goto bad_row;
236         }
237         if(final_len != actual_final_len) {
238           noitL(noit_error, "noitd config decompression error.\n");
239           free(final_buff);
240           goto bad_row;
241         }
242         DECLARE_PARAM_STR(final_buff, final_len);
243         free(final_buff);
244         break;
245       case 'C':
246         DECLARE_PARAM_STR(raddr, strlen(raddr));
247         PROCESS_NEXT_FIELD(token,len);
248         DECLARE_PARAM_STR(token,len); /* timestamp */
249         PROCESS_NEXT_FIELD(token, len);
250         DECLARE_PARAM_STR(token,len); /* uuid */
251         PROCESS_NEXT_FIELD(token, len);
252         DECLARE_PARAM_STR(token,len); /* target */
253         PROCESS_NEXT_FIELD(token, len);
254         DECLARE_PARAM_STR(token,len); /* module */
255         PROCESS_LAST_FIELD(token, len);
256         DECLARE_PARAM_STR(token,len); /* name */
257         break;
258       case 'M':
259         PROCESS_NEXT_FIELD(token,len);
260         DECLARE_PARAM_STR(token,len); /* timestamp */
261         PROCESS_NEXT_FIELD(token, len);
262         DECLARE_PARAM_STR(token,len); /* uuid */
263         PROCESS_NEXT_FIELD(token, len);
264         DECLARE_PARAM_STR(token,len); /* name */
265         PROCESS_NEXT_FIELD(token,len);
266         d->metric_type = *token;
267         PROCESS_LAST_FIELD(token,len);
268         DECLARE_PARAM_STR(token,len); /* value */
269         break;
270       case 'S':
271         PROCESS_NEXT_FIELD(token,len);
272         DECLARE_PARAM_STR(token,len); /* timestamp */
273         PROCESS_NEXT_FIELD(token, len);
274         DECLARE_PARAM_STR(token,len); /* uuid */
275         PROCESS_NEXT_FIELD(token, len);
276         DECLARE_PARAM_STR(token,len); /* state */
277         PROCESS_NEXT_FIELD(token, len);
278         DECLARE_PARAM_STR(token,len); /* availability */
279         PROCESS_NEXT_FIELD(token, len);
280         DECLARE_PARAM_STR(token,len); /* duration */
281         PROCESS_LAST_FIELD(token,len);
282         DECLARE_PARAM_STR(token,len); /* status */
283         break;
284       default:
285         goto bad_row;
286     }
287
288   }
289
290 #define PG_EXEC(cmd) do { \
291   PGresult *res; \
292   int rv; \
293   res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
294                      (const char * const *)d->paramValues, \
295                      d->paramLengths, d->paramFormats, 0); \
296   rv = PQresultStatus(res); \
297   if(rv != PGRES_COMMAND_OK && \
298      rv != PGRES_TUPLES_OK) { \
299     noitL(noit_error, "stratcon datasource bad row (%d): %s\n", \
300           rv, PQresultErrorMessage(res)); \
301     PQclear(res); \
302     goto bad_row; \
303   } \
304   PQclear(res); \
305 } while(0)
306
307   /* Now execute the query */
308   switch(type) {
309     case 'n':
310       GET_QUERY(config_insert);
311       PG_EXEC(config_insert);
312       break;
313     case 'C':
314       GET_QUERY(check_insert);
315       PG_EXEC(check_insert);
316       break;
317     case 'S':
318       GET_QUERY(status_insert);
319       PG_EXEC(status_insert);
320       break;
321     case 'M':
322       switch(d->metric_type) {
323         case METRIC_INT32:
324         case METRIC_UINT32:
325         case METRIC_INT64:
326         case METRIC_UINT64:
327         case METRIC_DOUBLE:
328           GET_QUERY(metric_insert_numeric);
329           PG_EXEC(metric_insert_numeric);
330           break;
331         case METRIC_STRING:
332           GET_QUERY(metric_insert_text);
333           PG_EXEC(metric_insert_text);
334           break;
335         default:
336           goto bad_row;
337       }
338       break;
339     default:
340       /* should never get here */
341       goto bad_row;
342   }
343   return DS_EXEC_SUCCESS;
344  bad_row:
345   return DS_EXEC_ROW_FAILED;
346 }
347 static int
348 stratcon_database_connect(conn_q *cq) {
349   char dsn[512];
350   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
351   const char *k, *v;
352   int klen;
353   noit_hash_table *t;
354
355   if(cq->dbh) {
356     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
357     PQreset(cq->dbh);
358     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
359     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
360           dsn, PQerrorMessage(cq->dbh));
361     return -1;
362   }
363
364   dsn[0] = '\0';
365   t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
366   while(noit_hash_next(t, &iter, &k, &klen, (void **)&v)) {
367     if(dsn[0]) strlcat(dsn, " ", sizeof(dsn));
368     strlcat(dsn, k, sizeof(dsn));
369     strlcat(dsn, "=", sizeof(dsn));
370     strlcat(dsn, v, sizeof(dsn));
371   }
372   cq->dbh = PQconnectdb(dsn);
373   if(!cq->dbh) return -1;
374   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
375   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
376         dsn, PQerrorMessage(cq->dbh));
377   return -1;
378 }
379 static int
380 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
381                                 const char *name) {
382   int rv = -1;
383   PGresult *res;
384   char cmd[128];
385   strlcpy(cmd, p, sizeof(cmd));
386   strlcat(cmd, name, sizeof(cmd));
387   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
388   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
389   PQclear(res);
390   return rv;
391 }
392 static int
393 stratcon_datastore_do(conn_q *cq, const char *cmd) {
394   PGresult *res;
395   int rv = -1;
396   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
397   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
398   PQclear(res);
399   return rv;
400 }
/*
 * Helpers for stratcon_datastore_asynch_execute().  They rely on that
 * function's locals (`cq`, `current`, `last_sp`) and its `full_monty` label.
 */
/* Close the connection, forget the handle and restart from full_monty. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)
/* Open savepoint `name` and remember the current job as the replay point. */
#define SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
  last_sp = current; \
} while(0)
/* Undo everything since savepoint `name`; clears the replay point. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
/* Release savepoint `name`; clears the replay point. */
#define RELEASE_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
420 int
421 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
422                                   struct timeval *now) {
423   conn_q *cq = closure;
424   ds_job_detail *current, *last_sp;
425   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
426
427   if(!cq->head) return 0;
428
429  full_monty:
430   /* Make sure we have a connection */
431   while(stratcon_database_connect(cq)) {
432     noitL(noit_error, "Error connecting to database\n");
433     sleep(1);
434   }
435
436   current = cq->head;
437   last_sp = NULL;
438   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
439   while(current) {
440     execute_outcome_t rv;
441     if(current->data) {
442       if(!last_sp) SAVEPOINT("batch");
443  
444       if(current->problematic) {
445         noitL(noit_error, "Failed noit line: %s", current->data);
446         RELEASE_SAVEPOINT("batch");
447         current = current->next;
448         continue;
449       }
450       rv = stratcon_datastore_execute(cq, cq->remote, current);
451       switch(rv) {
452         case DS_EXEC_SUCCESS:
453           current = current->next;
454           break;
455         case DS_EXEC_ROW_FAILED:
456           /* rollback to savepoint, mark this record as bad and start again */
457           current->problematic = 1;
458           current = last_sp;
459           ROLLBACK_TO_SAVEPOINT("batch");
460           break;
461         case DS_EXEC_TXN_FAILED:
462           BUSTED(cq);
463       }
464     }
465     if(current->completion_event) {
466       ds_job_detail *save_next;
467       if(last_sp) RELEASE_SAVEPOINT("batch");
468       if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq);
469       eventer_add(current->completion_event);
470       current = current->next;
471       __remove_until(cq, current);
472     }
473   }
474   return 0;
475 }
476 void
477 stratcon_datastore_push(stratcon_datastore_op_t op,
478                         struct sockaddr *remote, void *operand) {
479   conn_q *cq;
480   eventer_t e;
481   ds_job_detail *dsjd;
482
483   cq = __get_conn_q_for_remote(remote);
484   dsjd = calloc(1, sizeof(*dsjd));
485   switch(op) {
486     case DS_OP_INSERT:
487       dsjd->data = operand;
488       __append(cq, dsjd);
489       break;
490     case DS_OP_CHKPT:
491       dsjd->completion_event = operand;
492       __append(cq,dsjd);
493       e = eventer_alloc();
494       e->mask = EVENTER_ASYNCH;
495       e->callback = stratcon_datastore_asynch_execute;
496       e->closure = cq;
497       eventer_add(e);
498       break;
499   }
500 }
501
502 int
503 stratcon_datastore_saveconfig(void *unused) {
504   int rv = -1;
505   conn_q _cq = { 0 }, *cq = &_cq;
506   char *buff;
507   ds_single_detail _d = { 0 }, *d = &_d;
508
509   if(stratcon_database_connect(cq) == 0) {
510     char time_as_str[20];
511     size_t len;
512     buff = noit_conf_xml_in_mem(&len);
513     if(!buff) goto bad_row;
514
515     snprintf(time_as_str, sizeof(time_as_str), "%lu", time(NULL));
516     DECLARE_PARAM_STR("0.0.0.0", 7);
517     DECLARE_PARAM_STR("stratcond", 9);
518     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
519     DECLARE_PARAM_STR(buff, len);
520     free(buff);
521
522     GET_QUERY(config_insert);
523     PG_EXEC(config_insert);
524     rv = 0;
525
526     bad_row:
527       free_params(d);
528   }
529   if(cq->dbh) PQfinish(cq->dbh);
530   return rv;
531 }
Note: See TracBrowser for help on using the browser.