root/src/modules/postgres_ingestor.c

Revision 73575572dbd7776c0aed9b4ea5c48a00de417f00, 52.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

memory leak fix

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "noit_module.h"
35 #include "eventer/eventer.h"
36 #include "utils/noit_log.h"
37 #include "utils/noit_b64.h"
38 #include "utils/noit_str.h"
39 #include "utils/noit_mkdir.h"
40 #include "utils/noit_getip.h"
41 #include "stratcon_datastore.h"
42 #include "stratcon_realtime_http.h"
43 #include "stratcon_iep.h"
44 #include "noit_conf.h"
45 #include "noit_check.h"
46 #include "noit_rest.h"
47 #include <unistd.h>
48 #include <fcntl.h>
49 #include <netinet/in.h>
50 #include <sys/un.h>
51 #include <dirent.h>
52 #include <arpa/inet.h>
53 #include <sys/mman.h>
54 #include <libpq-fe.h>
55 #include <zlib.h>
56 #include <assert.h>
57 #include <errno.h>
58
/* DECL_STMT(codename, confname) declares a pair of file-scope statics:
 *   codename       - the SQL text of the statement; NULL until GET_QUERY
 *                    loads it from the configuration on first use.
 *   codename_conf  - the config path "/stratcon/database/statements/<confname>"
 *                    that GET_QUERY reads the SQL text from.
 */
59 #define DECL_STMT(codename,confname) \
60 static char *codename = NULL; \
61 static const char *codename##_conf = "/stratcon/database/statements/" #confname
62
/* One lazily-loaded, configurable SQL statement per database operation. */
63 DECL_STMT(storage_post_connect, storagepostconnect);
64 DECL_STMT(metanode_post_connect, metanodepostconnect);
65 DECL_STMT(find_storage, findstoragenode);
66 DECL_STMT(all_storage, allstoragenodes);
67 DECL_STMT(check_map, mapchecktostoragenode);
68 DECL_STMT(check_mapall, mapallchecks);
69 DECL_STMT(check_loadall, allchecks);
70 DECL_STMT(check_find, findcheck);
71 DECL_STMT(check_insert, check);
72 DECL_STMT(status_insert, status);
73 DECL_STMT(metric_insert_numeric, metric_numeric);
74 DECL_STMT(metric_insert_text, metric_text);
75 DECL_STMT(config_insert, config);
76 DECL_STMT(config_get, findconfig);
77
/* Module log streams.  ds_err receives SQL failures (see PG_EXEC and
 * PG_TM_EXEC); ds_pool_deb receives connection-pool tracing.  They start
 * out NULL here; presumably they are opened during module init further
 * down the file -- verify. */
78 static noit_log_stream_t ds_err = NULL;
79 static noit_log_stream_t ds_deb = NULL;
80 static noit_log_stream_t ds_pool_deb = NULL;
81 static noit_log_stream_t ingest_err = NULL;
82
/* GET_QUERY(a): the first time statement `a` (declared with DECL_STMT) is
 * needed, load its SQL text from the config path a_conf.  If the statement
 * is not configured, control jumps to the caller's `bad_row` label, so
 * every user of this macro must define one.
 * NOTE(review): the NULL check and the assignment are not locked.  Two
 * threads (e.g. the per-storage-node check loaders) using the same
 * statement for the first time could both fetch it, which leaks one copy. */
