root/src/stratcon_datastore.c

Revision 2bdd297562b53dbee111cfefd2101bb9eb99d036, 12.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 10 years ago)

fixes #35

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
#include "noit_defines.h"
#include "eventer/eventer.h"
#include "utils/noit_log.h"
#include "stratcon_datastore.h"
#include "noit_conf.h"
#include "noit_check.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#include <sys/un.h>
#include <arpa/inet.h>
#include <libpq-fe.h>
17
18 static char *check_insert = NULL;
19 static const char *check_insert_conf = "/stratcon/database/statements/check";
20 static char *status_insert = NULL;
21 static const char *status_insert_conf = "/stratcon/database/statements/status";
22 static char *metric_insert_numeric = NULL;
23 static const char *metric_insert_numeric_conf = "/stratcon/database/statements/metric_numeric";
24 static char *metric_insert_text = NULL;
25 static const char *metric_insert_text_conf = "/stratcon/database/statements/metric_text";
26
27 #define GET_QUERY(a) do { \
28   if(a == NULL) \
29     if(!noit_conf_get_string(NULL, a ## _conf, &(a))) \
30       goto bad_row; \
31 } while(0)
32
33 #define MAX_PARAMS 8
34 typedef struct ds_job_detail {
35   char *data;  /* The raw string, NULL means the stream is done -- commit. */
36   int problematic;
37   eventer_t completion_event; /* This event should be registered if non NULL */
38   struct ds_job_detail *next;
39
40   /* Postgres specific stuff */
41   int nparams;
42   int metric_type;
43   char *paramValues[MAX_PARAMS];
44   int paramLengths[MAX_PARAMS];
45   int paramFormats[MAX_PARAMS];
46   int paramAllocd[MAX_PARAMS];
47 } ds_job_detail;
48
49 typedef struct {
50   struct sockaddr *remote;
51   eventer_jobq_t  *jobq;
52   /* Postgres specific stuff */
53   PGconn          *dbh;
54   ds_job_detail   *head;
55   ds_job_detail   *tail;
56 } conn_q;
57
58 static void
59 free_params(ds_job_detail *d) {
60   int i;
61   for(i=0; i<d->nparams; i++)
62     if(d->paramAllocd[i] && d->paramValues[i])
63       free(d->paramValues[i]);
64 }
65
66 static void
67 __append(conn_q *q, ds_job_detail *d) {
68   d->next = NULL;
69   if(!q->head) q->head = q->tail = d;
70   else {
71     q->tail->next = d;
72     q->tail = d;
73   }
74 }
75 static void
76 __remove_until(conn_q *q, ds_job_detail *d) {
77   ds_job_detail *next;
78   while(q->head && q->head != d) {
79     next = q->head;
80     q->head = q->head->next;
81     free_params(next);
82     if(next->data) free(next->data);
83     free(next);
84   }
85   if(!q->head) q->tail = NULL;
86 }
87
88 noit_hash_table ds_conns;
89
90 conn_q *
91 __get_conn_q_for_remote(struct sockaddr *remote) {
92   conn_q *cq;
93   int len = 0;
94   switch(remote->sa_family) {
95     case AF_INET: len = sizeof(struct sockaddr_in); break;
96     case AF_INET6: len = sizeof(struct sockaddr_in6); break;
97     case AF_UNIX: len = SUN_LEN(((struct sockaddr_un *)remote)); break;
98     default: return NULL;
99   }
100   if(noit_hash_retrieve(&ds_conns, (const char *)remote, len, (void **)&cq))
101     return cq;
102   cq = calloc(1, sizeof(*cq));
103   cq->remote = malloc(len);
104   memcpy(cq->remote, remote, len);
105   cq->jobq = calloc(1, sizeof(*cq->jobq));
106   eventer_jobq_init(cq->jobq);
107   cq->jobq->backq = eventer_default_backq();
108   /* Add one thread */
109   eventer_jobq_increase_concurrency(cq->jobq);
110   noit_hash_store(&ds_conns, (const char *)cq->remote, len, cq);
111   return cq;
112 }
113
/* Result of attempting to store a single queued row. */
typedef enum {
  DS_EXEC_SUCCESS = 0,  /* the row was stored */
  DS_EXEC_ROW_FAILED,   /* this row could not be parsed or stored */
  DS_EXEC_TXN_FAILED    /* the job runner reacts by dropping the database
                         * connection and starting the batch over */
} execute_outcome_t;
119
/*
 * Duplicate at most `len` bytes of `src` into a freshly allocated,
 * NUL-terminated buffer of len+1 bytes.  Copying stops early at a NUL found
 * within the first `len` bytes.
 *
 * Only the first `len` bytes of `src` are ever examined, so `src` need not be
 * terminated beyond that point.
 *
 * Returns the copy (caller frees), or NULL when `src` is NULL, `len` is
 * negative, or allocation fails.
 */
static char *
__noit__strndup(const char *src, int len) {
  const char *nul;
  size_t ncopy;
  char *dst;

  if(src == NULL || len < 0) return NULL;

  ncopy = (size_t)len;
  nul = memchr(src, '\0', ncopy);
  if(nul != NULL) ncopy = (size_t)(nul - src);

  dst = malloc((size_t)len + 1);
  if(dst == NULL) return NULL;
  memcpy(dst, src, ncopy);
  dst[ncopy] = '\0';
  return dst;
}
/*
 * DECLARE_PARAM_STR(str, len): append the first `len` bytes of `str` as the
 * next text-format bound parameter of the job detail `d`.
 *
 * The literal value "[[null]]" is stored as a NULL pointer with length 0 and
 * is not flagged as allocated; anything else is stored as a heap copy that
 * free_params() releases later.
 *
 * Each argument is evaluated exactly once.  The macro requires `d` and a
 * `bad_row` label in the enclosing function; it jumps to `bad_row` when all
 * MAX_PARAMS slots are already used or when the copy cannot be allocated.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  const char *dps_src_ = (str); \
  int dps_len_ = (int)(len); \
  char *dps_copy_; \
  if(d->nparams >= MAX_PARAMS) goto bad_row; \
  dps_copy_ = __noit__strndup(dps_src_, dps_len_); \
  if(dps_copy_ == NULL) goto bad_row; \
  d->paramFormats[d->nparams] = 0; \
  if(!strcmp(dps_copy_, "[[null]]")) { \
    free(dps_copy_); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  else { \
    d->paramValues[d->nparams] = dps_copy_; \
    d->paramLengths[d->nparams] = dps_len_; \
    d->paramAllocd[d->nparams] = 1; \
  } \
  d->nparams++; \
} while(0)
140
141 execute_outcome_t
142 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) {
143   int type, len;
144   char *token;
145
146   type = d->data[0];
147
148   /* Parse the log line, but only if we haven't already */
149   if(!d->nparams) {
150     char raddr[128];
151     char *scp, *ecp;
152
153     /* setup our remote address */
154     raddr[0] = '\0';
155     switch(r->sa_family) {
156       case AF_INET:
157         inet_ntop(AF_INET, &(((struct sockaddr_in *)r)->sin_addr),
158                   raddr, sizeof(raddr));
159         break;
160       case AF_INET6:
161         inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)r)->sin6_addr),
162                   raddr, sizeof(raddr));
163         break;
164       default:
165         noitL(noit_error, "remote address of family %d\n", r->sa_family);
166     }
167  
168     scp = d->data;
169 #define PROCESS_NEXT_FIELD(t,l) do { \
170   if(!*scp) goto bad_row; \
171   ecp = strchr(scp, '\t'); \
172   if(!ecp) goto bad_row; \
173   token = scp; \
174   len = (ecp-scp); \
175   scp = ecp + 1; \
176 } while(0)
177 #define PROCESS_LAST_FIELD(t,l) do { \
178   if(!*scp) ecp = scp; \
179   else { \
180     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
181     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
182   } \
183   t = scp; \
184   l = (ecp-scp); \
185 } while(0)
186
187     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
188     switch(type) {
189       /* See noit_check_log.c for log description */
190       case 'C':
191         DECLARE_PARAM_STR(raddr, strlen(raddr));
192         PROCESS_NEXT_FIELD(token,len);
193         DECLARE_PARAM_STR(token,len); /* timestamp */
194         PROCESS_NEXT_FIELD(token, len);
195         DECLARE_PARAM_STR(token,len); /* uuid */
196         PROCESS_NEXT_FIELD(token, len);
197         DECLARE_PARAM_STR(token,len); /* target */
198         PROCESS_NEXT_FIELD(token, len);
199         DECLARE_PARAM_STR(token,len); /* module */
200         PROCESS_LAST_FIELD(token, len);
201         DECLARE_PARAM_STR(token,len); /* name */
202         break;
203       case 'M':
204         PROCESS_NEXT_FIELD(token,len);
205         DECLARE_PARAM_STR(token,len); /* timestamp */
206         PROCESS_NEXT_FIELD(token, len);
207         DECLARE_PARAM_STR(token,len); /* uuid */
208         PROCESS_NEXT_FIELD(token, len);
209         DECLARE_PARAM_STR(token,len); /* name */
210         PROCESS_NEXT_FIELD(token,len);
211         d->metric_type = *token;
212         PROCESS_LAST_FIELD(token,len);
213         DECLARE_PARAM_STR(token,len); /* value */
214         break;
215       case 'S':
216         PROCESS_NEXT_FIELD(token,len);
217         DECLARE_PARAM_STR(token,len); /* timestamp */
218         PROCESS_NEXT_FIELD(token, len);
219         DECLARE_PARAM_STR(token,len); /* uuid */
220         PROCESS_NEXT_FIELD(token, len);
221         DECLARE_PARAM_STR(token,len); /* state */
222         PROCESS_NEXT_FIELD(token, len);
223         DECLARE_PARAM_STR(token,len); /* availability */
224         PROCESS_NEXT_FIELD(token, len);
225         DECLARE_PARAM_STR(token,len); /* duration */
226         PROCESS_LAST_FIELD(token,len);
227         DECLARE_PARAM_STR(token,len); /* status */
228         break;
229       default:
230         goto bad_row;
231     }
232
233   }
234
235 #define PG_EXEC(cmd) do { \
236   PGresult *res; \
237   int rv; \
238   res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
239                      (const char * const *)d->paramValues, \
240                      d->paramLengths, d->paramFormats, 0); \
241   rv = PQresultStatus(res); \
242   if(rv != PGRES_COMMAND_OK) { \
243     noitL(noit_error, "stratcon datasource bad row: %s\n", \
244           PQresultErrorMessage(res)); \
245     PQclear(res); \
246     goto bad_row; \
247   } \
248   PQclear(res); \
249 } while(0)
250
251   /* Now execute the query */
252   switch(type) {
253     case 'C':
254       GET_QUERY(check_insert);
255       PG_EXEC(check_insert);
256       break;
257     case 'S':
258       GET_QUERY(status_insert);
259       PG_EXEC(status_insert);
260       break;
261     case 'M':
262       switch(d->metric_type) {
263         case METRIC_INT32:
264         case METRIC_UINT32:
265         case METRIC_INT64:
266         case METRIC_UINT64:
267         case METRIC_DOUBLE:
268           GET_QUERY(metric_insert_numeric);
269           PG_EXEC(metric_insert_numeric);
270           break;
271         case METRIC_STRING:
272           GET_QUERY(metric_insert_text);
273           PG_EXEC(metric_insert_text);
274           break;
275         default:
276           goto bad_row;
277       }
278       break;
279     default:
280       /* should never get here */
281       goto bad_row;
282   }
283   return DS_EXEC_SUCCESS;
284  bad_row:
285   return DS_EXEC_ROW_FAILED;
286 }
287 static int
288 stratcon_database_connect(conn_q *cq) {
289   char dsn[512];
290   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
291   const char *k, *v;
292   int klen;
293   noit_hash_table *t;
294
295   if(cq->dbh) {
296     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
297     PQreset(cq->dbh);
298     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
299     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
300           dsn, PQerrorMessage(cq->dbh));
301     return -1;
302   }
303
304   dsn[0] = '\0';
305   t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
306   while(noit_hash_next(t, &iter, &k, &klen, (void **)&v)) {
307     if(dsn[0]) strlcat(dsn, " ", sizeof(dsn));
308     strlcat(dsn, k, sizeof(dsn));
309     strlcat(dsn, "=", sizeof(dsn));
310     strlcat(dsn, v, sizeof(dsn));
311   }
312   cq->dbh = PQconnectdb(dsn);
313   if(!cq->dbh) return -1;
314   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
315   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
316         dsn, PQerrorMessage(cq->dbh));
317   return -1;
318 }
319 static int
320 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
321                                 const char *name) {
322   int rv = -1;
323   PGresult *res;
324   char cmd[128];
325   strlcpy(cmd, p, sizeof(cmd));
326   strlcat(cmd, name, sizeof(cmd));
327   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
328   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
329   PQclear(res);
330   return rv;
331 }
332 static int
333 stratcon_datastore_do(conn_q *cq, const char *cmd) {
334   PGresult *res;
335   int rv = -1;
336   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
337   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
338   PQclear(res);
339   return rv;
340 }
/*
 * Transaction helpers.  They expand inside a function that provides the
 * locals `cq`, `current` and `last_sp` and a `full_monty` label.
 */

