root/src/stratcon_datastore.c

Revision a50432366e0c6614924558d1aa82eaa82b967a0b, 11.9 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

compiles on linux -- still no eventer, refs #12

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_log.h"
9 #include "stratcon_datastore.h"
10 #include "noit_conf.h"
11 #include "noit_check.h"
12 #include <unistd.h>
13 #include <netinet/in.h>
14 #include <sys/un.h>
15 #include <arpa/inet.h>
16 #include <libpq-fe.h>
17
/*
 * Lazily loaded SQL statements.  Each statement is paired with the
 * configuration path it is read from; the statement pointer stays NULL
 * until GET_QUERY() has fetched it once.
 */
static const char *check_insert_conf = "/stratcon/database/statements/check";
static char *check_insert = NULL;
static const char *status_insert_conf = "/stratcon/database/statements/status";
static char *status_insert = NULL;
static const char *metric_insert_numeric_conf = "/stratcon/database/statements/metric_numeric";
static char *metric_insert_numeric = NULL;
static const char *metric_insert_text_conf = "/stratcon/database/statements/metric_text";
static char *metric_insert_text = NULL;

/*
 * GET_QUERY(a): make sure statement `a` is loaded.
 * If `a` is still NULL it is looked up at the path held in `a_conf`;
 * when that lookup returns zero, control jumps to the caller's
 * `bad_row` label (which must therefore exist in the enclosing function).
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && \
     !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
32
33 #define MAX_PARAMS 8
34 typedef struct ds_job_detail {
35   char *data;  /* The raw string, NULL means the stream is done -- commit. */
36   int problematic;
37   eventer_t completion_event; /* This event should be registered if non NULL */
38   struct ds_job_detail *next;
39
40   /* Postgres specific stuff */
41   int nparams;
42   int metric_type;
43   char *paramValues[MAX_PARAMS];
44   int paramLengths[MAX_PARAMS];
45   int paramFormats[MAX_PARAMS];
46   int paramAllocd[MAX_PARAMS];
47 } ds_job_detail;
48
49 typedef struct {
50   struct sockaddr *remote;
51   eventer_jobq_t  *jobq;
52   /* Postgres specific stuff */
53   PGconn          *dbh;
54   ds_job_detail   *head;
55   ds_job_detail   *tail;
56 } conn_q;
57
58 static void
59 free_params(ds_job_detail *d) {
60   int i;
61   for(i=0; i<d->nparams; i++)
62     if(d->paramAllocd[i] && d->paramValues[i])
63       free(d->paramValues[i]);
64 }
65
66 static void
67 __append(conn_q *q, ds_job_detail *d) {
68   d->next = NULL;
69   if(!q->head) q->head = q->tail = d;
70   else {
71     q->tail->next = d;
72     q->tail = d;
73   }
74 }
75 static void
76 __remove_until(conn_q *q, ds_job_detail *d) {
77   ds_job_detail *next;
78   while(q->head && q->head != d) {
79     next = q->head;
80     q->head = q->head->next;
81     free_params(next);
82     if(next->data) free(next->data);
83     free(next);
84   }
85   if(!q->head) q->tail = NULL;
86 }
87
88 noit_hash_table ds_conns;
89
90 conn_q *
91 __get_conn_q_for_remote(struct sockaddr *remote) {
92   conn_q *cq;
93   int len = 0;
94   switch(remote->sa_family) {
95     case AF_INET: len = sizeof(struct sockaddr_in); break;
96     case AF_INET6: len = sizeof(struct sockaddr_in6); break;
97     case AF_UNIX: len = SUN_LEN(((struct sockaddr_un *)remote)); break;
98     default: return NULL;
99   }
100   if(noit_hash_retrieve(&ds_conns, (const char *)remote, len, (void **)&cq))
101     return cq;
102   cq = calloc(1, sizeof(*cq));
103   cq->remote = malloc(len);
104   memcpy(cq->remote, remote, len);
105   cq->jobq = calloc(1, sizeof(*cq->jobq));
106   eventer_jobq_init(cq->jobq);
107   cq->jobq->backq = eventer_default_backq();
108   /* Add one thread */
109   eventer_jobq_increase_concurrency(cq->jobq);
110   noit_hash_store(&ds_conns, (const char *)cq->remote, len, cq);
111   return cq;
112 }
113
/* Result of attempting to execute a single queued row. */
typedef enum {
  /* The row was executed successfully. */
  DS_EXEC_SUCCESS = 0,
  /* Only this row failed; the caller rolls back to its savepoint. */
  DS_EXEC_ROW_FAILED,   /* == 1 */
  /* The whole transaction/connection is unusable; the caller starts over. */
  DS_EXEC_TXN_FAILED    /* == 2 */
} execute_outcome_t;
119
/*
 * Duplicate at most `len` bytes of `src` into a freshly allocated,
 * NUL-terminated string.  Copying stops early at a NUL inside `src`.
 * A negative `len` is treated as 0.
 *
 * Returns the new string (caller frees) or NULL if allocation fails.
 */