83 #define GET_QUERY(a) do { \
84   if(a == NULL) \
85     if(!noit_conf_get_string(NULL, a ## _conf, &(a))) \
86       goto bad_row; \
87 } while(0)
88
89 struct conn_q;
90
/* A pool of database connections to one destination (the metanode or a
 * storage node), shared by all threads.  get_conn_pool_for_remote() keys
 * pools by queue_name.  head, in_pool and outstanding are guarded by
 * `lock`.  `cv` is signalled whenever a connection is released, so that
 * threads waiting in get_conn_q_for_remote() can retry. */
91 typedef struct {
92   char            *queue_name; /* key: datastore_<remote_str>_<fqdn>_<remote_cn> */
93   eventer_jobq_t  *jobq;       /* job queue used for asynch work on this destination */
94   struct conn_q   *head;       /* idle connections, most recently used first */
95   pthread_mutex_t  lock;
96   pthread_cond_t   cv;
97   int              ttl;           /* idle seconds before ttl_purge_conn_pool() frees one */
98   int              in_pool;       /* number of idle connections on head */
99   int              outstanding;   /* connections currently checked out */
100   int              max_allocated; /* cap on in_pool + outstanding */
101   int              max_in_pool;   /* cap on idle connections kept for reuse */
102 } conn_pool;
/* One pooled connection.  While idle it is linked on pool->head via `next`.
 * last_use is refreshed on release and drives TTL expiry. */
103 typedef struct conn_q {
104   time_t           last_use;
105   char            *dsn;        /* Pg connect string */
106   char            *remote_str; /* the IP of the noit*/
107   char            *remote_cn;  /* the Cert CN of the noit */
108   char            *fqdn;       /* the fqdn of the storage node */
109   conn_pool       *pool;
110   struct conn_q   *next;
111   /* Postgres specific stuff */
112   PGconn          *dbh;
113 } conn_q;
114
115
/* Maximum number of bound parameters per statement.  DECLARE_PARAM_* do
 * not check this bound; the largest visible user (a 'C' line in
 * stratcon_ingest_execute) binds 7. */
116 #define MAX_PARAMS 8
/* Per-statement execution state: the libpq result, its status, the row
 * timestamp, and the bound parameters.  paramAllocd marks the values that
 * free_params() must free.  These members are the common leading prefix of
 * every *_detail struct below.  That lets code cast a ds_rt_detail * or a
 * ds_line_detail * to ds_single_detail * (as stratcon_ingest_find() does
 * for free_params()), so POSTGRES_PARTS must stay first in each struct. */
117 #define POSTGRES_PARTS \
118   PGresult *res; \
119   int rv; \
120   time_t whence; \
121   int nparams; \
122   int metric_type; \
123   char *paramValues[MAX_PARAMS]; \
124   int paramLengths[MAX_PARAMS]; \
125   int paramFormats[MAX_PARAMS]; \
126   int paramAllocd[MAX_PARAMS];
127
/* A bare statement context, used for one-off queries. */
128 typedef struct ds_single_detail {
129   POSTGRES_PARTS
130 } ds_single_detail;
/* Context for resolving the owning noit of realtime trackers
 * (see stratcon_ingest_find). */
131 typedef struct {
132   /* Postgres specific stuff */
133   POSTGRES_PARTS
134   struct realtime_tracker *rt;
135   conn_q *cq; /* connection on which to perform this job */
136   eventer_t completion_event; /* This event should be registered if non NULL */
137 } ds_rt_detail;
/* Context for one journal line (see stratcon_ingest_execute).  `data` is
 * the raw tab-separated line; presumably these are chained through `next`
 * into a batch -- confirm against the users further down the file. */
138 typedef struct ds_line_detail {
139   /* Postgres specific stuff */
140   POSTGRES_PARTS
141   char *data;
142   int problematic;
143   struct ds_line_detail *next;
144 } ds_line_detail;
145
/* Bookkeeping for an interim journal file that is bound to one storage node
 * and connection pool.  NOTE(review): its users are outside this view;
 * confirm the ownership of filename/fd there. */
146 typedef struct {
147   char *remote_str;
148   char *remote_cn;
149   char *fqdn;
150   int storagenode_id;
151   int fd;
152   char *filename;
153   conn_pool *cpool;
154 } pg_interim_journal_t;
155
/* Forward declarations of helpers defined later in the file. */
156 static int stratcon_database_connect(conn_q *cq);
157 static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
158 static int storage_node_quick_lookup(const char *uuid_str,
159                                      const char *remote_cn,
160                                      int *sid_out, int *storagenode_id_out,
161                                      const char **remote_cn_out,
162                                      const char **fqdn_out,
163                                      const char **dsn_out);
164
165 static void
166 free_params(ds_single_detail *d) {
167   int i;
168   for(i=0; i<d->nparams; i++)
169     if(d->paramAllocd[i] && d->paramValues[i])
170       free(d->paramValues[i]);
171 }
172
/* Base directory for journal files.  It is NULL here; presumably it is set
 * from the configuration at init -- verify. */
173 static char *basejpath = NULL;
/* ds_conns maps a pool's queue_name to its conn_pool; all access is under
 * ds_conns_lock (see get_conn_pool_for_remote). */
174 static pthread_mutex_t ds_conns_lock;
175 static noit_hash_table ds_conns;
176
177 /* the fqdn cache needs to be thread safe */
/* Cached resolution of a check uuid (plus noit CN) to its sid and storage
 * node id; presumably filled by uuid_to_sid() -- confirm below. */
178 typedef struct {
179   char *uuid_str;
180   char *remote_cn;
181   int storagenode_id;
182   int sid;
183 } uuid_info;
/* Cached storage node details: the host (fqdn) and libpq connect string
 * (dsn) used to open connections to it. */
184 typedef struct {
185   int storagenode_id;
186   char *fqdn;
187   char *dsn;
188 } storagenode_info;
/* NOTE(review): no lock is declared next to uuid_to_info_cache here;
 * confirm how access to it is synchronized. */
189 noit_hash_table uuid_to_info_cache;
/* storagenode_to_info_cache is read under storagenode_to_info_cache_lock
 * (see stratcon_ingest_asynch_drive_iep). */
190 pthread_mutex_t storagenode_to_info_cache_lock;
191 noit_hash_table storagenode_to_info_cache;
192
193 /* Thread-safe connection pools */
194
195 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
196 static void
197 release_conn_q_forceable(conn_q *cq, int forcefree) {
198   int putback = 0;
199   cq->last_use = time(NULL);
200   pthread_mutex_lock(&cq->pool->lock);
201   cq->pool->outstanding--;
202   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
203     putback = 1;
204     cq->next = cq->pool->head;
205     cq->pool->head = cq;
206     cq->pool->in_pool++;
207   }
208   pthread_mutex_unlock(&cq->pool->lock);
209   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(),
210         putback ? "to pool" : "and destroy", cq->pool->queue_name);
211   pthread_cond_signal(&cq->pool->cv);
212   if(putback) return;
213
214   /* Not put back, release it */
215   if(cq->dbh) PQfinish(cq->dbh);
216   if(cq->remote_str) free(cq->remote_str);
217   if(cq->remote_cn) free(cq->remote_cn);
218   if(cq->fqdn) free(cq->fqdn);
219   if(cq->dsn) free(cq->dsn);
220   free(cq);
221 }
222 static void
223 ttl_purge_conn_pool(conn_pool *pool) {
224   int old_cnt, new_cnt;
225   time_t now = time(NULL);
226   conn_q *cq, *prev = NULL, *iter;
227   /* because we always replace on the head and update the last_use time when
228      doing so, we know they are ordered LRU on the end.  So, once we hit an
229      old one, we know all the others are old too.
230    */
231   if(!pool->head) return; /* hack short circuit for no locks */
232   pthread_mutex_lock(&pool->lock);
233   old_cnt = pool->in_pool;
234   cq = pool->head;
235   while(cq) {
236     if(cq->last_use + cq->pool->ttl < now) {
237       if(prev) prev->next = NULL;
238       else pool->head = NULL;
239       break;
240     }
241     prev = cq;
242     cq = cq->next;
243   }
244   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
245   /* Fix accounting */
246   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
247   new_cnt = pool->in_pool;
248   pthread_mutex_unlock(&pool->lock);
249
250   /* Force release these without holding the lock */
251   while(cq) {
252     cq = cq->next;
253     release_conn_q_forceable(cq, 1);
254   }
255   if(old_cnt != new_cnt)
256     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
257           pool->queue_name);
258 }
259 static void
260 release_conn_q(conn_q *cq) {
261   ttl_purge_conn_pool(cq->pool);
262   release_conn_q_forceable(cq, 0);
263 }
264 static conn_pool *
265 get_conn_pool_for_remote(const char *remote_str,
266                          const char *remote_cn, const char *fqdn) {
267   void *vcpool;
268   conn_pool *cpool = NULL;
269   char queue_name[256] = "datastore_";
270   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
271            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
272            fqdn ? fqdn : "default",
273            remote_cn ? remote_cn : "default");
274   pthread_mutex_lock(&ds_conns_lock);
275   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
276                         strlen(queue_name), &vcpool))
277     cpool = vcpool;
278   pthread_mutex_unlock(&ds_conns_lock);
279   if(!cpool) {
280     vcpool = cpool = calloc(1, sizeof(*cpool));
281     cpool->queue_name = strdup(queue_name);
282     pthread_mutex_init(&cpool->lock, NULL);
283     pthread_cond_init(&cpool->cv, NULL);
284     cpool->in_pool = 0;
285     cpool->outstanding = 0;
286     cpool->max_in_pool = 1;
287     cpool->max_allocated = 1;
288     pthread_mutex_lock(&ds_conns_lock);
289     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
290                         cpool)) {
291       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
292                          strlen(queue_name), &vcpool);
293     }
294     pthread_mutex_unlock(&ds_conns_lock);
295     if(vcpool != cpool) {
296       /* someone beat us to it */
297       free(cpool->queue_name);
298       pthread_mutex_destroy(&cpool->lock);
299       pthread_cond_destroy(&cpool->cv);
300       free(cpool);
301     }
302     else {
303       int i;
304       /* Our job to setup the pool */
305       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
306       eventer_jobq_init(cpool->jobq, queue_name);
307       cpool->jobq->backq = eventer_default_backq();
308       /* Add one thread */
309       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
310         eventer_jobq_increase_concurrency(cpool->jobq);
311     }
312     cpool = vcpool;
313   }
314   return cpool;
315 }
316 static conn_q *
317 get_conn_q_for_remote(const char *remote_str,
318                       const char *remote_cn, const char *fqdn,
319                       const char *dsn) {
320   conn_pool *cpool;
321   conn_q *cq;
322   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
323   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(),
324         cpool->queue_name);
325   pthread_mutex_lock(&cpool->lock);
326  again:
327   if(cpool->head) {
328     assert(cpool->in_pool > 0);
329     cq = cpool->head;
330     cpool->head = cq->next;
331     cpool->in_pool--;
332     cpool->outstanding++;
333     cq->next = NULL;
334     pthread_mutex_unlock(&cpool->lock);
335     return cq;
336   }
337   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
338     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
339           (void *)pthread_self(), cpool->queue_name);
340     pthread_cond_wait(&cpool->cv, &cpool->lock);
341     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
342           (void *)pthread_self(), cpool->queue_name);
343     goto again;
344   }
345   else {
346     cpool->outstanding++;
347     pthread_mutex_unlock(&cpool->lock);
348   }
349  
350   cq = calloc(1, sizeof(*cq));
351   cq->pool = cpool;
352   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
353   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
354   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
355   cq->dsn = dsn ? strdup(dsn) : NULL;
356   return cq;
357 }
358 static conn_q *
359 get_conn_q_for_metanode() {
360   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
361 }
362
/* Outcome of executing one row. */
363 typedef enum {
364   DS_EXEC_SUCCESS = 0,
365   DS_EXEC_ROW_FAILED = 1,
366   DS_EXEC_TXN_FAILED = 2,
367 } execute_outcome_t;
368
/* DECLARE_PARAM_STR(str, len): append a text-format parameter to `d`
 * (which must be in scope).  It stores a heap copy of the first `len` bytes
 * of `str`, which free_params() releases.  The exact string "[[null]]" is
 * turned into an SQL NULL (value NULL, length 0, nothing to free).
 * NOTE: d->nparams is not checked against MAX_PARAMS, and `len` is
 * evaluated twice. */
