root/src/stratcon_datastore.c

Revision bce2dd5b1ebb5d3628562c5f487797d4d7da1b46, 11.7 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

fix this up to work

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_log.h"
9 #include "stratcon_datastore.h"
10 #include "noit_conf.h"
11 #include "noit_check.h"
12 #include <unistd.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
15 #include <libpq-fe.h>
16
/*
 * SQL statement text for each record type.  Every statement has a paired
 * "<name>_conf" configuration path; the text itself starts out NULL and is
 * filled in (and then kept) the first time GET_QUERY() is used on it.
 */
static const char *check_insert_conf =
  "/stratcon/database/statements/check";
static char *check_insert = NULL;

static const char *status_insert_conf =
  "/stratcon/database/statements/status";
static char *status_insert = NULL;

static const char *metric_insert_numeric_conf =
  "/stratcon/database/statements/metric_numeric";
static char *metric_insert_numeric = NULL;

static const char *metric_insert_text_conf =
  "/stratcon/database/statements/metric_text";
static char *metric_insert_text = NULL;

/*
 * GET_QUERY(a): make sure statement variable `a` is populated.  When `a` is
 * still NULL it is requested from the configuration at `a_conf`; if that
 * call reports failure, control jumps to the caller's `bad_row` label.
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && \
     !noit_conf_get_string(NULL, a ## _conf, &(a))) { \
    goto bad_row; \
  } \
} while(0)
31
32 #define MAX_PARAMS 8
33 typedef struct ds_job_detail {
34   char *data;  /* The raw string, NULL means the stream is done -- commit. */
35   int problematic;
36   eventer_t completion_event; /* This event should be registered if non NULL */
37   struct ds_job_detail *next;
38
39   /* Postgres specific stuff */
40   int nparams;
41   int metric_type;
42   char *paramValues[MAX_PARAMS];
43   int paramLengths[MAX_PARAMS];
44   int paramFormats[MAX_PARAMS];
45   int paramAllocd[MAX_PARAMS];
46 } ds_job_detail;
47
48 typedef struct {
49   struct sockaddr *remote;
50   eventer_jobq_t  *jobq;
51   /* Postgres specific stuff */
52   PGconn          *dbh;
53   ds_job_detail   *head;
54   ds_job_detail   *tail;
55 } conn_q;
56
57 static void
58 free_params(ds_job_detail *d) {
59   int i;
60   for(i=0; i<d->nparams; i++)
61     if(d->paramAllocd[i] && d->paramValues[i])
62       free(d->paramValues[i]);
63 }
64
65 static void
66 __append(conn_q *q, ds_job_detail *d) {
67   d->next = NULL;
68   if(!q->head) q->head = q->tail = d;
69   else {
70     q->tail->next = d;
71     q->tail = d;
72   }
73 }
74 static void
75 __remove_until(conn_q *q, ds_job_detail *d) {
76   ds_job_detail *next;
77   while(q->head && q->head != d) {
78     next = q->head;
79     q->head = q->head->next;
80     free_params(next);
81     if(next->data) free(next->data);
82     free(next);
83   }
84   if(!q->head) q->tail = NULL;
85 }
86
87 noit_hash_table ds_conns;
88
89 conn_q *
90 __get_conn_q_for_remote(struct sockaddr *remote) {
91   conn_q *cq;
92   if(noit_hash_retrieve(&ds_conns, (const char *)remote, remote->sa_len,
93                         (void **)&cq))
94     return cq;
95   cq = calloc(1, sizeof(*cq));
96   cq->remote = malloc(remote->sa_len);
97   memcpy(cq->remote, remote, remote->sa_len);
98   cq->jobq = calloc(1, sizeof(*cq->jobq));
99   eventer_jobq_init(cq->jobq);
100   cq->jobq->backq = eventer_default_backq();
101   /* Add one thread */
102   eventer_jobq_increase_concurrency(cq->jobq);
103   noit_hash_store(&ds_conns, (const char *)cq->remote, cq->remote->sa_len, cq);
104   return cq;
105 }
106
/* Result of attempting to store a single queued row. */
typedef enum {
  /* The row was stored. */
  DS_EXEC_SUCCESS    = 0,
  /* This row could not be parsed or stored. */
  DS_EXEC_ROW_FAILED = 1,
  /* The caller reacts to this by dropping the connection and restarting
   * the whole batch. */
  DS_EXEC_TXN_FAILED = 2
} execute_outcome_t;
112
/*
 * Duplicate at most `len` characters of `src` into a freshly allocated,
 * NUL-terminated string.  Copying stops early at a NUL inside `src`.
 *
 * Only the first `len` bytes of `src` are examined.  A negative `len` is
 * treated as 0.  Returns NULL when memory cannot be allocated; otherwise
 * the caller owns the result and must free() it.
 */