static char *
__strndup(const char *src, int len) {
  char *dst;
  size_t n = 0;
  size_t max = (len > 0) ? (size_t)len : 0;

  /* Only look at the requested window, never at the rest of the line. */
  while(n < max && src[n] != '\0') n++;
  dst = malloc(max + 1);
  if(!dst) return NULL;
  memcpy(dst, src, n);
  dst[n] = '\0';
  return dst;
}

/*
 * DECLARE_PARAM_STR(str, len): append a text bind parameter to `d`.
 *
 * Requires a `ds_job_detail *d` and a `bad_row` label in the enclosing
 * function.  The value is copied; the literal token "[[null]]" is
 * recorded as a NULL value of length 0 instead.  Jumps to bad_row when
 * all MAX_PARAMS slots are taken or the copy cannot be allocated.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  if(d->nparams >= MAX_PARAMS) goto bad_row; \
  d->paramValues[d->nparams] = __strndup(str, len); \
  if(d->paramValues[d->nparams] == NULL) goto bad_row; \
  d->paramLengths[d->nparams] = len; \
  d->paramFormats[d->nparams] = 0; \
  d->paramAllocd[d->nparams] = 1; \
  if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \
    free(d->paramValues[d->nparams]); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  d->nparams++; \
} while(0)
140
141 execute_outcome_t
142 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) {
143   int type, len;
144   char *token;
145
146   type = d->data[0];
147
148   /* Parse the log line, but only if we haven't already */
149   if(!d->nparams) {
150     struct sockaddr_in6 *rin6 = (struct sockaddr_in6 *)r;
151     char raddr[128];
152     char *scp, *ecp;
153     if(inet_ntop(rin6->sin6_family, &rin6->sin6_addr,
154                  raddr, sizeof(raddr)) == NULL)
155       raddr[0] = '\0';
156  
157     scp = d->data;
158 #define PROCESS_NEXT_FIELD(t,l) do { \
159   if(!*scp) goto bad_row; \
160   ecp = strchr(scp, '\t'); \
161   if(!ecp) goto bad_row; \
162   token = scp; \
163   len = (ecp-scp); \
164   scp = ecp + 1; \
165 } while(0)
166 #define PROCESS_LAST_FIELD(t,l) do { \
167   if(!*scp) ecp = scp; \
168   else { \
169     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
170     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
171   } \
172   t = scp; \
173   l = (ecp-scp); \
174 } while(0)
175
176     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
177     switch(type) {
178       /* See noit_check_log.c for log description */
179       case 'C':
180         DECLARE_PARAM_STR(raddr, strlen(raddr));
181         PROCESS_NEXT_FIELD(token,len);
182         DECLARE_PARAM_STR(token,len); /* timestamp */
183         PROCESS_NEXT_FIELD(token, len);
184         DECLARE_PARAM_STR(token,len); /* uuid */
185         PROCESS_NEXT_FIELD(token, len);
186         DECLARE_PARAM_STR(token,len); /* target */
187         PROCESS_NEXT_FIELD(token, len);
188         DECLARE_PARAM_STR(token,len); /* module */
189         PROCESS_LAST_FIELD(token, len);
190         DECLARE_PARAM_STR(token,len); /* name */
191         break;
192       case 'M':
193         PROCESS_NEXT_FIELD(token,len);
194         DECLARE_PARAM_STR(token,len); /* timestamp */
195         PROCESS_NEXT_FIELD(token, len);
196         DECLARE_PARAM_STR(token,len); /* uuid */
197         PROCESS_NEXT_FIELD(token, len);
198         DECLARE_PARAM_STR(token,len); /* name */
199         PROCESS_NEXT_FIELD(token,len);
200         d->metric_type = *token;
201         PROCESS_LAST_FIELD(token,len);
202         DECLARE_PARAM_STR(token,len); /* value */
203         break;
204       case 'S':
205         PROCESS_NEXT_FIELD(token,len);
206         DECLARE_PARAM_STR(token,len); /* timestamp */
207         PROCESS_NEXT_FIELD(token, len);
208         DECLARE_PARAM_STR(token,len); /* uuid */
209         PROCESS_NEXT_FIELD(token, len);
210         DECLARE_PARAM_STR(token,len); /* state */
211         PROCESS_NEXT_FIELD(token, len);
212         DECLARE_PARAM_STR(token,len); /* availability */
213         PROCESS_NEXT_FIELD(token, len);
214         DECLARE_PARAM_STR(token,len); /* duration */
215         PROCESS_LAST_FIELD(token,len);
216         DECLARE_PARAM_STR(token,len); /* status */
217         break;
218       default:
219         goto bad_row;
220     }
221
222   }
223
224 #define PG_EXEC(cmd) do { \
225   PGresult *res; \
226   int rv; \
227   res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
228                      (const char * const *)d->paramValues, \
229                      d->paramLengths, d->paramFormats, 0); \
230   rv = PQresultStatus(res); \
231   if(rv != PGRES_COMMAND_OK) { \
232     noitL(noit_error, "stratcon datasource bad row: %s\n", \
233           PQresultErrorMessage(res)); \
234     PQclear(res); \
235     goto bad_row; \
236   } \
237   PQclear(res); \
238 } while(0)
239
240   /* Now execute the query */
241   switch(type) {
242     case 'C':
243       GET_QUERY(check_insert);
244       PG_EXEC(check_insert);
245       break;
246     case 'S':
247       GET_QUERY(status_insert);
248       PG_EXEC(status_insert);
249       break;
250     case 'M':
251       switch(d->metric_type) {
252         case METRIC_INT32:
253         case METRIC_UINT32:
254         case METRIC_INT64:
255         case METRIC_UINT64:
256         case METRIC_DOUBLE:
257           GET_QUERY(metric_insert_numeric);
258           PG_EXEC(metric_insert_numeric);
259           break;
260         case METRIC_STRING:
261           GET_QUERY(metric_insert_text);
262           PG_EXEC(metric_insert_text);
263           break;
264         default:
265           goto bad_row;
266       }
267       break;
268     default:
269       /* should never get here */
270       goto bad_row;
271   }
272   return DS_EXEC_SUCCESS;
273  bad_row:
274   return DS_EXEC_ROW_FAILED;
275 }
276 static int
277 stratcon_database_connect(conn_q *cq) {
278   char dsn[512];
279   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
280   const char *k, *v;
281   int klen;
282   noit_hash_table *t;
283
284   if(cq->dbh) {
285     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
286     PQreset(cq->dbh);
287     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
288     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
289           dsn, PQerrorMessage(cq->dbh));
290     return -1;
291   }
292
293   dsn[0] = '\0';
294   t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
295   while(noit_hash_next(t, &iter, &k, &klen, (void **)&v)) {
296     if(dsn[0]) strlcat(dsn, " ", sizeof(dsn));
297     strlcat(dsn, k, sizeof(dsn));
298     strlcat(dsn, "=", sizeof(dsn));
299     strlcat(dsn, v, sizeof(dsn));
300   }
301   cq->dbh = PQconnectdb(dsn);
302   if(!cq->dbh) return -1;
303   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
304   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
305         dsn, PQerrorMessage(cq->dbh));
306   return -1;
307 }
308 static int
309 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
310                                 const char *name) {
311   int rv = -1;
312   PGresult *res;
313   char cmd[128];
314   strlcpy(cmd, p, sizeof(cmd));
315   strlcat(cmd, name, sizeof(cmd));
316   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
317   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
318   PQclear(res);
319   return rv;
320 }
321 static int
322 stratcon_datastore_do(conn_q *cq, const char *cmd) {
323   PGresult *res;
324   int rv = -1;
325   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
326   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
327   PQclear(res);
328   return rv;
329 }
330 #define BUSTED(cq) do { \
331   PQfinish((cq)->dbh); \
332   (cq)->dbh = NULL; \
333   goto full_monty; \
334 } while(0)
335 #define SAVEPOINT(name) do { \
336   if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
337   last_sp = current; \
338 } while(0)
339 #define ROLLBACK_TO_SAVEPOINT(name) do { \
340   if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
341     BUSTED(cq); \
342   last_sp = NULL; \
343 } while(0)
344 #define RELEASE_SAVEPOINT(name) do { \
345   if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
346     BUSTED(cq); \
347   last_sp = NULL; \
348 } while(0)
349 int
350 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
351                                   struct timeval *now) {
352   conn_q *cq = closure;
353   ds_job_detail *current, *last_sp;
354   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
355
356   if(!cq->head) return 0;
357
358  full_monty:
359   /* Make sure we have a connection */
360   while(stratcon_database_connect(cq)) {
361     noitL(noit_error, "Error connecting to database\n");
362     sleep(1);
363   }
364
365   current = cq->head;
366   last_sp = NULL;
367   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
368   while(current) {
369     execute_outcome_t rv;
370     if(current->data) {
371       if(!last_sp) SAVEPOINT("batch");
372  
373       if(current->problematic) {
374         noitL(noit_error, "Failed noit line: %s", current->data);
375         RELEASE_SAVEPOINT("batch");
376         current = current->next;
377         continue;
378       }
379       rv = stratcon_datastore_execute(cq, cq->remote, current);
380       switch(rv) {
381         case DS_EXEC_SUCCESS:
382           current = current->next;
383           break;
384         case DS_EXEC_ROW_FAILED:
385           /* rollback to savepoint, mark this record as bad and start again */
386           current->problematic = 1;
387           current = last_sp;
388           ROLLBACK_TO_SAVEPOINT("batch");
389           break;
390         case DS_EXEC_TXN_FAILED:
391           BUSTED(cq);
392       }
393     }
394     if(current->completion_event) {
395       if(last_sp) RELEASE_SAVEPOINT("batch");
396       if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq);
397       eventer_add(current->completion_event);
398       __remove_until(cq, current->next);
399       current = current->next;
400     }
401   }
402   return 0;
403 }
404 void
405 stratcon_datastore_push(stratcon_datastore_op_t op,
406                         struct sockaddr *remote, void *operand) {
407   conn_q *cq;
408   eventer_t e;
409   ds_job_detail *dsjd;
410
411   cq = __get_conn_q_for_remote(remote);
412   dsjd = calloc(1, sizeof(*dsjd));
413   switch(op) {
414     case DS_OP_INSERT:
415       dsjd->data = operand;
416       __append(cq, dsjd);
417       break;
418     case DS_OP_CHKPT:
419       dsjd->completion_event = operand;
420       __append(cq,dsjd);
421       e = eventer_alloc();
422       e->mask = EVENTER_ASYNCH;
423       e->callback = stratcon_datastore_asynch_execute;
424       e->closure = cq;
425       eventer_add(e);
426       break;
427   }
428 }
Note: See TracBrowser for help on using the browser.