369 #define DECLARE_PARAM_STR(str, len) do { \
370   d->paramValues[d->nparams] = noit__strndup(str, len); \
371   d->paramLengths[d->nparams] = len; \
372   d->paramFormats[d->nparams] = 0; \
373   d->paramAllocd[d->nparams] = 1; \
374   if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \
375     free(d->paramValues[d->nparams]); \
376     d->paramValues[d->nparams] = NULL; \
377     d->paramLengths[d->nparams] = 0; \
378     d->paramAllocd[d->nparams] = 0; \
379   } \
380   d->nparams++; \
381 } while(0)
/* DECLARE_PARAM_INT(i): bind an int as a decimal text parameter. */
382 #define DECLARE_PARAM_INT(i) do { \
383   int buffer__len; \
384   char buffer__[32]; \
385   snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
386   buffer__len = strlen(buffer__); \
387   DECLARE_PARAM_STR(buffer__, buffer__len); \
388 } while(0)
389
/* PG_GET_STR_COL(dest, row, name): point `dest` at the value of column
 * `name` at `row` of d->res.  `dest` is NULL if the column does not exist
 * or the value is SQL NULL.  `dest` aliases libpq-owned memory and is only
 * valid until d->res is PQclear()ed. */
390 #define PG_GET_STR_COL(dest, row, name) do { \
391   int colnum = PQfnumber(d->res, name); \
392   dest = NULL; \
393   if (colnum >= 0) \
394     dest = PQgetisnull(d->res, row, colnum) \
395          ? NULL : PQgetvalue(d->res, row, colnum); \
396 } while(0)
397
/* PG_EXEC(cmd): execute the SQL text `cmd` on cq->dbh with the parameters
 * accumulated in `d`.  Requires `d` (a *_detail pointer), `cq` (a conn_q *)
 * and a `bad_row` label in the calling function.
 * On success, d->res holds the result and the caller must PQclear() it.
 * On failure, the first line of the server's error is logged to ds_err.
 * d->res is then cleared and reset to NULL (so no dangling result is left
 * behind), and control jumps to bad_row. */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "[%s] stratcon_datasource.c:%d bad (%d): %.*s\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
          (int)(pgerr_end - pgerr), pgerr); \
    PQclear(d->res); \
    d->res = NULL; \
    goto bad_row; \
  } \
} while(0)

/* PG_TM_EXEC(cmd, whence): like PG_EXEC, but the SQL template `cmd` is first
 * expanded with strftime() against `whence` converted to UTC, into a
 * 4096-byte buffer.  If the time cannot be converted, or the expansion does
 * not fit (strftime() returns 0 and leaves the buffer indeterminate), the
 * row is rejected: it is logged and control jumps to bad_row rather than
 * executing garbage.  `whence` is evaluated exactly once. */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t pg_tm_whence_ = (whence); \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&pg_tm_whence_, &tbuf); \
  if(tm == NULL || strftime(cmdbuf, sizeof(cmdbuf), cmd, tm) == 0) { \
    noitL(ds_err, "stratcon_datasource.c:%d cannot expand query for time: %llu\n", \
          __LINE__, (long long unsigned)pg_tm_whence_); \
    d->res = NULL; \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "stratcon_datasource.c:%d bad (%d): %.*s time: %llu\n", \
          __LINE__, d->rv, (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)pg_tm_whence_); \
    PQclear(d->res); \
    d->res = NULL; \
    goto bad_row; \
  } \
} while(0)
438
439 static void *
440 stratcon_ingest_check_loadall(void *vsn) {
441   storagenode_info *sn = vsn;
442   ds_single_detail *d;
443   int i, row_count = 0, good = 0;
444   char buff[1024];
445   conn_q *cq = NULL;
446
447   d = calloc(1, sizeof(*d));
448   GET_QUERY(check_loadall);
449   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
450   i = 0;
451   while(stratcon_database_connect(cq)) {
452     if(i++ > 4) {
453       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
454       release_conn_q(cq);
455       return (void *)(vpsized_int)good;
456     }
457     sleep(1);
458   }
459   PG_EXEC(check_loadall);
460   row_count = PQntuples(d->res);
461  
462   for(i=0; i<row_count; i++) {
463     int rv;
464     int8_t family;
465     struct sockaddr *sin;
466     struct sockaddr_in sin4 = { .sin_family = AF_INET };
467     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
468     char *remote, *id, *target, *module, *name;
469     PG_GET_STR_COL(remote, i, "remote_address");
470     PG_GET_STR_COL(id, i, "id");
471     PG_GET_STR_COL(target, i, "target");
472     PG_GET_STR_COL(module, i, "module");
473     PG_GET_STR_COL(name, i, "name");
474     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
475
476     family = AF_INET;
477     sin = (struct sockaddr *)&sin4;
478     rv = inet_pton(family, remote, &sin4.sin_addr);
479     if(rv != 1) {
480       family = AF_INET6;
481       sin = (struct sockaddr *)&sin6;
482       rv = inet_pton(family, remote, &sin6.sin6_addr);
483       if(rv != 1) {
484         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
485         sin = NULL;
486       }
487     }
488
489     /* stratcon_iep_line_processor takes an allocated operand and frees it */
490     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
491     good++;
492   }
493   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
494         good, row_count, sn->fqdn);
495  bad_row:
496   free_params((ds_single_detail *)d);
497   free(d);
498   if(cq) release_conn_q(cq);
499   return (void *)(vpsized_int)good;
500 }
501 static int
502 stratcon_ingest_asynch_drive_iep(eventer_t e, int mask, void *closure,
503                                  struct timeval *now) {
504   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
505   pthread_t *jobs = NULL;
506   int nodes, i = 0, tcnt = 0;
507   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
508   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
509
510   pthread_mutex_lock(&storagenode_to_info_cache_lock);
511   nodes = storagenode_to_info_cache.size;
512   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
513   sns = calloc(MAX(1,nodes), sizeof(*sns));
514   if(nodes == 0) sns[nodes++] = &self;
515   else {
516     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
517     const char *k;
518     void *v;
519     int klen;
520     while(noit_hash_next(&storagenode_to_info_cache,
521                          &iter, &k, &klen, &v)) {
522       sns[i++] = (storagenode_info *)v;
523     }
524   }
525   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
526
527   for(i=0; i<nodes; i++) {
528     if(pthread_create(&jobs[i], NULL,
529                       stratcon_ingest_check_loadall, sns[i]) != 0) {
530       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
531     }
532   }
533   for(i=0; i<nodes; i++) {
534     void *good;
535     pthread_join(jobs[i], &good);
536     tcnt += (int)(vpsized_int)good;
537   }
538   free(jobs);
539   free(sns);
540   noitL(noit_error, "Loaded all %d check states.\n", tcnt);
541   return 0;
542 }
543 static void
544 stratcon_ingest_iep_check_preload() {
545   eventer_t e;
546   conn_pool *cpool;
547
548   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
549   e = eventer_alloc();
550   e->mask = EVENTER_ASYNCH;
551   e->callback = stratcon_ingest_asynch_drive_iep;
552   e->closure = NULL;
553   eventer_add_asynch(cpool->jobq, e);
554 }
555 execute_outcome_t
556 stratcon_ingest_find(ds_rt_detail *d) {
557   conn_q *cq;
558   char *val;
559   int row_count;
560   struct realtime_tracker *node;
561
562   for(node = d->rt; node; node = node->next) {
563     char uuid_str[UUID_STR_LEN+1];
564     const char *fqdn, *dsn, *remote_cn;
565     char remote_ip[32];
566     int storagenode_id;
567
568     uuid_unparse_lower(node->checkid, uuid_str);
569     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
570                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
571       continue;
572
573     noitL(noit_debug, "stratcon_ingest_find <- (%d, %s) @ %s\n",
574           node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)");
575
576     /* We might be able to find the IP from our config if someone has
577      * specified the expected cn in the noit definition.
578      */
579     if(stratcon_find_noit_ip_by_cn(remote_cn,
580                                    remote_ip, sizeof(remote_ip)) == 0) {
581       node->noit = strdup(remote_ip);
582       noitL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit);
583       continue;
584     }
585
586     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
587     stratcon_database_connect(cq);
588
589     GET_QUERY(check_find);
590     DECLARE_PARAM_INT(node->sid);
591     PG_EXEC(check_find);
592     row_count = PQntuples(d->res);
593     if(row_count != 1) {
594       noitL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid);
595       PQclear(d->res);
596       goto bad_row;
597     }
598
599     /* Get the remote_address (which noit owns this) */
600     PG_GET_STR_COL(val, 0, "remote_address");
601     if(!val) {
602       noitL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn);
603       PQclear(d->res);
604       goto bad_row;
605     }
606     node->noit = strdup(val);
607     noitL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit);
608    bad_row:
609     free_params((ds_single_detail *)d);
610     d->nparams = 0;
611     release_conn_q(cq);
612   }
613   return DS_EXEC_SUCCESS;
614 }
/* stratcon_ingest_execute: parse one journal line (d->data) and run the
 * matching INSERT on the connection `cq`.
 *
 * The first byte of the line selects the record type.  The line is tab
 * separated, and its first field (the leader) is skipped.  The remaining
 * fields are:
 *   'n'  noitd config:  timestamp, uncompressed length, base64(zlib(config))
 *   'C'  check:         timestamp, uuid, target, module, name
 *   'M'  metric:        timestamp, uuid, name, metric type, value
 *   'S'  status:        timestamp, uuid, state, availability, duration, status
 *
 * r          remote address of the noit; NULL is bound as "".
 * remote_cn  CN of the noit.  It is used to resolve uuids to sids, and for
 *            'n' lines it is also bound as a parameter; strlen() is called
 *            on it there, so it must be non-NULL for those.
 *
 * Parsed parameters are cached in `d`: when d->nparams is already non-zero,
 * the line is not re-parsed and only the statement is executed again.
 * NOTE(review): a parse failure part way through leaves d->nparams > 0, so
 * a retry would execute with a partial parameter list.  Confirm that
 * callers free/reset d before retrying a failed row.
 *
 * Returns DS_EXEC_SUCCESS, or DS_EXEC_ROW_FAILED on any parse or SQL error.
 */
