root/src/stratcon_datastore.c

Revision 6a3e5107c8ce5efbdacc6ba2b7a4364ab3ff1881, 12.1 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

fixes #23

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_log.h"
9 #include "stratcon_datastore.h"
10 #include "noit_conf.h"
11 #include "noit_check.h"
12 #include <unistd.h>
13 #include <netinet/in.h>
14 #include <sys/un.h>
15 #include <arpa/inet.h>
16 #include <libpq-fe.h>
17
/*
 * SQL statements used by the datastore.  For every statement there is a
 * configuration path (the *_conf constant) and a cached copy of the statement
 * text, which stays NULL until GET_QUERY() loads it from the configuration.
 */

/* Statement run for 'C' (check) log lines. */
static const char *check_insert_conf = "/stratcon/database/statements/check";
static char *check_insert = NULL;

/* Statement run for 'S' (status) log lines. */
static const char *status_insert_conf = "/stratcon/database/statements/status";
static char *status_insert = NULL;

/* Statement run for 'M' (metric) log lines carrying a numeric value. */
static const char *metric_insert_numeric_conf = "/stratcon/database/statements/metric_numeric";
static char *metric_insert_numeric = NULL;

/* Statement run for 'M' (metric) log lines carrying a string value. */
static const char *metric_insert_text_conf = "/stratcon/database/statements/metric_text";
static char *metric_insert_text = NULL;
26
/*
 * Lazily load the statement cached in variable `a` from the configuration
 * path held in `a_conf`.  Nothing is fetched when `a` is already set.  When
 * the lookup fails, control jumps to the enclosing function's `bad_row` label.
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && \
     !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
32
33 #define MAX_PARAMS 8
34 typedef struct ds_job_detail {
35   char *data;  /* The raw string, NULL means the stream is done -- commit. */
36   int problematic;
37   eventer_t completion_event; /* This event should be registered if non NULL */
38   struct ds_job_detail *next;
39
40   /* Postgres specific stuff */
41   int nparams;
42   int metric_type;
43   char *paramValues[MAX_PARAMS];
44   int paramLengths[MAX_PARAMS];
45   int paramFormats[MAX_PARAMS];
46   int paramAllocd[MAX_PARAMS];
47 } ds_job_detail;
48
49 typedef struct {
50   struct sockaddr *remote;
51   eventer_jobq_t  *jobq;
52   /* Postgres specific stuff */
53   PGconn          *dbh;
54   ds_job_detail   *head;
55   ds_job_detail   *tail;
56 } conn_q;
57
58 static void
59 free_params(ds_job_detail *d) {
60   int i;
61   for(i=0; i<d->nparams; i++)
62     if(d->paramAllocd[i] && d->paramValues[i])
63       free(d->paramValues[i]);
64 }
65
66 static void
67 __append(conn_q *q, ds_job_detail *d) {
68   d->next = NULL;
69   if(!q->head) q->head = q->tail = d;
70   else {
71     q->tail->next = d;
72     q->tail = d;
73   }
74 }
75 static void
76 __remove_until(conn_q *q, ds_job_detail *d) {
77   ds_job_detail *next;
78   while(q->head && q->head != d) {
79     next = q->head;
80     q->head = q->head->next;
81     free_params(next);
82     if(next->data) free(next->data);
83     free(next);
84   }
85   if(!q->head) q->tail = NULL;
86 }
87
88 noit_hash_table ds_conns;
89
90 conn_q *
91 __get_conn_q_for_remote(struct sockaddr *remote) {
92   conn_q *cq;
93   int len = 0;
94   switch(remote->sa_family) {
95     case AF_INET: len = sizeof(struct sockaddr_in); break;
96     case AF_INET6: len = sizeof(struct sockaddr_in6); break;
97     case AF_UNIX: len = SUN_LEN(((struct sockaddr_un *)remote)); break;
98     default: return NULL;
99   }
100   if(noit_hash_retrieve(&ds_conns, (const char *)remote, len, (void **)&cq))
101     return cq;
102   cq = calloc(1, sizeof(*cq));
103   cq->remote = malloc(len);
104   memcpy(cq->remote, remote, len);
105   cq->jobq = calloc(1, sizeof(*cq->jobq));
106   eventer_jobq_init(cq->jobq);
107   cq->jobq->backq = eventer_default_backq();
108   /* Add one thread */
109   eventer_jobq_increase_concurrency(cq->jobq);
110   noit_hash_store(&ds_conns, (const char *)cq->remote, len, cq);
111   return cq;
112 }
113
/* Result of executing a single queued row. */
typedef enum {
  DS_EXEC_SUCCESS,     /* 0: the row was executed */
  DS_EXEC_ROW_FAILED,  /* 1: this row is bad */
  DS_EXEC_TXN_FAILED   /* 2: the whole transaction/connection is unusable */
} execute_outcome_t;
119
/*
 * Return a freshly allocated, NUL-terminated copy of at most `len` characters
 * of `src`, stopping early at a NUL byte.  Never reads beyond src[len-1], so
 * `src` need not be NUL-terminated.  A negative `len` is treated as 0.
 *
 * Returns NULL when memory cannot be allocated; the caller owns the result
 * and releases it with free().
 */