/* Close the connection, clear the handle and jump to `full_monty`. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)

/* Open savepoint `name` and remember the current entry as its start. */
#define SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name) != 0) { \
    BUSTED(cq); \
  } \
  last_sp = current; \
} while(0)

/* Roll back to savepoint `name`; no savepoint start is remembered after. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name) != 0) { \
    BUSTED(cq); \
  } \
  last_sp = NULL; \
} while(0)

/* Release savepoint `name`; no savepoint start is remembered after. */
#define RELEASE_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name) != 0) { \
    BUSTED(cq); \
  } \
  last_sp = NULL; \
} while(0)
360 int
361 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
362                                   struct timeval *now) {
363   conn_q *cq = closure;
364   ds_job_detail *current, *last_sp;
365   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
366
367   if(!cq->head) return 0;
368
369  full_monty:
370   /* Make sure we have a connection */
371   while(stratcon_database_connect(cq)) {
372     noitL(noit_error, "Error connecting to database\n");
373     sleep(1);
374   }
375
376   current = cq->head;
377   last_sp = NULL;
378   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
379   while(current) {
380     execute_outcome_t rv;
381     if(current->data) {
382       if(!last_sp) SAVEPOINT("batch");
383  
384       if(current->problematic) {
385         noitL(noit_error, "Failed noit line: %s", current->data);
386         RELEASE_SAVEPOINT("batch");
387         current = current->next;
388         continue;
389       }
390       rv = stratcon_datastore_execute(cq, cq->remote, current);
391       switch(rv) {
392         case DS_EXEC_SUCCESS:
393           current = current->next;
394           break;
395         case DS_EXEC_ROW_FAILED:
396           /* rollback to savepoint, mark this record as bad and start again */
397           current->problematic = 1;
398           current = last_sp;
399           ROLLBACK_TO_SAVEPOINT("batch");
400           break;
401         case DS_EXEC_TXN_FAILED:
402           BUSTED(cq);
403       }
404     }
405     if(current->completion_event) {
406       if(last_sp) RELEASE_SAVEPOINT("batch");
407       if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq);
408       eventer_add(current->completion_event);
409       __remove_until(cq, current->next);
410       current = current->next;
411     }
412   }
413   return 0;
414 }
415 void
416 stratcon_datastore_push(stratcon_datastore_op_t op,
417                         struct sockaddr *remote, void *operand) {
418   conn_q *cq;
419   eventer_t e;
420   ds_job_detail *dsjd;
421
422   cq = __get_conn_q_for_remote(remote);
423   dsjd = calloc(1, sizeof(*dsjd));
424   switch(op) {
425     case DS_OP_INSERT:
426       dsjd->data = operand;
427       __append(cq, dsjd);
428       break;
429     case DS_OP_CHKPT:
430       dsjd->completion_event = operand;
431       __append(cq,dsjd);
432       e = eventer_alloc();
433       e->mask = EVENTER_ASYNCH;
434       e->callback = stratcon_datastore_asynch_execute;
435       e->closure = cq;
436       eventer_add(e);
437       break;
438   }
439 }
Note: See TracBrowser for help on using the browser.