615 execute_outcome_t
616 stratcon_ingest_execute(conn_q *cq, const char *r, const char *remote_cn,
617                         ds_line_detail *d) {
618   int type, len, sid;
619   char *final_buff;
620   uLong final_len, actual_final_len;
621   char *token;
622   char raddr_blank[1] = "";
623   const char *raddr;
624
625   type = d->data[0];
626   raddr = r ? r : raddr_blank;
627
628   /* Parse the log line, but only if we haven't already */
629   if(!d->nparams) {
630     char *scp, *ecp;
631
632     scp = d->data;
    /* PROCESS_NEXT_FIELD: set token/len to the next tab-terminated field
     * and advance scp past its tab.  It always writes `token` and `len`
     * (its t,l arguments are unused), and jumps to bad_row if no further
     * tab-terminated field exists.
     * PROCESS_LAST_FIELD: take the rest of the line, minus one trailing
     * newline, as the final field.
     * Neither macro is #undef'd after this function. */
633 #define PROCESS_NEXT_FIELD(t,l) do { \
634   if(!*scp) goto bad_row; \
635   ecp = strchr(scp, '\t'); \
636   if(!ecp) goto bad_row; \
637   token = scp; \
638   len = (ecp-scp); \
639   scp = ecp + 1; \
640 } while(0)
641 #define PROCESS_LAST_FIELD(t,l) do { \
642   if(!*scp) ecp = scp; \
643   else { \
644     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
645     if(*(ecp-1) == '\n') ecp--; /* Back up one char if the line ended in \n */ \
646   } \
647   t = scp; \
648   l = (ecp-scp); \
649 } while(0)
650
651     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
652     switch(type) {
653       /* See noit_check_log.c for log description */
654       case 'n':
655         DECLARE_PARAM_STR(raddr, strlen(raddr));
656         DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
657         DECLARE_PARAM_STR("noitd",5); /* node_type */
658         PROCESS_NEXT_FIELD(token,len);
659         d->whence = (time_t)strtoul(token, NULL, 10);
660         DECLARE_PARAM_STR(token,len); /* timestamp */
661
662         /* This is the expected uncompressed len */
663         PROCESS_NEXT_FIELD(token,len);
        /* NOTE(review): this length comes from the line and is not range
         * checked before malloc(). */
664         final_len = atoi(token);
665         final_buff = malloc(final_len);
666         if(!final_buff) goto bad_row;
667
668         /* The last token is b64 encoded and compressed.
669          * we need to decode it, declare it and then free it.
670          */
671         PROCESS_LAST_FIELD(token, len);
672         /* We can in-place decode this */
673         len = noit_b64_decode((char *)token, len,
674                               (unsigned char *)token, len);
675         if(len <= 0) {
676           noitL(noit_error, "noitd config base64 decoding error.\n");
677           free(final_buff);
678           goto bad_row;
679         }
680         actual_final_len = final_len;
681         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
682                               (unsigned char *)token, len)) {
683           noitL(noit_error, "noitd config decompression failure.\n");
684           free(final_buff);
685           goto bad_row;
686         }
        /* The payload must inflate to exactly the advertised length. */
687         if(final_len != actual_final_len) {
688           noitL(noit_error, "noitd config decompression error.\n");
689           free(final_buff);
690           goto bad_row;
691         }
692         DECLARE_PARAM_STR(final_buff, final_len);
693         free(final_buff);
694         break;
695       case 'C':
696         DECLARE_PARAM_STR(raddr, strlen(raddr));
697         PROCESS_NEXT_FIELD(token,len);
698         DECLARE_PARAM_STR(token,len); /* timestamp */
699         d->whence = (time_t)strtoul(token, NULL, 10);
700         PROCESS_NEXT_FIELD(token, len);
701         /* uuid is last 36 bytes */
702         if(len > 36) { token += (len-36); len = 36; }
703         sid = uuid_to_sid(token, remote_cn);
704         if(sid == 0) goto bad_row;
705         DECLARE_PARAM_INT(sid); /* sid */
706         DECLARE_PARAM_STR(token,len); /* uuid */
707         PROCESS_NEXT_FIELD(token, len);
708         DECLARE_PARAM_STR(token,len); /* target */
709         PROCESS_NEXT_FIELD(token, len);
710         DECLARE_PARAM_STR(token,len); /* module */
711         PROCESS_LAST_FIELD(token, len);
712         DECLARE_PARAM_STR(token,len); /* name */
713         break;
714       case 'M':
715         PROCESS_NEXT_FIELD(token,len);
716         DECLARE_PARAM_STR(token,len); /* timestamp */
717         d->whence = (time_t)strtoul(token, NULL, 10);
718         PROCESS_NEXT_FIELD(token, len);
719         /* uuid is last 36 bytes */
720         if(len > 36) { token += (len-36); len = 36; }
721         sid = uuid_to_sid(token, remote_cn);
722         if(sid == 0) goto bad_row;
723         DECLARE_PARAM_INT(sid); /* sid */
724         PROCESS_NEXT_FIELD(token, len);
725         DECLARE_PARAM_STR(token,len); /* name */
726         PROCESS_NEXT_FIELD(token,len);
        /* The metric type is the first character of its field; it is matched
         * against METRIC_* below to choose the numeric or text insert. */
727         d->metric_type = *token;
728         PROCESS_LAST_FIELD(token,len);
729         DECLARE_PARAM_STR(token,len); /* value */
730         break;
731       case 'S':
732         PROCESS_NEXT_FIELD(token,len);
733         DECLARE_PARAM_STR(token,len); /* timestamp */
734         d->whence = (time_t)strtoul(token, NULL, 10);
735         PROCESS_NEXT_FIELD(token, len);
736         /* uuid is last 36 bytes */
737         if(len > 36) { token += (len-36); len = 36; }
738         sid = uuid_to_sid(token, remote_cn);
739         if(sid == 0) goto bad_row;
740         DECLARE_PARAM_INT(sid); /* sid */
741         PROCESS_NEXT_FIELD(token, len);
742         DECLARE_PARAM_STR(token,len); /* state */
743         PROCESS_NEXT_FIELD(token, len);
744         DECLARE_PARAM_STR(token,len); /* availability */
745         PROCESS_NEXT_FIELD(token, len);
746         DECLARE_PARAM_STR(token,len); /* duration */
747         PROCESS_LAST_FIELD(token,len);
748         DECLARE_PARAM_STR(token,len); /* status */
749         break;
750       default:
751         goto bad_row;
752     }
753
754   }
755
  /* 'n' runs its statement as-is.  'C', 'S' and 'M' go through PG_TM_EXEC,
   * which strftime()-expands the statement text against the row timestamp
   * (d->whence, as UTC) before executing it. */
