root/src/stratcon_datastore.c

Revision baa98ed1b2d238f912d8e21d4f6cdba3b1af1aa5, 10.4 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 7 years ago)

free leaking memory

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_log.h"
9 #include "stratcon_datastore.h"
10 #include "noit_conf.h"
11 #include "noit_check.h"
12 #include <unistd.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
15 #include <libpq-fe.h>
16
/*
 * Cached SQL statement text and the configuration path each one is read from.
 * Every cache pointer starts out NULL and is filled in on first use by
 * GET_QUERY(), which looks the text up at the matching *_conf path.
 */
/* Statement executed for 'C' log lines. */
static char *check_insert = NULL;
static const char *check_insert_conf = "/stratcon/database/statements/check";
/* Statement executed for 'S' log lines. */
static char *status_insert = NULL;
static const char *status_insert_conf = "/stratcon/database/statements/status";
/* Statement executed for 'M' log lines whose metric type is numeric. */
static char *metric_insert_numeric = NULL;
static const char *metric_insert_numeric_conf = "/stratcon/database/statements/metric_numeric";
/* Statement executed for 'M' log lines whose metric type is METRIC_STRING. */
static char *metric_insert_text = NULL;
static const char *metric_insert_text_conf = "/stratcon/database/statements/metric_text";
25
/*
 * GET_QUERY(a): lazily load the SQL text for statement `a`.
 *
 * When the cache variable `a` is still NULL, noit_conf_get_string() is asked
 * for the value stored at the path held in `a_conf` (formed by token pasting)
 * and the result is written into `a`.  If that call returns zero the macro
 * jumps to a `bad_row` label, so it can only be used inside a function that
 * defines one.  Once `a` is non-NULL the lookup is skipped.
 */