static char *
__strndup(const char *src, int len) {
  size_t limit, n = 0;
  char *dst;

  limit = (len > 0) ? (size_t)len : 0;
  while(n < limit && src[n] != '\0') n++;

  dst = malloc(n + 1);
  if(dst == NULL) return NULL;
  memcpy(dst, src, n);
  dst[n] = '\0';
  return dst;
}

/*
 * DECLARE_PARAM_STR(str, len): append the first `len` characters of `str`
 * as the next text-format parameter of the job `d` in scope.
 *
 * The literal value "[[null]]" is stored as a NULL parameter (length 0,
 * nothing allocated).  If the parameter arrays are already full, or the
 * copy cannot be allocated, control jumps to the caller's `bad_row` label
 * without adding a parameter.  `len` is evaluated exactly once.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  int plen_ = (int)(len); \
  char *pval_; \
  if(d->nparams >= MAX_PARAMS) goto bad_row; \
  pval_ = __strndup(str, plen_); \
  if(pval_ == NULL) goto bad_row; \
  if(!strcmp(pval_, "[[null]]")) { \
    free(pval_); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  else { \
    d->paramValues[d->nparams] = pval_; \
    d->paramLengths[d->nparams] = plen_; \
    d->paramAllocd[d->nparams] = 1; \
  } \
  d->paramFormats[d->nparams] = 0; \
  d->nparams++; \
} while(0)
133
134 execute_outcome_t
135 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) {
136   int type, len;
137   char *token;
138
139   type = d->data[0];
140
141   /* Parse the log line, but only if we haven't already */
142   if(!d->nparams) {
143     struct sockaddr_in6 *rin6 = (struct sockaddr_in6 *)r;
144     char raddr[128];
145     char *scp, *ecp;
146     if(inet_ntop(rin6->sin6_family, &rin6->sin6_addr,
147                  raddr, sizeof(raddr)) == NULL)
148       raddr[0] = '\0';
149  
150     scp = d->data;
151 #define PROCESS_NEXT_FIELD(t,l) do { \
152   if(!*scp) goto bad_row; \
153   ecp = strchr(scp, '\t'); \
154   if(!ecp) goto bad_row; \
155   token = scp; \
156   len = (ecp-scp); \
157   scp = ecp + 1; \
158 } while(0)
159 #define PROCESS_LAST_FIELD(t,l) do { \
160   if(!*scp) ecp = scp; \
161   else { \
162     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
163     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
164   } \
165   t = scp; \
166   l = (ecp-scp); \
167 } while(0)
168
169     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
170     switch(type) {
171       /* See noit_check_log.c for log description */
172       case 'C':
173         DECLARE_PARAM_STR(raddr, strlen(raddr));
174         PROCESS_NEXT_FIELD(token,len);
175         DECLARE_PARAM_STR(token,len); /* timestamp */
176         PROCESS_NEXT_FIELD(token, len);
177         DECLARE_PARAM_STR(token,len); /* uuid */
178         PROCESS_NEXT_FIELD(token, len);
179         DECLARE_PARAM_STR(token,len); /* target */
180         PROCESS_NEXT_FIELD(token, len);
181         DECLARE_PARAM_STR(token,len); /* module */
182         PROCESS_LAST_FIELD(token, len);
183         DECLARE_PARAM_STR(token,len); /* name */
184         break;
185       case 'M':
186         PROCESS_NEXT_FIELD(token,len);
187         DECLARE_PARAM_STR(token,len); /* timestamp */
188         PROCESS_NEXT_FIELD(token, len);
189         DECLARE_PARAM_STR(token,len); /* uuid */
190         PROCESS_NEXT_FIELD(token, len);
191         DECLARE_PARAM_STR(token,len); /* name */
192         PROCESS_NEXT_FIELD(token,len);
193         d->metric_type = *token;
194         PROCESS_LAST_FIELD(token,len);
195         DECLARE_PARAM_STR(token,len); /* value */
196         break;
197       case 'S':
198         PROCESS_NEXT_FIELD(token,len);
199         DECLARE_PARAM_STR(token,len); /* timestamp */
200         PROCESS_NEXT_FIELD(token, len);
201         DECLARE_PARAM_STR(token,len); /* uuid */
202         PROCESS_NEXT_FIELD(token, len);
203         DECLARE_PARAM_STR(token,len); /* state */
204         PROCESS_NEXT_FIELD(token, len);
205         DECLARE_PARAM_STR(token,len); /* availability */
206         PROCESS_NEXT_FIELD(token, len);
207         DECLARE_PARAM_STR(token,len); /* duration */
208         PROCESS_LAST_FIELD(token,len);
209         DECLARE_PARAM_STR(token,len); /* status */
210         break;
211       default:
212         goto bad_row;
213     }
214
215   }
216
217 #define PG_EXEC(cmd) do { \
218   PGresult *res; \
219   int rv; \
220   res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
221                      (const char * const *)d->paramValues, \
222                      d->paramLengths, d->paramFormats, 0); \
223   rv = PQresultStatus(res); \
224   if(rv != PGRES_COMMAND_OK) { \
225     noitL(noit_error, "stratcon datasource bad row: %s\n", \
226           PQresultErrorMessage(res)); \
227     PQclear(res); \
228     goto bad_row; \
229   } \
230   PQclear(res); \
231 } while(0)
232
233   /* Now execute the query */
234   switch(type) {
235     case 'C':
236       GET_QUERY(check_insert);
237       PG_EXEC(check_insert);
238       break;
239     case 'S':
240       GET_QUERY(status_insert);
241       PG_EXEC(status_insert);
242       break;
243     case 'M':
244       switch(d->metric_type) {
245         case METRIC_INT32:
246         case METRIC_UINT32:
247         case METRIC_INT64:
248         case METRIC_UINT64:
249         case METRIC_DOUBLE:
250           GET_QUERY(metric_insert_numeric);
251           PG_EXEC(metric_insert_numeric);
252           break;
253         case METRIC_STRING:
254           GET_QUERY(metric_insert_text);
255           PG_EXEC(metric_insert_text);
256           break;
257         default:
258           goto bad_row;
259       }
260       break;
261     default:
262       /* should never get here */
263       goto bad_row;
264   }
265   return DS_EXEC_SUCCESS;
266  bad_row:
267   return DS_EXEC_ROW_FAILED;
268 }
269 static int
270 stratcon_database_connect(conn_q *cq) {
271   char dsn[512];
272   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
273   const char *k, *v;
274   int klen;
275   noit_hash_table *t;
276
277   if(cq->dbh) {
278     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
279     PQreset(cq->dbh);
280     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
281     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
282           dsn, PQerrorMessage(cq->dbh));
283     return -1;
284   }
285
286   dsn[0] = '\0';
287   t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
288   while(noit_hash_next(t, &iter, &k, &klen, (void **)&v)) {
289     if(dsn[0]) strlcat(dsn, " ", sizeof(dsn));
290     strlcat(dsn, k, sizeof(dsn));
291     strlcat(dsn, "=", sizeof(dsn));
292     strlcat(dsn, v, sizeof(dsn));
293   }
294   cq->dbh = PQconnectdb(dsn);
295   if(!cq->dbh) return -1;
296   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
297   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
298         dsn, PQerrorMessage(cq->dbh));
299   return -1;
300 }
301 static int
302 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
303                                 const char *name) {
304   int rv = -1;
305   PGresult *res;
306   char cmd[128];
307   strlcpy(cmd, p, sizeof(cmd));
308   strlcat(cmd, name, sizeof(cmd));
309   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
310   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
311   PQclear(res);
312   return rv;
313 }
314 static int
315 stratcon_datastore_do(conn_q *cq, const char *cmd) {
316   PGresult *res;
317   int rv = -1;
318   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
319   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
320   PQclear(res);
321   return rv;
322 }
323 #define BUSTED(cq) do { \
324   PQfinish((cq)->dbh); \
325   (cq)->dbh = NULL; \
326   goto full_monty; \
327 } while(0)
328 #define SAVEPOINT(name) do { \
329   if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
330   last_sp = current; \
331 } while(0)
332 #define ROLLBACK_TO_SAVEPOINT(name) do { \
333   if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
334     BUSTED(cq); \
335   last_sp = NULL; \
336 } while(0)
337 #define RELEASE_SAVEPOINT(name) do { \
338   if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
339     BUSTED(cq); \
340   last_sp = NULL; \
341 } while(0)
342 int
343 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
344                                   struct timeval *now) {
345   conn_q *cq = closure;
346   ds_job_detail *current, *last_sp;
347   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
348
349   if(!cq->head) return 0;
350
351  full_monty:
352   /* Make sure we have a connection */
353   while(stratcon_database_connect(cq)) {
354     noitL(noit_error, "Error connecting to database\n");
355     sleep(1);
356   }
357
358   current = cq->head;
359   last_sp = NULL;
360   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
361   while(current) {
362     execute_outcome_t rv;
363     if(current->data) {
364       if(!last_sp) SAVEPOINT("batch");
365  
366       if(current->problematic) {
367         noitL(noit_error, "Failed noit line: %s", current->data);
368         RELEASE_SAVEPOINT("batch");
369         current = current->next;
370         continue;
371       }
372       rv = stratcon_datastore_execute(cq, cq->remote, current);
373       switch(rv) {
374         case DS_EXEC_SUCCESS:
375           current = current->next;
376           break;
377         case DS_EXEC_ROW_FAILED:
378           /* rollback to savepoint, mark this record as bad and start again */
379           current->problematic = 1;
380           current = last_sp;
381           ROLLBACK_TO_SAVEPOINT("batch");
382           break;
383         case DS_EXEC_TXN_FAILED:
384           BUSTED(cq);
385       }
386     }
387     if(current->completion_event) {
388       if(last_sp) RELEASE_SAVEPOINT("batch");
389       if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq);
390       eventer_add(current->completion_event);
391       __remove_until(cq, current->next);
392       current = current->next;
393     }
394   }
395   return 0;
396 }
397 void
398 stratcon_datastore_push(stratcon_datastore_op_t op,
399                         struct sockaddr *remote, void *operand) {
400   conn_q *cq;
401   eventer_t e;
402   ds_job_detail *dsjd;
403
404   cq = __get_conn_q_for_remote(remote);
405   dsjd = calloc(1, sizeof(*dsjd));
406   switch(op) {
407     case DS_OP_INSERT:
408       dsjd->data = operand;
409       __append(cq, dsjd);
410       break;
411     case DS_OP_CHKPT:
412       dsjd->completion_event = operand;
413       __append(cq,dsjd);
414       e = eventer_alloc();
415       e->mask = EVENTER_ASYNCH;
416       e->callback = stratcon_datastore_asynch_execute;
417       e->closure = cq;
418       eventer_add(e);
419       break;
420   }
421 }
Note: See TracBrowser for help on using the browser.