756   /* Now execute the query */
757   switch(type) {
758     case 'n':
759       GET_QUERY(config_insert);
760       PG_EXEC(config_insert);
761       PQclear(d->res);
762       break;
763     case 'C':
764       GET_QUERY(check_insert);
765       PG_TM_EXEC(check_insert, d->whence);
766       PQclear(d->res);
767       break;
768     case 'S':
769       GET_QUERY(status_insert);
770       PG_TM_EXEC(status_insert, d->whence);
771       PQclear(d->res);
772       break;
773     case 'M':
774       switch(d->metric_type) {
775         case METRIC_INT32:
776         case METRIC_UINT32:
777         case METRIC_INT64:
778         case METRIC_UINT64:
779         case METRIC_DOUBLE:
780           GET_QUERY(metric_insert_numeric);
781           PG_TM_EXEC(metric_insert_numeric, d->whence);
782           PQclear(d->res);
783           break;
784         case METRIC_STRING:
785           GET_QUERY(metric_insert_text);
786           PG_TM_EXEC(metric_insert_text, d->whence);
787           PQclear(d->res);
788           break;
789         default:
790           goto bad_row;
791       }
792       break;
793     default:
794       /* should never get here */
795       goto bad_row;
796   }
797   return DS_EXEC_SUCCESS;
798  bad_row:
799   return DS_EXEC_ROW_FAILED;
800 }
801 static int
802 stratcon_database_post_connect(conn_q *cq) {
803   int rv = 0;
804   ds_single_detail _d = { 0 }, *d = &_d;
805   if(cq->fqdn) {
806     char *remote_str, *remote_cn;
807     /* This is the silly way we get null's in through our declare_param_str */
808     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
809     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
810     /* This is a storage node, it gets the storage node post_connect */
811     GET_QUERY(storage_post_connect);
812     rv = -1; /* now we're serious */
813     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
814     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
815     PG_EXEC(storage_post_connect);
816     PQclear(d->res);
817     rv = 0;
818   }
819   else {
820     /* Metanode post_connect */
821     GET_QUERY(metanode_post_connect);
822     rv = -1; /* now we're serious */
823     PG_EXEC(metanode_post_connect);
824     PQclear(d->res);
825     rv = 0;
826   }
827  bad_row:
828   free_params(d);
829   if(rv == -1) {
830     /* Post-connect intentions are serious and fatal */
831     PQfinish(cq->dbh);
832     cq->dbh = NULL;
833   }
834   return rv;
835 }
836 static int
837 stratcon_database_connect(conn_q *cq) {
838   char *dsn, dsn_meta[512];
839   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
840   const char *k, *v;
841   int klen;
842   noit_hash_table *t;
843
844   dsn_meta[0] = '\0';
845   if(!cq->dsn) {
846     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
847     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
848       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
849       strlcat(dsn_meta, k, sizeof(dsn_meta));
850       strlcat(dsn_meta, "=", sizeof(dsn_meta));
851       strlcat(dsn_meta, v, sizeof(dsn_meta));
852     }
853     noit_hash_destroy(t, free, free);
854     free(t);
855     dsn = dsn_meta;
856   }
857   else {
858     char options[32];
859     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta));
860     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user",
861                                options, sizeof(options))) {
862       strlcat(dsn_meta, " ", sizeof(dsn_meta));
863       strlcat(dsn_meta, "user", sizeof(dsn_meta));
864       strlcat(dsn_meta, "=", sizeof(dsn_meta));
865       strlcat(dsn_meta, options, sizeof(dsn_meta));
866     }
867     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password",
868                                options, sizeof(options))) {
869       strlcat(dsn_meta, " ", sizeof(dsn_meta));
870       strlcat(dsn_meta, "password", sizeof(dsn_meta));
871       strlcat(dsn_meta, "=", sizeof(dsn_meta));
872       strlcat(dsn_meta, options, sizeof(dsn_meta));
873     }
874     dsn = dsn_meta;
875   }
876
877   if(cq->dbh) {
878     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
879     PQreset(cq->dbh);
880     if(PQstatus(cq->dbh) != CONNECTION_OK) {
881       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
882             dsn, PQerrorMessage(cq->dbh));
883       return -1;
884     }
885     if(stratcon_database_post_connect(cq)) return -1;
886     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
887     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
888           dsn, PQerrorMessage(cq->dbh));
889     return -1;
890   }
891
892   cq->dbh = PQconnectdb(dsn);
893   if(!cq->dbh) return -1;
894   if(PQstatus(cq->dbh) != CONNECTION_OK) {
895     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
896           dsn, PQerrorMessage(cq->dbh));
897     return -1;
898   }
899   if(stratcon_database_post_connect(cq)) return -1;
900   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
901   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
902         dsn, PQerrorMessage(cq->dbh));
903   return -1;
904 }
905 static int
906 stratcon_ingest_savepoint_op(conn_q *cq, const char *p,
907                              const char *name) {
908   int rv = -1;
909   PGresult *res;
910   char cmd[128];
911   strlcpy(cmd, p, sizeof(cmd));
912   strlcat(cmd, name, sizeof(cmd));
913   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
914   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
915   PQclear(res);
916   return rv;
917 }
918 static int
919 stratcon_ingest_do(conn_q *cq, const char *cmd) {
920   PGresult *res;
921   int rv = -1;
922   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
923   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
924   PQclear(res);
925   return rv;
926 }
/*
 * Control-flow helpers for stratcon_ingest_asynch_execute().  They are
 * macros, not functions, because they jump to that function's full_monty
 * label and update its locals: they expect `cq`, `last_sp` and `current`
 * to be in scope.
 */