static char *
__strndup(const char *src, int len) {
  char *dst;
  size_t max = (len > 0) ? (size_t)len : 0;
  size_t n = 0;

  /* Length of the copy: bounded by len, cut short by an embedded NUL. */
  while(n < max && src[n] != '\0') n++;

  dst = malloc(n + 1);
  if(!dst) return NULL;
  memcpy(dst, src, n);
  dst[n] = '\0';
  return dst;
}
/*
 * Append a text parameter to job `d` (a ds_job_detail * in the caller's
 * scope): the first `len` characters of `str` are copied and stored in the
 * next free slot.  The literal value "[[null]]" is stored as a NULL
 * parameter instead, with length 0 and nothing to free.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  const int slot_ = d->nparams++; \
  char *copy_ = __strndup(str, len); \
  if(strcmp(copy_, "[[null]]") == 0) { \
    free(copy_); \
    d->paramValues[slot_] = NULL; \
    d->paramLengths[slot_] = 0; \
    d->paramAllocd[slot_] = 0; \
  } else { \
    d->paramValues[slot_] = copy_; \
    d->paramLengths[slot_] = len; \
    d->paramAllocd[slot_] = 1; \
  } \
  d->paramFormats[slot_] = 0; \
} while(0)
140
141 execute_outcome_t
142 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) {
143   int type, len;
144   char *token;
145
146   type = d->data[0];
147
148   /* Parse the log line, but only if we haven't already */
149   if(!d->nparams) {
150     char raddr[128];
151     char *scp, *ecp;
152
153     /* setup our remote address */
154     raddr[0] = '\0';
155     switch(r->sa_family) {
156       case AF_INET:
157         inet_ntop(AF_INET, &(((struct sockaddr_in *)r)->sin_addr),
158                   raddr, sizeof(raddr));
159         break;
160       case AF_INET6:
161         inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)r)->sin6_addr),
162                   raddr, sizeof(raddr));
163         break;
164     }
165  
166     scp = d->data;
167 #define PROCESS_NEXT_FIELD(t,l) do { \
168   if(!*scp) goto bad_row; \
169   ecp = strchr(scp, '\t'); \
170   if(!ecp) goto bad_row; \
171   token = scp; \
172   len = (ecp-scp); \
173   scp = ecp + 1; \
174 } while(0)
175 #define PROCESS_LAST_FIELD(t,l) do { \
176   if(!*scp) ecp = scp; \
177   else { \
178     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
179     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
180   } \
181   t = scp; \
182   l = (ecp-scp); \
183 } while(0)
184
185     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
186     switch(type) {
187       /* See noit_check_log.c for log description */
188       case 'C':
189         DECLARE_PARAM_STR(raddr, strlen(raddr));
190         PROCESS_NEXT_FIELD(token,len);
191         DECLARE_PARAM_STR(token,len); /* timestamp */
192         PROCESS_NEXT_FIELD(token, len);
193         DECLARE_PARAM_STR(token,len); /* uuid */
194         PROCESS_NEXT_FIELD(token, len);
195         DECLARE_PARAM_STR(token,len); /* target */
196         PROCESS_NEXT_FIELD(token, len);
197         DECLARE_PARAM_STR(token,len); /* module */
198         PROCESS_LAST_FIELD(token, len);
199         DECLARE_PARAM_STR(token,len); /* name */
200         break;
201       case 'M':
202         PROCESS_NEXT_FIELD(token,len);
203         DECLARE_PARAM_STR(token,len); /* timestamp */
204         PROCESS_NEXT_FIELD(token, len);
205         DECLARE_PARAM_STR(token,len); /* uuid */
206         PROCESS_NEXT_FIELD(token, len);
207         DECLARE_PARAM_STR(token,len); /* name */
208         PROCESS_NEXT_FIELD(token,len);
209         d->metric_type = *token;
210         PROCESS_LAST_FIELD(token,len);
211         DECLARE_PARAM_STR(token,len); /* value */
212         break;
213       case 'S':
214         PROCESS_NEXT_FIELD(token,len);
215         DECLARE_PARAM_STR(token,len); /* timestamp */
216         PROCESS_NEXT_FIELD(token, len);
217         DECLARE_PARAM_STR(token,len); /* uuid */
218         PROCESS_NEXT_FIELD(token, len);
219         DECLARE_PARAM_STR(token,len); /* state */
220         PROCESS_NEXT_FIELD(token, len);
221         DECLARE_PARAM_STR(token,len); /* availability */
222         PROCESS_NEXT_FIELD(token, len);
223         DECLARE_PARAM_STR(token,len); /* duration */
224         PROCESS_LAST_FIELD(token,len);
225         DECLARE_PARAM_STR(token,len); /* status */
226         break;
227       default:
228         goto bad_row;
229     }
230
231   }
232
233 #define PG_EXEC(cmd) do { \
234   PGresult *res; \
235   int rv; \
236   res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
237                      (const char * const *)d->paramValues, \
238                      d->paramLengths, d->paramFormats, 0); \
239   rv = PQresultStatus(res); \
240   if(rv != PGRES_COMMAND_OK) { \
241     noitL(noit_error, "stratcon datasource bad row: %s\n", \
242           PQresultErrorMessage(res)); \
243     PQclear(res); \
244     goto bad_row; \
245   } \
246   PQclear(res); \
247 } while(0)
248
249   /* Now execute the query */
250   switch(type) {
251     case 'C':
252       GET_QUERY(check_insert);
253       PG_EXEC(check_insert);
254       break;
255     case 'S':
256       GET_QUERY(status_insert);
257       PG_EXEC(status_insert);
258       break;
259     case 'M':
260       switch(d->metric_type) {
261         case METRIC_INT32:
262         case METRIC_UINT32:
263         case METRIC_INT64:
264         case METRIC_UINT64:
265         case METRIC_DOUBLE:
266           GET_QUERY(metric_insert_numeric);
267           PG_EXEC(metric_insert_numeric);
268           break;
269         case METRIC_STRING:
270           GET_QUERY(metric_insert_text);
271           PG_EXEC(metric_insert_text);
272           break;
273         default:
274           goto bad_row;
275       }
276       break;
277     default:
278       /* should never get here */
279       goto bad_row;
280   }
281   return DS_EXEC_SUCCESS;
282  bad_row:
283   return DS_EXEC_ROW_FAILED;
284 }
285 static int
286 stratcon_database_connect(conn_q *cq) {
287   char dsn[512];
288   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
289   const char *k, *v;
290   int klen;
291   noit_hash_table *t;
292
293   if(cq->dbh) {
294     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
295     PQreset(cq->dbh);
296     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
297     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
298           dsn, PQerrorMessage(cq->dbh));
299     return -1;
300   }
301
302   dsn[0] = '\0';
303   t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
304   while(noit_hash_next(t, &iter, &k, &klen, (void **)&v)) {
305     if(dsn[0]) strlcat(dsn, " ", sizeof(dsn));
306     strlcat(dsn, k, sizeof(dsn));
307     strlcat(dsn, "=", sizeof(dsn));
308     strlcat(dsn, v, sizeof(dsn));
309   }
310   cq->dbh = PQconnectdb(dsn);
311   if(!cq->dbh) return -1;
312   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
313   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
314         dsn, PQerrorMessage(cq->dbh));
315   return -1;
316 }
317 static int
318 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
319                                 const char *name) {
320   int rv = -1;
321   PGresult *res;
322   char cmd[128];
323   strlcpy(cmd, p, sizeof(cmd));
324   strlcat(cmd, name, sizeof(cmd));
325   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
326   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
327   PQclear(res);
328   return rv;
329 }
330 static int
331 stratcon_datastore_do(conn_q *cq, const char *cmd) {
332   PGresult *res;
333   int rv = -1;
334   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
335   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
336   PQclear(res);
337   return rv;
338 }
339 #define BUSTED(cq) do { \
340   PQfinish((cq)->dbh); \
341   (cq)->dbh = NULL; \
342   goto full_monty; \
343 } while(0)
344 #define SAVEPOINT(name) do { \
345   if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
346   last_sp = current; \
347 } while(0)
348 #define ROLLBACK_TO_SAVEPOINT(name) do { \
349   if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
350     BUSTED(cq); \
351   last_sp = NULL; \
352 } while(0)
353 #define RELEASE_SAVEPOINT(name) do { \
354   if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
355     BUSTED(cq); \
356   last_sp = NULL; \
357 } while(0)
358 int
359 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
360                                   struct timeval *now) {
361   conn_q *cq = closure;
362   ds_job_detail *current, *last_sp;
363   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
364
365   if(!cq->head) return 0;
366
367  full_monty:
368   /* Make sure we have a connection */
369   while(stratcon_database_connect(cq)) {
370     noitL(noit_error, "Error connecting to database\n");
371     sleep(1);
372   }
373
374   current = cq->head;
375   last_sp = NULL;
376   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
377   while(current) {
378     execute_outcome_t rv;
379     if(current->data) {
380       if(!last_sp) SAVEPOINT("batch");
381  
382       if(current->problematic) {
383         noitL(noit_error, "Failed noit line: %s", current->data);
384         RELEASE_SAVEPOINT("batch");
385         current = current->next;
386         continue;
387       }
388       rv = stratcon_datastore_execute(cq, cq->remote, current);
389       switch(rv) {
390         case DS_EXEC_SUCCESS:
391           current = current->next;
392           break;
393         case DS_EXEC_ROW_FAILED:
394           /* rollback to savepoint, mark this record as bad and start again */
395           current->problematic = 1;
396           current = last_sp;
397           ROLLBACK_TO_SAVEPOINT("batch");
398           break;
399         case DS_EXEC_TXN_FAILED:
400           BUSTED(cq);
401       }
402     }
403     if(current->completion_event) {
404       if(last_sp) RELEASE_SAVEPOINT("batch");
405       if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq);
406       eventer_add(current->completion_event);
407       __remove_until(cq, current->next);
408       current = current->next;
409     }
410   }
411   return 0;
412 }
413 void
414 stratcon_datastore_push(stratcon_datastore_op_t op,
415                         struct sockaddr *remote, void *operand) {
416   conn_q *cq;
417   eventer_t e;
418   ds_job_detail *dsjd;
419
420   cq = __get_conn_q_for_remote(remote);
421   dsjd = calloc(1, sizeof(*dsjd));
422   switch(op) {
423     case DS_OP_INSERT:
424       dsjd->data = operand;
425       __append(cq, dsjd);
426       break;
427     case DS_OP_CHKPT:
428       dsjd->completion_event = operand;
429       __append(cq,dsjd);
430       e = eventer_alloc();
431       e->mask = EVENTER_ASYNCH;
432       e->callback = stratcon_datastore_asynch_execute;
433       e->closure = cq;
434       eventer_add(e);
435       break;
436   }
437 }
Note: See TracBrowser for help on using the browser.