root/src/stratcon_datastore.c

Revision 247177137e6b22bd81c65d1120e06c4d14962d46, 10.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

move SQL statements to the conf file

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
#include "noit_defines.h"
#include "eventer/eventer.h"
#include "utils/noit_log.h"
#include "stratcon_datastore.h"
#include "noit_conf.h"
#include "noit_check.h"
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <libpq-fe.h>
16
/*
 * SQL statements are read from the configuration on first use and cached
 * in the variables below.  Each cache variable FOO has a companion
 * FOO_conf holding the configuration path it is loaded from.
 */
static char *check_insert = NULL;
static const char *check_insert_conf =
  "/stratcon/database/statements/check";

static char *status_insert = NULL;
static const char *status_insert_conf =
  "/stratcon/database/statements/status";

static char *metric_insert_numeric = NULL;
static const char *metric_insert_numeric_conf =
  "/stratcon/database/statements/metric_numeric";

static char *metric_insert_text = NULL;
static const char *metric_insert_text_conf =
  "/stratcon/database/statements/metric_text";

/*
 * GET_QUERY(a): make sure the cached statement `a` is populated, fetching
 * it from the path in `a_conf` when it is still NULL.  Jumps to the
 * enclosing function's `bad_row` label if the lookup fails.
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
31
32 #define MAX_PARAMS 8
33 typedef struct ds_job_detail {
34   char *data;  /* The raw string, NULL means the stream is done -- commit. */
35   int problematic;
36   eventer_t completion_event; /* This event should be registered if non NULL */
37   struct ds_job_detail *next;
38
39   /* Postgres specific stuff */
40   int nparams;
41   char *paramValues[MAX_PARAMS];
42   int paramLengths[MAX_PARAMS];
43   int paramFormats[MAX_PARAMS];
44   int paramAllocd[MAX_PARAMS];
45 } ds_job_detail;
46
47 typedef struct {
48   struct sockaddr *remote;
49   eventer_jobq_t  *jobq;
50   /* Postgres specific stuff */
51   PGconn          *dbh;
52   ds_job_detail   *head;
53   ds_job_detail   *tail;
54 } conn_q;
55
56 void __append(conn_q *q, ds_job_detail *d) {
57   d->next = NULL;
58   if(!q->head) q->head = q->tail = d;
59   else {
60     q->tail->next = d;
61     q->tail = d;
62   }
63 }
64 void __remove_until(conn_q *q, ds_job_detail *d) {
65   ds_job_detail *next;
66   while(q->head && q->head != d) {
67     next = q->head;
68     q->head = q->head->next;
69     if(next->data) free(next->data);
70     free(next);
71   }
72   if(!q->head) q->tail = NULL;
73 }
74
75 noit_hash_table ds_conns;
76
77 conn_q *
78 __get_conn_q_for_remote(struct sockaddr *remote) {
79   conn_q *cq;
80   if(noit_hash_retrieve(&ds_conns, (const char *)remote, remote->sa_len,
81                         (void **)&cq))
82     return cq;
83   cq = calloc(1, sizeof(*cq));
84   cq->remote = malloc(remote->sa_len);
85   memcpy(cq->remote, remote, remote->sa_len);
86   cq->jobq = calloc(1, sizeof(*cq->jobq));
87   eventer_jobq_init(cq->jobq);
88   cq->jobq->backq = eventer_default_backq();
89   /* Add one thread */
90   eventer_jobq_increase_concurrency(cq->jobq);
91   noit_hash_store(&ds_conns, (const char *)cq->remote, cq->remote->sa_len, cq);
92   return cq;
93 }
94
/* Result of executing a single queued row. */
typedef enum {
  DS_EXEC_SUCCESS    = 0,  /* the row was executed */
  DS_EXEC_ROW_FAILED = 1,  /* this row failed */
  DS_EXEC_TXN_FAILED = 2   /* the transaction as a whole failed */
} execute_outcome_t;
100
/*
 * Duplicate at most `len` bytes of `src` into a freshly allocated,
 * NUL-terminated string.  Copying stops early at a NUL inside the first
 * `len` bytes.  A negative `len` is treated as zero.
 *
 * Returns NULL if the allocation fails; the caller owns the result and
 * must free() it.
 *
 * Only the first `len` bytes of `src` are examined, so the cost no longer
 * depends on the length of the remainder of the source string.
 */