/* Close the connection (the caller has just seen a failure) and jump back to
   full_monty, which reconnects and replays the batch from the start. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)
/* Open savepoint `name`; remember the line it starts at in last_sp. */
#define SAVEPOINT(name) do { \
  if(stratcon_ingest_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
  last_sp = current; \
} while(0)
/* Undo everything since savepoint `name`; a new one is opened on demand. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  if(stratcon_ingest_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
/* Keep the work done since savepoint `name` and drop the savepoint. */
#define RELEASE_SAVEPOINT(name) do { \
  if(stratcon_ingest_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
946
947 int
948 stratcon_ingest_asynch_lookup(eventer_t e, int mask, void *closure,
949                               struct timeval *now) {
950   ds_rt_detail *dsjd = closure;
951   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
952   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
953
954   assert(dsjd->rt);
955   stratcon_ingest_find(dsjd);
956   if(dsjd->completion_event)
957     eventer_add(dsjd->completion_event);
958
959   free_params((ds_single_detail *)dsjd);
960   free(dsjd);
961   return 0;
962 }
963 static void
964 stratcon_ingestor_submit_lookup(struct realtime_tracker *rt,
965                                 eventer_t completion) {
966   eventer_t e;
967   conn_pool *cpool;
968   ds_rt_detail *rtdetail;
969
970   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
971   rtdetail = calloc(1, sizeof(*rtdetail));
972   rtdetail->rt = rt;
973   rtdetail->completion_event = completion;
974   e = eventer_alloc();
975   e->mask = EVENTER_ASYNCH;
976   e->callback = stratcon_ingest_asynch_lookup;
977   e->closure = rtdetail;
978   eventer_add_asynch(cpool->jobq, e);
979 }
980 static const char *
981 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
982   void *vinfo;
983   char *dsn = NULL, *fqdn = NULL;
984   int found = 0;
985   storagenode_info *info = NULL;
986   pthread_mutex_lock(&storagenode_to_info_cache_lock);
987   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
988                         &vinfo)) {
989     found = 1;
990     info = vinfo;
991   }
992   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
993   if(found) {
994     if(fqdn_out) *fqdn_out = info->fqdn;
995     return info->dsn;
996   }
997
998   if(!found && can_use_db) {
999     ds_single_detail *d;
1000     conn_q *cq;
1001     int row_count;
1002     /* Look it up and store it */
1003     d = calloc(1, sizeof(*d));
1004     cq = get_conn_q_for_metanode();
1005     GET_QUERY(find_storage);
1006     DECLARE_PARAM_INT(id);
1007     PG_EXEC(find_storage);
1008     row_count = PQntuples(d->res);
1009     if(row_count) {
1010       PG_GET_STR_COL(dsn, 0, "dsn");
1011       PG_GET_STR_COL(fqdn, 0, "fqdn");
1012       fqdn = fqdn ? strdup(fqdn) : NULL;
1013       dsn = dsn ? strdup(dsn) : NULL;
1014     }
1015     PQclear(d->res);
1016    bad_row:
1017     free_params(d);
1018     free(d);
1019     release_conn_q(cq);
1020   }
1021   if(fqdn) {
1022     info = calloc(1, sizeof(*info));
1023     info->fqdn = fqdn;
1024     if(fqdn_out) *fqdn_out = info->fqdn;
1025     info->dsn = dsn;
1026     info->storagenode_id = id;
1027     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1028     noit_hash_store(&storagenode_to_info_cache,
1029                     (void *)&info->storagenode_id, sizeof(int), info);
1030     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1031   }
1032   return info ? info->dsn : NULL;
1033 }
1034 static ds_line_detail *
1035 build_insert_batch(pg_interim_journal_t *ij) {
1036   int rv;
1037   off_t len;
1038   const char *buff, *cp, *lcp;
1039   struct stat st;
1040   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1041
1042   if(ij->fd < 0) {
1043     ij->fd = open(ij->filename, O_RDONLY);
1044     if(ij->fd < 0) {
1045       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
1046             ij->filename, strerror(errno));
1047       assert(ij->fd >= 0);
1048     }
1049   }
1050   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1051   if(rv == -1) {
1052       noitL(noit_error, "Cannot stat interim journal '%s': %s\n",
1053             ij->filename, strerror(errno));
1054     assert(rv != -1);
1055   }
1056   len = st.st_size;
1057   if(len > 0) {
1058     buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1059     if(buff == (void *)-1) {
1060       noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1061             ij->filename, strerror(errno));
1062       assert(buff != (void *)-1);
1063     }
1064     lcp = buff;
1065     while(lcp < (buff + len) &&
1066           NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1067       next = calloc(1, sizeof(*next));
1068       next->data = malloc(cp - lcp + 1);
1069       memcpy(next->data, lcp, cp - lcp);
1070       next->data[cp - lcp] = '\0';
1071       if(!head) head = next;
1072       if(last) last->next = next;
1073       last = next;
1074       lcp = cp + 1;
1075     }
1076     munmap((void *)buff, len);
1077   }
1078   close(ij->fd);
1079   return head;
1080 }
1081 static void
1082 pg_interim_journal_remove(pg_interim_journal_t *ij) {
1083   unlink(ij->filename);
1084   if(ij->filename) free(ij->filename);
1085   if(ij->remote_str) free(ij->remote_str);
1086   if(ij->remote_cn) free(ij->remote_cn);
1087   if(ij->fqdn) free(ij->fqdn);
1088   free(ij);
1089 }
1090 static int
1091 stratcon_ingest_asynch_execute(eventer_t e, int mask, void *closure,
1092                                struct timeval *now) {
1093   int i, total, success, sp_total, sp_success;
1094   pg_interim_journal_t *ij;
1095   ds_line_detail *head = NULL, *current, *last_sp;
1096   const char *dsn;
1097   conn_q *cq;
1098   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1099   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1100
1101   ij = closure;
1102   if(ij->fqdn == NULL) {
1103     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1104     if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1105   }
1106   else {
1107     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, NULL);
1108   }
1109   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1110                              ij->fqdn, dsn);
1111   noitL(ds_deb, "stratcon_ingest_asynch_execute[%s,%s,%s]\n",
1112         ij->remote_str, ij->remote_cn, ij->fqdn);
1113  full_monty:
1114   /* Make sure we have a connection */
1115   i = 1;
1116   while(stratcon_database_connect(cq)) {
1117     noitL(noit_error, "Error connecting to database: %s\n",
1118           ij->fqdn ? ij->fqdn : "(null)");
1119     sleep(i);
1120     i *= 2;
1121     i = MIN(i, 16);
1122   }
1123
1124   if(head == NULL) head = build_insert_batch(ij);
1125   noitL(ds_deb, "Starting batch from %s/%s to %s\n",
1126         ij->remote_str ? ij->remote_str : "(null)",
1127         ij->remote_cn ? ij->remote_cn : "(null)",
1128         ij->fqdn ? ij->fqdn : "(null)");
1129   current = head;
1130   last_sp = NULL;
1131   total = success = sp_total = sp_success = 0;
1132   if(stratcon_ingest_do(cq, "BEGIN")) BUSTED(cq);
1133   while(current) {
1134     execute_outcome_t rv;
1135     if(current->data) {
1136       if(!last_sp) {
1137         SAVEPOINT("batch");
1138         sp_success = success;
1139         sp_total = total;
1140       }
1141  
1142       if(current->problematic) {
1143         RELEASE_SAVEPOINT("batch");
1144         current = current->next;
1145         total++;
1146         continue;
1147       }
1148       rv = stratcon_ingest_execute(cq, cq->remote_str, cq->remote_cn,
1149                                    current);
1150       switch(rv) {
1151         case DS_EXEC_SUCCESS:
1152           total++;
1153           success++;
1154           current = current->next;
1155           break;
1156         case DS_EXEC_ROW_FAILED:
1157           /* rollback to savepoint, mark this record as bad and start again */
1158           if(current->data[0] != 'n')
1159             noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1160           current->problematic = 1;
1161           current = last_sp;
1162           success = sp_success;
1163           total = sp_total;
1164           ROLLBACK_TO_SAVEPOINT("batch");
1165           break;
1166         case DS_EXEC_TXN_FAILED:
1167           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1168           BUSTED(cq);
1169       }
1170     }
1171   }
1172   if(last_sp) RELEASE_SAVEPOINT("batch");
1173   if(stratcon_ingest_do(cq, "COMMIT")) {
1174     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1175     BUSTED(cq);
1176   }
1177   /* Cleanup the mess */
1178   while(head) {
1179     ds_line_detail *tofree;
1180     tofree = head;
1181     head = head->next;
1182     if(tofree->data) free(tofree->data);
1183     free_params((ds_single_detail *)tofree);
1184     free(tofree);
1185   }
1186   noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1187         ij->remote_str ? ij->remote_str : "(null)",
1188         ij->remote_cn ? ij->remote_cn : "(null)",
1189         ij->fqdn ? ij->fqdn : "(null)", success, total);
1190   pg_interim_journal_remove(ij);
1191   release_conn_q(cq);
1192   return 0;
1193 }
1194 static int
1195 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1196                           int *sid_out, int *storagenode_id_out,
1197                           const char **remote_cn_out,
1198                           const char **fqdn_out, const char **dsn_out) {
1199   /* only called from the main thread -- no safety issues */
1200   void *vuuidinfo, *vinfo;
1201   uuid_info *uuidinfo;
1202   storagenode_info *info = NULL;
1203   char *fqdn = NULL;
1204   char *dsn = NULL;
1205   char *new_remote_cn = NULL;
1206   int storagenode_id = 0, sid = 0;
1207   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1208                          &vuuidinfo)) {
1209     int row_count = 0;
1210     char *tmpint;
1211     ds_single_detail *d;
1212     conn_q *cq;
1213
1214     /* We can't do a database lookup without the remote_cn */
1215     if(!remote_cn) {
1216       if(stratcon_datastore_get_enabled()) {
1217         /* We have an authoritatively maintained cache, we don't do lookups */
1218         return -1;
1219       }
1220       else
1221         remote_cn = "[[null]]";
1222     }
1223
1224     d = calloc(1, sizeof(*d));
1225     cq = get_conn_q_for_metanode();
1226     if(stratcon_database_connect(cq) == 0) {
1227       /* Blocking call to service the cache miss */
1228       GET_QUERY(check_map);
1229       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1230       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1231       PG_EXEC(check_map);
1232       row_count = PQntuples(d->res);
1233       if(row_count != 1) {
1234         PQclear(d->res);
1235         goto bad_row;
1236       }
1237       PG_GET_STR_COL(tmpint, 0, "sid");
1238       if(!tmpint) {
1239         row_count = 0;
1240         PQclear(d->res);
1241         goto bad_row;
1242       }
1243       sid = atoi(tmpint);
1244       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1245       if(tmpint) storagenode_id = atoi(tmpint);
1246       PG_GET_STR_COL(fqdn, 0, "fqdn");
1247       PG_GET_STR_COL(dsn, 0, "dsn");
1248       PG_GET_STR_COL(new_remote_cn, 0, "remote_cn");
1249       fqdn = fqdn ? strdup(fqdn) : NULL;
1250       dsn = dsn ? strdup(dsn) : NULL;
1251       new_remote_cn = new_remote_cn ? strdup(new_remote_cn) : NULL;
1252       PQclear(d->res);
1253     }
1254    bad_row:
1255     free_params((ds_single_detail *)d);
1256     free(d);
1257     release_conn_q(cq);
1258     if(row_count != 1) {
1259       return -1;
1260     }
1261     /* Place in cache */
1262     uuidinfo = calloc(1, sizeof(*uuidinfo));
1263     uuidinfo->sid = sid;
1264     uuidinfo->uuid_str = strdup(uuid_str);
1265     uuidinfo->storagenode_id = storagenode_id;
1266     uuidinfo->remote_cn = new_remote_cn ? strdup(new_remote_cn) : strdup(remote_cn);
1267     noit_hash_store(&uuid_to_info_cache,
1268                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1269     /* Also, we may have just witnessed a new storage node, store it */
1270     if(storagenode_id) {
1271       int needs_free = 0;
1272       info = calloc(1, sizeof(*info));
1273       info->storagenode_id = storagenode_id;
1274       info->dsn = dsn ? strdup(dsn) : NULL;
1275       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1276       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1277       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1278                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1279         /* hack to save memory -- we *never* remove from these caches,
1280            so we can use the same fqdn value in the above cache for the key
1281            in the cache below -- (no strdup) */
1282         noit_hash_store(&storagenode_to_info_cache,
1283                         (void *)&info->storagenode_id, sizeof(int), info);
1284       }
1285       else needs_free = 1;
1286       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1287       if(needs_free) {
1288         if(info->dsn) free(info->dsn);
1289         if(info->fqdn) free(info->fqdn);
1290         free(info);
1291       }
1292     }
1293   }
1294   else
1295     uuidinfo = vuuidinfo;
1296
1297   if(uuidinfo && uuidinfo->storagenode_id) {
1298     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1299       /* we don't have dsn and we actually want it */
1300       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1301       if(noit_hash_retrieve(&storagenode_to_info_cache,
1302                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1303                             &vinfo))
1304         info = vinfo;
1305       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1306     }
1307   }
1308
1309   if(fqdn_out) *fqdn_out = info ? info->fqdn : NULL;
1310   if(dsn_out) *dsn_out = info ? info->dsn : NULL;
1311   assert(uuidinfo);
1312   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1313   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1314   if(sid_out) *sid_out = uuidinfo->sid;
1315   if(fqdn) free(fqdn);
1316   if(dsn) free(dsn);
1317   if(new_remote_cn) free(new_remote_cn);
1318   return 0;
1319 }
1320 static int
1321 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1322   char uuid_str[UUID_STR_LEN+1];
1323   int sid = 0;
1324   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1325   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1326   return sid;
1327 }
1328
1329 static int
1330 stratcon_ingest_saveconfig() {
1331   int rv = -1;
1332   char *buff;
1333   ds_single_detail _d = { 0 }, *d = &_d;
1334   conn_q *cq;
1335   char ipv4_str[32];
1336   struct in_addr r, l;
1337
1338   r.s_addr = htonl((4 << 24) || (2 << 16) || (2 << 8) || 1);
1339   memset(&l, 0, sizeof(l));
1340   noit_getip_ipv4(r, &l);
1341   /* Ignore the error.. what are we going to do anyway */
1342   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
1343     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));
1344
1345   cq = get_conn_q_for_metanode();
1346
1347   if(stratcon_database_connect(cq) == 0) {
1348     char time_as_str[20];
1349     size_t len;
1350     buff = noit_conf_xml_in_mem(&len);
1351     if(!buff) goto bad_row;
1352
1353     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1354     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str));
1355     DECLARE_PARAM_STR("", 0);
1356     DECLARE_PARAM_STR("stratcond", 9);
1357     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1358     DECLARE_PARAM_STR(buff, len);
1359     free(buff);
1360
1361     GET_QUERY(config_insert);
1362     PG_EXEC(config_insert);
1363     PQclear(d->res);
1364     rv = 0;
1365
1366     bad_row:
1367       free_params(d);
1368   }
1369   release_conn_q(cq);
1370   return rv;
1371 }
1372
1373 static void
1374 stratcon_ingest_launch_file_ingestion(const char *path,
1375                                       const char *remote_str,
1376                                       const char *remote_cn,
1377                                       const char *id_str) {
1378   pg_interim_journal_t *ij;
1379   eventer_t ingest;
1380
1381   ij = calloc(1, sizeof(*ij));
1382   ij->fd = open(path, O_RDONLY);
1383   if(ij->fd < 0) {
1384     noitL(noit_error, "cannot open journal '%s': %s\n",
1385           path, strerror(errno));
1386     free(ij);
1387     return;
1388   }
1389   close(ij->fd);
1390   ij->fd = -1;
1391   ij->filename = strdup(path);
1392   ij->remote_str = strdup(remote_str);
1393   ij->remote_cn = strdup(remote_cn);
1394   ij->storagenode_id = atoi(id_str);
1395   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1396                                        ij->fqdn);
1397   noitL(noit_error, "ingesting old payload: %s\n", ij->filename);
1398   ingest = eventer_alloc();
1399   ingest->mask = EVENTER_ASYNCH;
1400   ingest->callback = stratcon_ingest_asynch_execute;
1401   ingest->closure = ij;
1402   eventer_add_asynch(ij->cpool->jobq, ingest);
1403 }
1404 static void
1405 stratcon_ingest_sweep_journals_int(char *first, char *second, char *third) {
1406   char path[PATH_MAX];
1407   DIR *root;
1408   struct dirent *de, *entry;
1409   int i = 0, cnt = 0;
1410   char **entries;
1411   int size = 0;
1412
1413   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
1414            first ? "/" : "", first ? first : "",
1415            second ? "/" : "", second ? second : "",
1416            third ? "/" : "", third ? third : "");
1417 #ifdef _PC_NAME_MAX
1418   size = pathconf(path, _PC_NAME_MAX);
1419 #endif
1420   size = MIN(size, PATH_MAX + 128);
1421   de = alloca(size);
1422   root = opendir(path);
1423   if(!root) return;
1424   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) cnt++;
1425   closedir(root);
1426   root = opendir(path);
1427   if(!root) return;
1428   entries = malloc(sizeof(*entries) * cnt);
1429   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) {
1430     if(i < cnt) {
1431       entries[i++] = strdup(entry->d_name);
1432     }
1433   }
1434   closedir(root);
1435   cnt = i; /* could have changed, directories are fickle */
1436   qsort(entries, i, sizeof(*entries),
1437         (int (*)(const void *, const void *))strcasecmp);
1438   for(i=0; i<cnt; i++) {
1439     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
1440     noitL(ds_deb, "Processing L%d entry '%s'\n",
1441           third ? 4 : second ? 3 : first ? 2 : 1, entries[i]);
1442     if(!first)
1443       stratcon_ingest_sweep_journals_int(entries[i], NULL, NULL);
1444     else if(!second)
1445       stratcon_ingest_sweep_journals_int(first, entries[i], NULL);
1446     else if(!third)
1447       stratcon_ingest_sweep_journals_int(first, second, entries[i]);
1448     else if(strlen(entries[i]) == 16) {
1449       char fullpath[PATH_MAX];
1450       snprintf(fullpath, sizeof(fullpath), "%s/%s/%s/%s/%s", basejpath,
1451                first,second,third,entries[i]);
1452       stratcon_ingest_launch_file_ingestion(fullpath,first,second,third);
1453     }
1454   }
1455   for(i=0; i<cnt; i++)
1456     free(entries[i]);
1457   free(entries);
1458 }
1459 static void
1460 stratcon_ingest_sweep_journals() {
1461   stratcon_ingest_sweep_journals_int(NULL,NULL,NULL);
1462 }
1463
1464 int
1465 stratcon_ingest_all_storagenode_info() {
1466   int i, cnt = 0;
1467   ds_single_detail _d = { 0 }, *d = &_d;
1468   conn_q *cq;
1469   cq = get_conn_q_for_metanode();
1470
1471   while(stratcon_database_connect(cq)) {
1472     noitL(noit_error, "Error connecting to database\n");
1473     sleep(1);
1474   }
1475
1476   GET_QUERY(all_storage);
1477   PG_EXEC(all_storage);
1478   cnt = PQntuples(d->res);
1479   for(i=0; i<cnt; i++) {
1480     void *vinfo;
1481     char *tmpint, *fqdn, *dsn;
1482     int storagenode_id;
1483     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1484     storagenode_id = atoi(tmpint);
1485     PG_GET_STR_COL(fqdn, i, "fqdn");
1486     PG_GET_STR_COL(dsn, i, "dsn");
1487     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1488     storagenode_id = tmpint ? atoi(tmpint) : 0;
1489
1490     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1491                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1492       storagenode_info *info;
1493       info = calloc(1, sizeof(*info));
1494       info->storagenode_id = storagenode_id;
1495       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1496       info->dsn = dsn ? strdup(dsn) : NULL;
1497       noit_hash_store(&storagenode_to_info_cache,
1498                       (void *)&info->storagenode_id, sizeof(int), info);
1499       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1500             info->storagenode_id,
1501             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1502     }
1503   }
1504   PQclear(d->res);
1505  bad_row:
1506   free_params(d);
1507
1508   release_conn_q(cq);
1509   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1510   return cnt;
1511 }
1512 int
1513 stratcon_ingest_all_check_info() {
1514   int i, cnt, loaded = 0;
1515   ds_single_detail _d = { 0 }, *d = &_d;
1516   conn_q *cq;
1517   cq = get_conn_q_for_metanode();
1518
1519   while(stratcon_database_connect(cq)) {
1520     noitL(noit_error, "Error connecting to database\n");
1521     sleep(1);
1522   }
1523
1524   GET_QUERY(check_mapall);
1525   PG_EXEC(check_mapall);
1526   cnt = PQntuples(d->res);
1527   for(i=0; i<cnt; i++) {
1528     void *vinfo;
1529     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1530     int sid, storagenode_id;
1531     uuid_info *uuidinfo;
1532     PG_GET_STR_COL(uuid_str, i, "id");
1533     if(!uuid_str) continue;
1534     PG_GET_STR_COL(tmpint, i, "sid");
1535     if(!tmpint) continue;
1536     sid = atoi(tmpint);
1537     PG_GET_STR_COL(fqdn, i, "fqdn");
1538     PG_GET_STR_COL(dsn, i, "dsn");
1539     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1540     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1541     storagenode_id = tmpint ? atoi(tmpint) : 0;
1542
1543     uuidinfo = calloc(1, sizeof(*uuidinfo));
1544     uuidinfo->uuid_str = strdup(uuid_str);
1545     uuidinfo->remote_cn = strdup(remote_cn);
1546     uuidinfo->storagenode_id = storagenode_id;
1547     uuidinfo->sid = sid;
1548     noit_hash_store(&uuid_to_info_cache,
1549                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1550     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1551           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1552     loaded++;
1553     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1554                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1555       storagenode_info *info;
1556       info = calloc(1, sizeof(*info));
1557       info->storagenode_id = storagenode_id;
1558       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1559       info->dsn = dsn ? strdup(dsn) : NULL;
1560       noit_hash_store(&storagenode_to_info_cache,
1561                       (void *)&info->storagenode_id, sizeof(int), info);
1562       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1563             info->storagenode_id,
1564             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1565     }
1566   }
1567   PQclear(d->res);
1568  bad_row:
1569   free_params(d);
1570
1571   release_conn_q(cq);
1572   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1573   return loaded;
1574 }
1575
1576 static int
1577 rest_get_noit_config(noit_http_rest_closure_t *restc,
1578                      int npats, char **pats) {
1579   noit_http_session_ctx *ctx = restc->http_ctx;
1580   ds_single_detail *d;
1581   int row_count = 0;
1582   const char *xml = NULL;
1583   conn_q *cq = NULL;
1584
1585   if(npats != 0) {
1586     noit_http_response_server_error(ctx, "text/xml");
1587     noit_http_response_end(ctx);
1588     return 0;
1589   }
1590   d = calloc(1, sizeof(*d));
1591   GET_QUERY(config_get);
1592   cq = get_conn_q_for_metanode();
1593   if(!cq) {
1594     noit_http_response_server_error(ctx, "text/xml");
1595     goto bad_row;
1596   }
1597
1598   DECLARE_PARAM_STR(restc->remote_cn,
1599                     restc->remote_cn ? strlen(restc->remote_cn) : 0);
1600   PG_EXEC(config_get);
1601   row_count = PQntuples(d->res);
1602   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1603
1604   if(xml == NULL) {
1605     char buff[1024];
1606     snprintf(buff, sizeof(buff), "<error><remote_cn>%s</remote_cn>"
1607                                  "<row_count>%d</row_count></error>\n",
1608              restc->remote_cn, row_count);
1609     noit_http_response_append(ctx, buff, strlen(buff));
1610     noit_http_response_not_found(ctx, "text/xml");
1611   }
1612   else {
1613     noit_http_response_append(ctx, xml, strlen(xml));
1614     noit_http_response_ok(ctx, "text/xml");
1615   }
1616  bad_row:
1617   free_params((ds_single_detail *)d);
1618   d->nparams = 0;
1619   if(cq) release_conn_q(cq);
1620
1621   noit_http_response_end(ctx);
1622   return 0;
1623 }
1624
1625 static ingestor_api_t postgres_ingestor_api = {
1626   .launch_file_ingestion = stratcon_ingest_launch_file_ingestion,
1627   .iep_check_preload = stratcon_ingest_iep_check_preload,
1628   .storage_node_lookup = storage_node_quick_lookup,
1629   .submit_realtime_lookup = stratcon_ingestor_submit_lookup,
1630   .get_noit_config = NULL,
1631   .save_config = stratcon_ingest_saveconfig
1632 };
1633
1634 static int postgres_ingestor_config(noit_module_generic_t *self, noit_hash_table *o) {
1635   return 0;
1636 }
1637 static int postgres_ingestor_onload(noit_image_t *self) {
1638   return 0;
1639 }
1640 static int postgres_ingestor_init(noit_module_generic_t *self) {
1641   pthread_mutex_init(&ds_conns_lock, NULL);
1642   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1643   ds_err = noit_log_stream_find("error/datastore");
1644   ds_deb = noit_log_stream_find("debug/datastore");
1645   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
1646   ingest_err = noit_log_stream_find("error/ingest");
1647   if(!ds_err) ds_err = noit_error;
1648   if(!ingest_err) ingest_err = noit_error;
1649   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1650                            &basejpath)) {
1651     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1652     exit(-1);
1653   }
1654   stratcon_ingest_all_check_info();
1655   stratcon_ingest_all_storagenode_info();
1656   stratcon_ingest_sweep_journals();
1657   return stratcon_datastore_set_ingestor(&postgres_ingestor_api);
1658 }
1659
/*
 * Generic-module descriptor for this ingestor.  The nested initializer holds
 * the magic/ABI version, the module name and description, an empty string,
 * and the onload hook.  It is followed by the config and init callbacks.
 * NOTE(review): presumably located by the module loader via this symbol
 * name; confirm against the loader.
 */
noit_module_generic_t postgres_ingestor = {
  {
    NOIT_GENERIC_MAGIC,
    NOIT_GENERIC_ABI_VERSION,
    "postgres_ingestor",
    "postgres drive for data ingestion",
    "",
    postgres_ingestor_onload,
  }, 
  postgres_ingestor_config,
  postgres_ingestor_init
};
Note: See TracBrowser for help on using the browser.