#define GET_QUERY(a) do { \
  if(a == NULL) \
    if(!noit_conf_get_string(NULL, a ## _conf, &(a))) \
      goto bad_row; \
} while(0)
31
/* Capacity of the per-row bind parameter arrays below. */
#define MAX_PARAMS 8
/*
 * One node in a connection's work queue: either a raw log line to insert
 * (data != NULL) or a checkpoint marker carrying a completion event.
 */
typedef struct ds_job_detail {
  char *data;  /* The raw string, NULL means the stream is done -- commit. */
  int problematic; /* Set once the row has failed; it is then logged and skipped */
  eventer_t completion_event; /* This event should be registered if non NULL */
  struct ds_job_detail *next; /* Next node in the singly linked queue */

  /* Postgres specific stuff */
  int nparams;                     /* Number of bind parameters filled in; 0 means not parsed yet */
  char *paramValues[MAX_PARAMS];   /* Text value of each parameter, NULL for SQL NULL */
  int paramLengths[MAX_PARAMS];    /* Length of each value */
  int paramFormats[MAX_PARAMS];    /* Always set to 0 by this file (text format for libpq) */
  int paramAllocd[MAX_PARAMS];     /* Non-zero when paramValues[i] must be free()d */
} ds_job_detail;
46
/*
 * Per-remote state: a private copy of the peer address (also used as the key
 * in ds_conns), the job queue that runs the database work, the database
 * handle, and the linked list of pending ds_job_detail nodes.
 */
typedef struct {
  struct sockaddr *remote; /* Heap copy of the peer address */
  eventer_jobq_t  *jobq;   /* Job queue created alongside this conn_q */
  /* Postgres specific stuff */
  PGconn          *dbh;    /* NULL until connected, reset to NULL after PQfinish */
  ds_job_detail   *head;   /* Oldest pending node */
  ds_job_detail   *tail;   /* Newest pending node, NULL when the list is empty */
} conn_q;
55
56 void __append(conn_q *q, ds_job_detail *d) {
57   d->next = NULL;
58   if(!q->head) q->head = q->tail = d;
59   else {
60     q->tail->next = d;
61     q->tail = d;
62   }
63 }
64 void __remove_until(conn_q *q, ds_job_detail *d) {
65   ds_job_detail *next;
66   while(q->head && q->head != d) {
67     next = q->head;
68     q->head = q->head->next;
69     free_params(next);
70     if(next->data) free(next->data);
71     free(next);
72   }
73   if(!q->head) q->tail = NULL;
74 }
75
76 noit_hash_table ds_conns;
77
78 conn_q *
79 __get_conn_q_for_remote(struct sockaddr *remote) {
80   conn_q *cq;
81   if(noit_hash_retrieve(&ds_conns, (const char *)remote, remote->sa_len,
82                         (void **)&cq))
83     return cq;
84   cq = calloc(1, sizeof(*cq));
85   cq->remote = malloc(remote->sa_len);
86   memcpy(cq->remote, remote, remote->sa_len);
87   cq->jobq = calloc(1, sizeof(*cq->jobq));
88   eventer_jobq_init(cq->jobq);
89   cq->jobq->backq = eventer_default_backq();
90   /* Add one thread */
91   eventer_jobq_increase_concurrency(cq->jobq);
92   noit_hash_store(&ds_conns, (const char *)cq->remote, cq->remote->sa_len, cq);
93   return cq;
94 }
95
/*
 * Result of trying to store one row, as consumed by
 * stratcon_datastore_asynch_execute().
 */
typedef enum {
  DS_EXEC_SUCCESS = 0,     /* Row stored; the caller moves on to the next node */
  DS_EXEC_ROW_FAILED = 1,  /* Row rejected; the caller rolls back to the savepoint and flags the row */
  DS_EXEC_TXN_FAILED = 2,  /* The caller tears the connection down and starts the batch over */
} execute_outcome_t;
101
/*
 * Duplicate at most `len` characters of the NUL-terminated string `src` into
 * a freshly allocated, always NUL-terminated buffer.
 *
 * Copying stops early if `src` is shorter than `len`.  A negative `len` is
 * treated as zero.  Returns NULL when the allocation fails; otherwise the
 * caller owns the result and must free() it.
 *
 * Uses only standard C (memchr/memcpy) rather than the BSD strlcpy().
 */
static char *
__strndup(const char *src, int len) {
  const char *nul;
  size_t n;
  char *dst;

  n = (len > 0) ? (size_t)len : 0;
  /* Never read past the end of src. */
  nul = memchr(src, '\0', n);
  if(nul) n = (size_t)(nul - src);

  dst = malloc(n + 1);
  if(!dst) return NULL;
  memcpy(dst, src, n);
  dst[n] = '\0';
  return dst;
}
/*
 * DECLARE_PARAM_STR(str, len): append one text bind parameter to `d`.
 *
 * Expects a `ds_job_detail *d` and a `bad_row` label in the enclosing
 * function.  The first `len` characters of `str` are copied; the literal
 * value "[[null]]" is stored as a NULL parameter (SQL NULL) with nothing
 * allocated.  Jumps to bad_row, leaving `d` untouched, if the parameter
 * arrays are already full or the copy cannot be allocated.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  char *dps_copy_; \
  if(d->nparams >= MAX_PARAMS) goto bad_row; \
  dps_copy_ = __strndup((str), (len)); \
  if(!dps_copy_) goto bad_row; \
  if(!strcmp(dps_copy_, "[[null]]")) { \
    free(dps_copy_); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } else { \
    d->paramValues[d->nparams] = dps_copy_; \
    d->paramLengths[d->nparams] = (len); \
    d->paramAllocd[d->nparams] = 1; \
  } \
  d->paramFormats[d->nparams] = 0; \
  d->nparams++; \
} while(0)
122
123 static void
124 free_params(ds_job_detail *d) {
125   int i;
126   for(i=0; i<d->nparams; i++)
127     if(d->paramAllocd[i] && d->paramValues[i])
128       free(d->paramValues[i]);
129 }
130 execute_outcome_t
131 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) {
132   int i, type;
133
134   type = d->data[0];
135
136   /* Parse the log line, but only if we haven't already */
137   if(!d->nparams) {
138     struct sockaddr_in6 *rin6 = (struct sockaddr_in6 *)r;
139     char raddr[128];
140     char *scp, *ecp;
141     int fields;
142     if(inet_ntop(rin6->sin6_family, &rin6->sin6_addr,
143                  raddr, sizeof(raddr)) == NULL)
144       raddr[0] = '\0';
145  
146     d->paramValues[0] = strdup(raddr);
147     d->paramLengths[0] = strlen(raddr);
148     d->paramFormats[0] = 0;
149     d->paramAllocd[0] = 1;
150     d->nparams = 1;
151
152     switch(type) {
153       /* See noit_check_log.c for log description */
154       case 'C':
155       case 'M':
156         fields = 6; break;
157       case 'S':
158         fields = 7; break;
159       default:
160         goto bad_row;
161     }
162
163     scp = d->data;
164     for(i=0;i<fields-1;i++) { /* fields-1 b/c the last field is open-ended */
165       if(!*scp) goto bad_row;
166       ecp = strchr(scp, '\t');
167       if(!ecp) goto bad_row;
168       if(i > 0) /* We skip the type for now */
169         DECLARE_PARAM_STR(scp, ecp-scp);
170       scp = ecp + 1;
171     }
172     /* Now to the last field */
173     if(!*scp) ecp = scp;
174     else {
175       ecp = scp + strlen(scp); /* Puts us at the '\0' */
176       if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */
177     }
178     DECLARE_PARAM_STR(scp, ecp-scp);
179   }
180
181 #define PG_EXEC(cmd) do { \
182   PGresult *res; \
183   int rv; \
184   res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
185                      (const char * const *)d->paramValues, \
186                      d->paramLengths, d->paramFormats, 0); \
187   rv = PQresultStatus(res); \
188   if(rv != PGRES_COMMAND_OK) { \
189     noitL(noit_error, "stratcon datasource bad row: %s\n", \
190           PQresultErrorMessage(res)); \
191     PQclear(res); \
192     goto bad_row; \
193   } \
194   PQclear(res); \
195 } while(0)
196
197   /* Now execute the query */
198   switch(type) {
199     case 'C':
200       GET_QUERY(check_insert);
201       PG_EXEC(check_insert);
202       break;
203     case 'S':
204       GET_QUERY(status_insert);
205       PG_EXEC(status_insert);
206       break;
207     case 'M':
208       /* The fifth (idx: 4) bind variable is the type */
209       switch(d->paramValues[4][0]) {
210         case METRIC_INT32:
211         case METRIC_UINT32:
212         case METRIC_INT64:
213         case METRIC_UINT64:
214         case METRIC_DOUBLE:
215           GET_QUERY(metric_insert_numeric);
216           PG_EXEC(metric_insert_numeric);
217           break;
218         case METRIC_STRING:
219           GET_QUERY(metric_insert_text);
220           PG_EXEC(metric_insert_text);
221           break;
222         default:
223           goto bad_row;
224       }
225       break;
226     default:
227       /* should never get here */
228       goto bad_row;
229   }
230   return DS_EXEC_SUCCESS;
231  bad_row:
232   return DS_EXEC_ROW_FAILED;
233 }
234 static int
235 stratcon_database_connect(conn_q *cq) {
236   char dsn[512];
237   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
238   const char *k, *v;
239   int klen;
240   noit_hash_table *t;
241
242   if(cq->dbh) {
243     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
244     PQreset(cq->dbh);
245     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
246     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
247           dsn, PQerrorMessage(cq->dbh));
248     return -1;
249   }
250
251   dsn[0] = '\0';
252   t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
253   while(noit_hash_next(t, &iter, &k, &klen, (void **)&v)) {
254     if(dsn[0]) strlcat(dsn, " ", sizeof(dsn));
255     strlcat(dsn, k, sizeof(dsn));
256     strlcat(dsn, "=", sizeof(dsn));
257     strlcat(dsn, v, sizeof(dsn));
258   }
259   cq->dbh = PQconnectdb(dsn);
260   if(!cq->dbh) return -1;
261   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
262   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
263         dsn, PQerrorMessage(cq->dbh));
264   return -1;
265 }
266 static int
267 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
268                                 const char *name) {
269   int rv = -1;
270   PGresult *res;
271   char cmd[128];
272   strlcpy(cmd, p, sizeof(cmd));
273   strlcat(cmd, name, sizeof(cmd));
274   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
275   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
276   PQclear(res);
277   return rv;
278 }
279 static int
280 stratcon_datastore_do(conn_q *cq, const char *cmd) {
281   PGresult *res;
282   int rv = -1;
283   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
284   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
285   PQclear(res);
286   return rv;
287 }
/*
 * Helper macros for stratcon_datastore_asynch_execute().  They rely on names
 * from the enclosing function: the `full_monty` label, and for the savepoint
 * macros the locals `cq`, `current` and `last_sp`.
 */