static char *
__strndup(const char *src, int len) {
  const char *nul;
  size_t n;
  char *dst;

  n = (len > 0) ? (size_t)len : 0;
  nul = n ? memchr(src, '\0', n) : NULL;
  if(nul) n = (size_t)(nul - src);

  dst = malloc(n + 1);
  if(dst == NULL) return NULL;
  if(n) memcpy(dst, src, n);
  dst[n] = '\0';
  return dst;
}
108 #define DECLARE_PARAM_STR(str, len) do { \
109   d->paramValues[d->nparams] = __strndup(str, len); \
110   d->paramLengths[d->nparams] = len; \
111   d->paramFormats[d->nparams] = 0; \
112   d->paramAllocd[d->nparams] = 1; \
113   if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \
114     free(d->paramValues[d->nparams]); \
115     d->paramLengths[d->nparams] = 0; \
116     d->paramAllocd[d->nparams] = 0; \
117   } \
118   d->nparams++; \
119 } while(0)
120
121 execute_outcome_t
122 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) {
123   int i, type;
124
125   type = d->data[0];
126
127   /* Parse the log line, but only if we haven't already */
128   if(!d->nparams) {
129     struct sockaddr_in6 *rin6 = (struct sockaddr_in6 *)r;
130     char raddr[128];
131     char *scp, *ecp;
132     int fields;
133     if(inet_ntop(rin6->sin6_family, &rin6->sin6_addr,
134                  raddr, sizeof(raddr)) == NULL)
135       raddr[0] = '\0';
136  
137     d->paramValues[0] = strdup(raddr);
138     d->paramLengths[0] = strlen(raddr);
139     d->paramFormats[0] = 0;
140     d->paramAllocd[0] = 1;
141     d->nparams = 1;
142
143     switch(type) {
144       /* See noit_check_log.c for log description */
145       case 'C':
146       case 'M':
147         fields = 6; break;
148       case 'S':
149         fields = 7; break;
150       default:
151         goto bad_row;
152     }
153
154     scp = d->data;
155     for(i=0;i<fields-1;i++) { /* fields-1 b/c the last field is open-ended */
156       if(!*scp) goto bad_row;
157       ecp = strchr(scp, '\t');
158       if(!ecp) goto bad_row;
159       if(i > 0) /* We skip the type for now */
160         DECLARE_PARAM_STR(scp, ecp-scp);
161       scp = ecp + 1;
162     }
163     /* Now to the last field */
164     if(!*scp) ecp = scp;
165     else {
166       ecp = scp + strlen(scp); /* Puts us at the '\0' */
167       if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */
168     }
169     DECLARE_PARAM_STR(scp, ecp-scp);
170   }
171
172 #define PG_EXEC(cmd) do { \
173   PGresult *res; \
174   int rv; \
175   res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
176                      (const char * const *)d->paramValues, \
177                      d->paramLengths, d->paramFormats, 0); \
178   rv = PQresultStatus(res); \
179   if(rv != PGRES_COMMAND_OK) { \
180     noitL(noit_error, "stratcon datasource bad row: %s\n", \
181           PQresultErrorMessage(res)); \
182     PQclear(res); \
183     goto bad_row; \
184   } \
185   PQclear(res); \
186 } while(0)
187
188   /* Now execute the query */
189   switch(type) {
190     case 'C':
191       GET_QUERY(check_insert);
192       PG_EXEC(check_insert);
193       break;
194     case 'S':
195       GET_QUERY(status_insert);
196       PG_EXEC(status_insert);
197       break;
198     case 'M':
199       /* The fifth (idx: 4) bind variable is the type */
200       switch(d->paramValues[4][0]) {
201         case METRIC_INT32:
202         case METRIC_UINT32:
203         case METRIC_INT64:
204         case METRIC_UINT64:
205         case METRIC_DOUBLE:
206           GET_QUERY(metric_insert_numeric);
207           PG_EXEC(metric_insert_numeric);
208           break;
209         case METRIC_STRING:
210           GET_QUERY(metric_insert_text);
211           PG_EXEC(metric_insert_text);
212           break;
213         default:
214           goto bad_row;
215       }
216       break;
217     default:
218       /* should never get here */
219       goto bad_row;
220   }
221   return DS_EXEC_SUCCESS;
222  bad_row:
223   return DS_EXEC_ROW_FAILED;
224 }
225 static int
226 stratcon_database_connect(conn_q *cq) {
227   char dsn[512];
228   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
229   const char *k, *v;
230   int klen;
231   noit_hash_table *t;
232
233   if(cq->dbh) {
234     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
235     PQreset(cq->dbh);
236     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
237     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
238           dsn, PQerrorMessage(cq->dbh));
239     return -1;
240   }
241
242   dsn[0] = '\0';
243   t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
244   while(noit_hash_next(t, &iter, &k, &klen, (void **)&v)) {
245     if(dsn[0]) strlcat(dsn, " ", sizeof(dsn));
246     strlcat(dsn, k, sizeof(dsn));
247     strlcat(dsn, "=", sizeof(dsn));
248     strlcat(dsn, v, sizeof(dsn));
249   }
250   cq->dbh = PQconnectdb(dsn);
251   if(!cq->dbh) return -1;
252   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
253   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
254         dsn, PQerrorMessage(cq->dbh));
255   return -1;
256 }
257 static int
258 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
259                                 const char *name) {
260   int rv;
261   PGresult *res;
262   char cmd[128];
263   strlcpy(cmd, p, sizeof(cmd));
264   strlcat(cmd, name, sizeof(cmd));
265   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
266   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
267   PQclear(res);
268   return rv;
269 }
270 static int
271 stratcon_datastore_do(conn_q *cq, const char *cmd) {
272   PGresult *res;
273   int rv = -1;
274   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
275   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
276   PQclear(res);
277   return rv;
278 }
279 #define BUSTED(cq) do { \
280   PQfinish((cq)->dbh); \
281   (cq)->dbh = NULL; \
282   goto full_monty; \
283 } while(0)
284 #define SAVEPOINT(name) do { \
285   if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
286   last_sp = current; \
287 } while(0)
288 #define ROLLBACK_TO_SAVEPOINT(name) do { \
289   if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
290     BUSTED(cq); \
291   last_sp = NULL; \
292 } while(0)
293 #define RELEASE_SAVEPOINT(name) do { \
294   if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
295     BUSTED(cq); \
296   last_sp = NULL; \
297 } while(0)
298 int
299 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
300                                   struct timeval *now) {
301   conn_q *cq = closure;
302   ds_job_detail *current, *last_sp;
303   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
304
305   if(!cq->head) return 0;
306
307  full_monty:
308   /* Make sure we have a connection */
309   while(stratcon_database_connect(cq)) {
310     noitL(noit_error, "Error connecting to database\n");
311     sleep(1);
312   }
313
314   current = cq->head;
315   last_sp = NULL;
316   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
317   while(current) {
318     execute_outcome_t rv;
319     if(current->data) {
320       if(!last_sp) SAVEPOINT("batch");
321  
322       if(current->problematic) {
323         noitL(noit_error, "Failed noit line: %s", current->data);
324         RELEASE_SAVEPOINT("batch");
325         current = current->next;
326         continue;
327       }
328       rv = stratcon_datastore_execute(cq, cq->remote, current);
329       switch(rv) {
330         case DS_EXEC_SUCCESS:
331           current = current->next;
332           break;
333         case DS_EXEC_ROW_FAILED:
334           /* rollback to savepoint, mark this record as bad and start again */
335           current->problematic = 1;
336           current = last_sp;
337           ROLLBACK_TO_SAVEPOINT("batch");
338           break;
339         case DS_EXEC_TXN_FAILED:
340           BUSTED(cq);
341       }
342     }
343     if(current->completion_event) {
344       if(last_sp) RELEASE_SAVEPOINT("batch");
345       if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq);
346       eventer_add(current->completion_event);
347       __remove_until(cq, current->next);
348       current = current->next;
349     }
350   }
351   return 0;
352 }
353 void
354 stratcon_datastore_push(stratcon_datastore_op_t op,
355                         struct sockaddr *remote, void *operand) {
356   conn_q *cq;
357   eventer_t e;
358   ds_job_detail *dsjd;
359
360   cq = __get_conn_q_for_remote(remote);
361   dsjd = calloc(1, sizeof(*dsjd));
362   switch(op) {
363     case DS_OP_INSERT:
364       dsjd->data = operand;
365       __append(cq, dsjd);
366       break;
367     case DS_OP_CHKPT:
368       dsjd->completion_event = operand;
369       __append(cq,dsjd);
370       e = eventer_alloc();
371       e->mask = EVENTER_ASYNCH;
372       e->callback = stratcon_datastore_asynch_execute;
373       e->closure = cq;
374       eventer_add(e);
375       break;
376   }
377 }
Note: See TracBrowser for help on using the browser.