/* Close the connection, forget the handle and restart from full_monty. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)
/* Create savepoint `name` and remember the current node as its start. */
#define SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
  last_sp = current; \
} while(0)
/* Roll back to savepoint `name`; no savepoint start is remembered afterwards. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
/* Release savepoint `name`; no savepoint start is remembered afterwards. */
#define RELEASE_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
307 int
308 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
309                                   struct timeval *now) {
310   conn_q *cq = closure;
311   ds_job_detail *current, *last_sp;
312   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
313
314   if(!cq->head) return 0;
315
316  full_monty:
317   /* Make sure we have a connection */
318   while(stratcon_database_connect(cq)) {
319     noitL(noit_error, "Error connecting to database\n");
320     sleep(1);
321   }
322
323   current = cq->head;
324   last_sp = NULL;
325   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
326   while(current) {
327     execute_outcome_t rv;
328     if(current->data) {
329       if(!last_sp) SAVEPOINT("batch");
330  
331       if(current->problematic) {
332         noitL(noit_error, "Failed noit line: %s", current->data);
333         RELEASE_SAVEPOINT("batch");
334         current = current->next;
335         continue;
336       }
337       rv = stratcon_datastore_execute(cq, cq->remote, current);
338       switch(rv) {
339         case DS_EXEC_SUCCESS:
340           current = current->next;
341           break;
342         case DS_EXEC_ROW_FAILED:
343           /* rollback to savepoint, mark this record as bad and start again */
344           current->problematic = 1;
345           current = last_sp;
346           ROLLBACK_TO_SAVEPOINT("batch");
347           break;
348         case DS_EXEC_TXN_FAILED:
349           BUSTED(cq);
350       }
351     }
352     if(current->completion_event) {
353       if(last_sp) RELEASE_SAVEPOINT("batch");
354       if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq);
355       eventer_add(current->completion_event);
356       __remove_until(cq, current->next);
357       current = current->next;
358     }
359   }
360   return 0;
361 }
362 void
363 stratcon_datastore_push(stratcon_datastore_op_t op,
364                         struct sockaddr *remote, void *operand) {
365   conn_q *cq;
366   eventer_t e;
367   ds_job_detail *dsjd;
368
369   cq = __get_conn_q_for_remote(remote);
370   dsjd = calloc(1, sizeof(*dsjd));
371   switch(op) {
372     case DS_OP_INSERT:
373       dsjd->data = operand;
374       __append(cq, dsjd);
375       break;
376     case DS_OP_CHKPT:
377       dsjd->completion_event = operand;
378       __append(cq,dsjd);
379       e = eventer_alloc();
380       e->mask = EVENTER_ASYNCH;
381       e->callback = stratcon_datastore_asynch_execute;
382       e->closure = cq;
383       eventer_add(e);
384       break;
385   }
386 }
Note: See TracBrowser for help on using the browser.