root/src/modules/postgres_ingestor.c

Revision e2ae3eed2723823430eb1dcf4bfb3ef415c86289, 52.3 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

docs for stomp_driver and postgres_ingestor

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "noit_module.h"
35 #include "eventer/eventer.h"
36 #include "utils/noit_log.h"
37 #include "utils/noit_b64.h"
38 #include "utils/noit_str.h"
39 #include "utils/noit_mkdir.h"
40 #include "utils/noit_getip.h"
41 #include "stratcon_datastore.h"
42 #include "stratcon_realtime_http.h"
43 #include "stratcon_iep.h"
44 #include "noit_conf.h"
45 #include "noit_check.h"
46 #include "noit_rest.h"
47 #include <unistd.h>
48 #include <fcntl.h>
49 #include <netinet/in.h>
50 #include <sys/un.h>
51 #include <dirent.h>
52 #include <arpa/inet.h>
53 #include <sys/mman.h>
54 #include <libpq-fe.h>
55 #include <zlib.h>
56 #include <assert.h>
57 #include <errno.h>
58 #include "postgres_ingestor.xmlh"
59
/* DECL_STMT(codename, confname) declares two file-scope statics:
 *   codename       - the SQL text of the statement; NULL until GET_QUERY
 *                    loads it from the configuration
 *   codename_conf  - the config path the SQL is read from,
 *                    "/stratcon/database/statements/<confname>"
 */
#define DECL_STMT(codename,confname) \
static char *codename = NULL; \
static const char *codename##_conf = "/stratcon/database/statements/" #confname

DECL_STMT(storage_post_connect, storagepostconnect);
DECL_STMT(metanode_post_connect, metanodepostconnect);
DECL_STMT(find_storage, findstoragenode);
DECL_STMT(all_storage, allstoragenodes);
DECL_STMT(check_map, mapchecktostoragenode);
DECL_STMT(check_mapall, mapallchecks);
DECL_STMT(check_loadall, allchecks);
DECL_STMT(check_find, findcheck);
DECL_STMT(check_insert, check);
DECL_STMT(status_insert, status);
DECL_STMT(metric_insert_numeric, metric_numeric);
DECL_STMT(metric_insert_text, metric_text);
DECL_STMT(config_insert, config);
DECL_STMT(config_get, findconfig);

/* Log streams used by this module.  NOTE(review): nothing here assigns
 * them; presumably they are set up during module init — confirm. */
static noit_log_stream_t ds_err = NULL;
static noit_log_stream_t ds_deb = NULL;
static noit_log_stream_t ds_pool_deb = NULL;
static noit_log_stream_t ingest_err = NULL;
83
/* GET_QUERY(a): make sure the statement text `a` (declared via DECL_STMT)
 * is loaded.  On first use the SQL is read from the config path held in
 * `a_conf`; if that path is not configured, control jumps to the caller's
 * `bad_row` label.  `a` must be a plain identifier (it is token-pasted). */
#define GET_QUERY(a) do { \
  if((a) == NULL && !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
89
struct conn_q;

/* A pool of PostgreSQL connections that share one queue name, plus the job
 * queue created for it by get_conn_pool_for_remote().  The idle list and the
 * counters are modified while holding `lock`. */
typedef struct {
  char            *queue_name; /* key in ds_conns:
                                  "datastore_<remote_str>_<fqdn>_<remote_cn>" */
  eventer_jobq_t  *jobq;       /* job queue owned by this pool */
  struct conn_q   *head;       /* idle connections, most recently released first */
  pthread_mutex_t  lock;       /* guards head and the counters below */
  pthread_cond_t   cv;         /* signaled on every release; waited on when the
                                  pool is at max_allocated */
  int              ttl;        /* seconds an idle connection may stay pooled
                                  (see ttl_purge_conn_pool) */
  int              in_pool;    /* number of connections on the idle list */
  int              outstanding; /* connections currently checked out */
  int              max_allocated; /* cap on in_pool + outstanding */
  int              max_in_pool;   /* cap on the idle list length */
} conn_pool;
/* One database connection (possibly not yet connected) belonging to a pool. */
typedef struct conn_q {
  time_t           last_use;   /* time(NULL) when last released */
  char            *dsn;        /* Pg connect string; NULL means the metanode */
  char            *remote_str; /* the IP of the noit*/
  char            *remote_cn;  /* the Cert CN of the noit */
  char            *fqdn;       /* the fqdn of the storage node; NULL for the metanode */
  conn_pool       *pool;       /* owning pool */
  struct conn_q   *next;       /* link in pool->head while idle */
  /* Postgres specific stuff */
  PGconn          *dbh;        /* NULL until connected or after a fatal error */
} conn_q;
115
116
/* Upper bound on bind parameters per statement (size of the arrays below). */
#define MAX_PARAMS 8
/* Fields shared by every *_detail struct.  They come first in each struct,
 * which is what lets code treat any of them as a ds_single_detail (e.g. the
 * free_params() casts).
 *   res          - result of the last PG_EXEC/PG_TM_EXEC; caller PQclear()s it
 *   rv           - PQresultStatus() of res
 *   whence       - timestamp parsed from the ingested line; stratcon_ingest_execute
 *                  passes it to PG_TM_EXEC to expand time-dependent SQL
 *   nparams      - number of bind parameters declared so far
 *   metric_type  - metric type character taken from 'M' lines
 *   paramValues/paramLengths/paramFormats - arguments for PQexecParams()
 *   paramAllocd  - non-zero when paramValues[i] is heap memory to be freed
 */
#define POSTGRES_PARTS \
  PGresult *res; \
  int rv; \
  time_t whence; \
  int nparams; \
  int metric_type; \
  char *paramValues[MAX_PARAMS]; \
  int paramLengths[MAX_PARAMS]; \
  int paramFormats[MAX_PARAMS]; \
  int paramAllocd[MAX_PARAMS];

/* Detail for a standalone statement (bind parameters + result only). */
typedef struct ds_single_detail {
  POSTGRES_PARTS
} ds_single_detail;
/* Detail for realtime lookups (used by stratcon_ingest_find()). */
typedef struct {
  /* Postgres specific stuff */
  POSTGRES_PARTS
  struct realtime_tracker *rt; /* list of trackers to resolve */
  conn_q *cq; /* connection on which to perform this job */
  eventer_t completion_event; /* This event should be registered if non NULL */
} ds_rt_detail;
/* Detail for one ingested journal line (see stratcon_ingest_execute()). */
typedef struct ds_line_detail {
  /* Postgres specific stuff */
  POSTGRES_PARTS
  char *data;      /* the raw tab-separated line; parsing may modify it */
  int problematic; /* NOTE(review): not used above; presumably marks a line
                      that failed on its own — confirm */
  struct ds_line_detail *next;
} ds_line_detail;

/* Interim journal state for one noit/storage node.  NOTE(review): not used
 * in the functions above; presumably consumed by the journaling code. */
typedef struct {
  char *remote_str;
  char *remote_cn;
  char *fqdn;
  int storagenode_id;
  int fd;
  char *filename;
  conn_pool *cpool;
} pg_interim_journal_t;
156
/* Forward declarations; these static functions are defined further down in
 * this file. */
/* Connect (or reconnect) cq->dbh; 0 on success, -1 on failure. */
static int stratcon_database_connect(conn_q *cq);
/* Map a check uuid to its sid; callers treat 0 as "unknown". */
static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
/* Look up the storage node for a check uuid; callers treat a non-zero
 * return as "not found" and otherwise use the *_out values. */
static int storage_node_quick_lookup(const char *uuid_str,
                                     const char *remote_cn,
                                     int *sid_out, int *storagenode_id_out,
                                     const char **remote_cn_out,
                                     const char **fqdn_out,
                                     const char **dsn_out);
165
166 static void
167 free_params(ds_single_detail *d) {
168   int i;
169   for(i=0; i<d->nparams; i++)
170     if(d->paramAllocd[i] && d->paramValues[i])
171       free(d->paramValues[i]);
172 }
173
/* NOTE(review): presumably the base directory for interim journal files —
 * confirm against the journaling code. */
static char *basejpath = NULL;
/* Guards ds_conns. */
static pthread_mutex_t ds_conns_lock;
/* Queue name -> conn_pool * (populated by get_conn_pool_for_remote()). */
static noit_hash_table ds_conns;

/* the fqdn cache needs to be thread safe */
/* Presumably a cache entry mapping a check uuid (and noit CN) to its sid and
 * the storage node that holds it. */
typedef struct {
  char *uuid_str;
  char *remote_cn;
  int storagenode_id;
  int sid;
} uuid_info;
/* Description of one storage node: its id, fqdn and connection DSN. */
typedef struct {
  int storagenode_id;
  char *fqdn;
  char *dsn;
} storagenode_info;
/* NOTE(review): no lock dedicated to uuid_to_info_cache is declared here;
 * confirm how concurrent access to it is serialized. */
noit_hash_table uuid_to_info_cache;
/* Guards storagenode_to_info_cache. */
pthread_mutex_t storagenode_to_info_cache_lock;
noit_hash_table storagenode_to_info_cache;
193
194 /* Thread-safe connection pools */
195
196 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
197 static void
198 release_conn_q_forceable(conn_q *cq, int forcefree) {
199   int putback = 0;
200   cq->last_use = time(NULL);
201   pthread_mutex_lock(&cq->pool->lock);
202   cq->pool->outstanding--;
203   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
204     putback = 1;
205     cq->next = cq->pool->head;
206     cq->pool->head = cq;
207     cq->pool->in_pool++;
208   }
209   pthread_mutex_unlock(&cq->pool->lock);
210   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(),
211         putback ? "to pool" : "and destroy", cq->pool->queue_name);
212   pthread_cond_signal(&cq->pool->cv);
213   if(putback) return;
214
215   /* Not put back, release it */
216   if(cq->dbh) PQfinish(cq->dbh);
217   if(cq->remote_str) free(cq->remote_str);
218   if(cq->remote_cn) free(cq->remote_cn);
219   if(cq->fqdn) free(cq->fqdn);
220   if(cq->dsn) free(cq->dsn);
221   free(cq);
222 }
223 static void
224 ttl_purge_conn_pool(conn_pool *pool) {
225   int old_cnt, new_cnt;
226   time_t now = time(NULL);
227   conn_q *cq, *prev = NULL, *iter;
228   /* because we always replace on the head and update the last_use time when
229      doing so, we know they are ordered LRU on the end.  So, once we hit an
230      old one, we know all the others are old too.
231    */
232   if(!pool->head) return; /* hack short circuit for no locks */
233   pthread_mutex_lock(&pool->lock);
234   old_cnt = pool->in_pool;
235   cq = pool->head;
236   while(cq) {
237     if(cq->last_use + cq->pool->ttl < now) {
238       if(prev) prev->next = NULL;
239       else pool->head = NULL;
240       break;
241     }
242     prev = cq;
243     cq = cq->next;
244   }
245   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
246   /* Fix accounting */
247   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
248   new_cnt = pool->in_pool;
249   pthread_mutex_unlock(&pool->lock);
250
251   /* Force release these without holding the lock */
252   while(cq) {
253     cq = cq->next;
254     release_conn_q_forceable(cq, 1);
255   }
256   if(old_cnt != new_cnt)
257     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
258           pool->queue_name);
259 }
260 static void
261 release_conn_q(conn_q *cq) {
262   ttl_purge_conn_pool(cq->pool);
263   release_conn_q_forceable(cq, 0);
264 }
265 static conn_pool *
266 get_conn_pool_for_remote(const char *remote_str,
267                          const char *remote_cn, const char *fqdn) {
268   void *vcpool;
269   conn_pool *cpool = NULL;
270   char queue_name[256] = "datastore_";
271   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
272            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
273            fqdn ? fqdn : "default",
274            remote_cn ? remote_cn : "default");
275   pthread_mutex_lock(&ds_conns_lock);
276   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
277                         strlen(queue_name), &vcpool))
278     cpool = vcpool;
279   pthread_mutex_unlock(&ds_conns_lock);
280   if(!cpool) {
281     vcpool = cpool = calloc(1, sizeof(*cpool));
282     cpool->queue_name = strdup(queue_name);
283     pthread_mutex_init(&cpool->lock, NULL);
284     pthread_cond_init(&cpool->cv, NULL);
285     cpool->in_pool = 0;
286     cpool->outstanding = 0;
287     cpool->max_in_pool = 1;
288     cpool->max_allocated = 1;
289     pthread_mutex_lock(&ds_conns_lock);
290     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
291                         cpool)) {
292       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
293                          strlen(queue_name), &vcpool);
294     }
295     pthread_mutex_unlock(&ds_conns_lock);
296     if(vcpool != cpool) {
297       /* someone beat us to it */
298       free(cpool->queue_name);
299       pthread_mutex_destroy(&cpool->lock);
300       pthread_cond_destroy(&cpool->cv);
301       free(cpool);
302     }
303     else {
304       int i;
305       /* Our job to setup the pool */
306       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
307       eventer_jobq_init(cpool->jobq, queue_name);
308       cpool->jobq->backq = eventer_default_backq();
309       /* Add one thread */
310       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
311         eventer_jobq_increase_concurrency(cpool->jobq);
312     }
313     cpool = vcpool;
314   }
315   return cpool;
316 }
317 static conn_q *
318 get_conn_q_for_remote(const char *remote_str,
319                       const char *remote_cn, const char *fqdn,
320                       const char *dsn) {
321   conn_pool *cpool;
322   conn_q *cq;
323   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
324   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(),
325         cpool->queue_name);
326   pthread_mutex_lock(&cpool->lock);
327  again:
328   if(cpool->head) {
329     assert(cpool->in_pool > 0);
330     cq = cpool->head;
331     cpool->head = cq->next;
332     cpool->in_pool--;
333     cpool->outstanding++;
334     cq->next = NULL;
335     pthread_mutex_unlock(&cpool->lock);
336     return cq;
337   }
338   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
339     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
340           (void *)pthread_self(), cpool->queue_name);
341     pthread_cond_wait(&cpool->cv, &cpool->lock);
342     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
343           (void *)pthread_self(), cpool->queue_name);
344     goto again;
345   }
346   else {
347     cpool->outstanding++;
348     pthread_mutex_unlock(&cpool->lock);
349   }
350  
351   cq = calloc(1, sizeof(*cq));
352   cq->pool = cpool;
353   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
354   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
355   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
356   cq->dsn = dsn ? strdup(dsn) : NULL;
357   return cq;
358 }
359 static conn_q *
360 get_conn_q_for_metanode() {
361   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
362 }
363
/* Outcome of executing one ingest operation. */
typedef enum {
  DS_EXEC_SUCCESS = 0,    /* the statement ran */
  DS_EXEC_ROW_FAILED = 1, /* this row failed (malformed or rejected) */
  DS_EXEC_TXN_FAILED = 2, /* NOTE(review): not produced above; presumably
                             means the enclosing transaction failed — confirm */
} execute_outcome_t;
369
/* DECLARE_PARAM_STR(str, len): append a text-format bind parameter to the
 * in-scope detail pointer `d`.  The first `len` bytes of `str` are copied to
 * the heap (released later by free_params()).  The literal "[[null]]" is
 * turned into a SQL NULL.  `len` is evaluated exactly once, and declaring
 * more than MAX_PARAMS parameters trips an assertion instead of writing
 * past the parameter arrays. */
#define DECLARE_PARAM_STR(str, len) do { \
  int dps_len_ = (int)(len); \
  assert(d->nparams < MAX_PARAMS); \
  d->paramValues[d->nparams] = noit__strndup((str), dps_len_); \
  d->paramLengths[d->nparams] = dps_len_; \
  d->paramFormats[d->nparams] = 0; \
  d->paramAllocd[d->nparams] = 1; \
  if(d->paramValues[d->nparams] && \
     !strcmp(d->paramValues[d->nparams], "[[null]]")) { \
    free(d->paramValues[d->nparams]); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  d->nparams++; \
} while(0)
/* DECLARE_PARAM_INT(i): append integer `i` as a decimal text parameter. */
#define DECLARE_PARAM_INT(i) do { \
  int buffer__len; \
  char buffer__[32]; \
  snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
  buffer__len = strlen(buffer__); \
  DECLARE_PARAM_STR(buffer__, buffer__len); \
} while(0)
390
/* PG_GET_STR_COL(dest, row, name): set `dest` to the text of column `name`
 * in row `row` of d->res.  It is NULL when the column does not exist or the
 * value is SQL NULL.  The pointer refers into d->res and is only valid until
 * the result is cleared. */
#define PG_GET_STR_COL(dest, row, name) do { \
  int colnum = PQfnumber(d->res, name); \
  if(colnum < 0 || PQgetisnull(d->res, row, colnum)) \
    dest = NULL; \
  else \
    dest = PQgetvalue(d->res, row, colnum); \
} while(0)
398
/* PG_EXEC(cmd): run statement text `cmd` on cq->dbh with the bind
 * parameters accumulated in `d` (both must be in scope).  On success d->res
 * holds the result and the caller must PQclear() it.  On any status other
 * than PGRES_COMMAND_OK / PGRES_TUPLES_OK, the first line of the server
 * error is logged to ds_err, the result is cleared and control jumps to the
 * caller's `bad_row` label.  NOTE: the log text names
 * "stratcon_datasource.c", not this file. */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "[%s] stratcon_datasource.c:%d bad (%d): %.*s\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
          (int)(pgerr_end - pgerr), pgerr); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
416
/* PG_TM_EXEC(cmd, whence): like PG_EXEC, but `cmd` is first expanded with
 * strftime() against `whence` (seconds since the epoch, in UTC).  This lets
 * the statement text name time-dependent tables.  If the time cannot be
 * broken down, or the expansion does not fit in the 4096-byte buffer, the
 * failure is logged and control jumps to `bad_row`.  Without that check the
 * statement would be built from an indeterminate buffer.  On success the
 * caller must PQclear(d->res). */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t tm_exec_when_ = (whence); \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&tm_exec_when_, &tbuf); \
  if(tm == NULL || strftime(cmdbuf, sizeof(cmdbuf), cmd, tm) == 0) { \
    noitL(ds_err, "stratcon_datasource.c:%d bad time expansion, time: %llu\n", \
          __LINE__, (long long unsigned)tm_exec_when_); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "stratcon_datasource.c:%d bad (%d): %.*s time: %llu\n", \
          __LINE__, d->rv, (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)tm_exec_when_); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
439
440 static void *
441 stratcon_ingest_check_loadall(void *vsn) {
442   storagenode_info *sn = vsn;
443   ds_single_detail *d;
444   int i, row_count = 0, good = 0;
445   char buff[1024];
446   conn_q *cq = NULL;
447
448   d = calloc(1, sizeof(*d));
449   GET_QUERY(check_loadall);
450   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
451   i = 0;
452   while(stratcon_database_connect(cq)) {
453     if(i++ > 4) {
454       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
455       release_conn_q(cq);
456       return (void *)(vpsized_int)good;
457     }
458     sleep(1);
459   }
460   PG_EXEC(check_loadall);
461   row_count = PQntuples(d->res);
462  
463   for(i=0; i<row_count; i++) {
464     int rv;
465     int8_t family;
466     struct sockaddr *sin;
467     struct sockaddr_in sin4 = { .sin_family = AF_INET };
468     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
469     char *remote, *id, *target, *module, *name;
470     PG_GET_STR_COL(remote, i, "remote_address");
471     PG_GET_STR_COL(id, i, "id");
472     PG_GET_STR_COL(target, i, "target");
473     PG_GET_STR_COL(module, i, "module");
474     PG_GET_STR_COL(name, i, "name");
475     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
476
477     family = AF_INET;
478     sin = (struct sockaddr *)&sin4;
479     rv = inet_pton(family, remote, &sin4.sin_addr);
480     if(rv != 1) {
481       family = AF_INET6;
482       sin = (struct sockaddr *)&sin6;
483       rv = inet_pton(family, remote, &sin6.sin6_addr);
484       if(rv != 1) {
485         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
486         sin = NULL;
487       }
488     }
489
490     /* stratcon_iep_line_processor takes an allocated operand and frees it */
491     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
492     good++;
493   }
494   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
495         good, row_count, sn->fqdn);
496  bad_row:
497   free_params((ds_single_detail *)d);
498   free(d);
499   if(cq) release_conn_q(cq);
500   return (void *)(vpsized_int)good;
501 }
502 static int
503 stratcon_ingest_asynch_drive_iep(eventer_t e, int mask, void *closure,
504                                  struct timeval *now) {
505   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
506   pthread_t *jobs = NULL;
507   int nodes, i = 0, tcnt = 0;
508   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
509   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
510
511   pthread_mutex_lock(&storagenode_to_info_cache_lock);
512   nodes = storagenode_to_info_cache.size;
513   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
514   sns = calloc(MAX(1,nodes), sizeof(*sns));
515   if(nodes == 0) sns[nodes++] = &self;
516   else {
517     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
518     const char *k;
519     void *v;
520     int klen;
521     while(noit_hash_next(&storagenode_to_info_cache,
522                          &iter, &k, &klen, &v)) {
523       sns[i++] = (storagenode_info *)v;
524     }
525   }
526   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
527
528   for(i=0; i<nodes; i++) {
529     if(pthread_create(&jobs[i], NULL,
530                       stratcon_ingest_check_loadall, sns[i]) != 0) {
531       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
532     }
533   }
534   for(i=0; i<nodes; i++) {
535     void *good;
536     pthread_join(jobs[i], &good);
537     tcnt += (int)(vpsized_int)good;
538   }
539   free(jobs);
540   free(sns);
541   noitL(noit_error, "Loaded all %d check states.\n", tcnt);
542   return 0;
543 }
544 static void
545 stratcon_ingest_iep_check_preload() {
546   eventer_t e;
547   conn_pool *cpool;
548
549   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
550   e = eventer_alloc();
551   e->mask = EVENTER_ASYNCH;
552   e->callback = stratcon_ingest_asynch_drive_iep;
553   e->closure = NULL;
554   eventer_add_asynch(cpool->jobq, e);
555 }
556 execute_outcome_t
557 stratcon_ingest_find(ds_rt_detail *d) {
558   conn_q *cq;
559   char *val;
560   int row_count;
561   struct realtime_tracker *node;
562
563   for(node = d->rt; node; node = node->next) {
564     char uuid_str[UUID_STR_LEN+1];
565     const char *fqdn, *dsn, *remote_cn;
566     char remote_ip[32];
567     int storagenode_id;
568
569     uuid_unparse_lower(node->checkid, uuid_str);
570     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
571                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
572       continue;
573
574     noitL(noit_debug, "stratcon_ingest_find <- (%d, %s) @ %s\n",
575           node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)");
576
577     /* We might be able to find the IP from our config if someone has
578      * specified the expected cn in the noit definition.
579      */
580     if(stratcon_find_noit_ip_by_cn(remote_cn,
581                                    remote_ip, sizeof(remote_ip)) == 0) {
582       node->noit = strdup(remote_ip);
583       noitL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit);
584       continue;
585     }
586
587     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
588     stratcon_database_connect(cq);
589
590     GET_QUERY(check_find);
591     DECLARE_PARAM_INT(node->sid);
592     PG_EXEC(check_find);
593     row_count = PQntuples(d->res);
594     if(row_count != 1) {
595       noitL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid);
596       PQclear(d->res);
597       goto bad_row;
598     }
599
600     /* Get the remote_address (which noit owns this) */
601     PG_GET_STR_COL(val, 0, "remote_address");
602     if(!val) {
603       noitL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn);
604       PQclear(d->res);
605       goto bad_row;
606     }
607     node->noit = strdup(val);
608     noitL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit);
609    bad_row:
610     free_params((ds_single_detail *)d);
611     d->nparams = 0;
612     release_conn_q(cq);
613   }
614   return DS_EXEC_SUCCESS;
615 }
/* Parse one journal line (d->data) and insert it through connection `cq`.
 *
 * r         - textual IP of the noit that produced the line (may be NULL;
 *             an empty string is bound instead).
 * remote_cn - certificate CN of that noit.  For 'n' lines a NULL CN is bound
 *             as SQL NULL.
 * d         - line detail; d->data[0] selects the record type:
 *             'n' noitd config, 'C' check, 'S' status, 'M' metric.
 *
 * Parsing only happens while d->nparams == 0.  A line that parsed but failed
 * to execute can therefore be executed again without re-parsing; the 'n'
 * branch base64-decodes its payload in place, so it cannot be parsed twice.
 * If parsing itself fails, the partially declared parameters are freed and
 * d->nparams is reset to 0.  A later call then never executes a statement
 * with a partial parameter list.
 *
 * Returns DS_EXEC_SUCCESS, or DS_EXEC_ROW_FAILED when the line is malformed,
 * names an unknown check, or the statement fails.
 */
execute_outcome_t
stratcon_ingest_execute(conn_q *cq, const char *r, const char *remote_cn,
                        ds_line_detail *d) {
  int type, len, sid;
  char *final_buff;
  uLong final_len, actual_final_len;
  char *token;
  char raddr_blank[1] = "";
  const char *raddr;
  int parsing = 0; /* non-zero while bind parameters are being declared */

  type = d->data[0];
  raddr = r ? r : raddr_blank;

  /* Parse the log line, but only if we haven't already */
  if(!d->nparams) {
    char *scp, *ecp;
    const char *cn_param;

    parsing = 1;
    scp = d->data;
#define PROCESS_NEXT_FIELD(t,l) do { \
  if(!*scp) goto bad_row; \
  ecp = strchr(scp, '\t'); \
  if(!ecp) goto bad_row; \
  token = scp; \
  len = (ecp-scp); \
  scp = ecp + 1; \
} while(0)
#define PROCESS_LAST_FIELD(t,l) do { \
  if(!*scp) ecp = scp; \
  else { \
    ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
    if(*(ecp-1) == '\n') ecp--; /* Back up one char if we ended in \n */ \
  } \
  t = scp; \
  l = (ecp-scp); \
} while(0)

    PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
    switch(type) {
      /* See noit_check_log.c for log description */
      case 'n':
        /* "[[null]]" is the marker DECLARE_PARAM_STR binds as SQL NULL;
         * use it rather than calling strlen() on a NULL CN. */
        cn_param = remote_cn ? remote_cn : "[[null]]";
        DECLARE_PARAM_STR(raddr, strlen(raddr));
        DECLARE_PARAM_STR(cn_param, strlen(cn_param));
        DECLARE_PARAM_STR("noitd",5); /* node_type */
        PROCESS_NEXT_FIELD(token,len);
        d->whence = (time_t)strtoul(token, NULL, 10);
        DECLARE_PARAM_STR(token,len); /* timestamp */

        /* This is the expected uncompressed len */
        PROCESS_NEXT_FIELD(token,len);
        final_len = strtoul(token, NULL, 10);
        final_buff = malloc(final_len);
        if(!final_buff) goto bad_row;

        /* The last token is b64 encoded and compressed.
         * we need to decode it, declare it and then free it.
         */
        PROCESS_LAST_FIELD(token, len);
        /* We can in-place decode this */
        len = noit_b64_decode((char *)token, len,
                              (unsigned char *)token, len);
        if(len <= 0) {
          noitL(noit_error, "noitd config base64 decoding error.\n");
          free(final_buff);
          goto bad_row;
        }
        actual_final_len = final_len;
        if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
                              (unsigned char *)token, len)) {
          noitL(noit_error, "noitd config decompression failure.\n");
          free(final_buff);
          goto bad_row;
        }
        if(final_len != actual_final_len) {
          noitL(noit_error, "noitd config decompression error.\n");
          free(final_buff);
          goto bad_row;
        }
        DECLARE_PARAM_STR(final_buff, final_len);
        free(final_buff);
        break;
      case 'C':
        DECLARE_PARAM_STR(raddr, strlen(raddr));
        PROCESS_NEXT_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* timestamp */
        d->whence = (time_t)strtoul(token, NULL, 10);
        PROCESS_NEXT_FIELD(token, len);
        /* uuid is last 36 bytes */
        if(len > 36) { token += (len-36); len = 36; }
        sid = uuid_to_sid(token, remote_cn);
        if(sid == 0) goto bad_row;
        DECLARE_PARAM_INT(sid); /* sid */
        DECLARE_PARAM_STR(token,len); /* uuid */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* target */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* module */
        PROCESS_LAST_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* name */
        break;
      case 'M':
        PROCESS_NEXT_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* timestamp */
        d->whence = (time_t)strtoul(token, NULL, 10);
        PROCESS_NEXT_FIELD(token, len);
        /* uuid is last 36 bytes */
        if(len > 36) { token += (len-36); len = 36; }
        sid = uuid_to_sid(token, remote_cn);
        if(sid == 0) goto bad_row;
        DECLARE_PARAM_INT(sid); /* sid */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* name */
        PROCESS_NEXT_FIELD(token,len);
        d->metric_type = *token;
        PROCESS_LAST_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* value */
        break;
      case 'S':
        PROCESS_NEXT_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* timestamp */
        d->whence = (time_t)strtoul(token, NULL, 10);
        PROCESS_NEXT_FIELD(token, len);
        /* uuid is last 36 bytes */
        if(len > 36) { token += (len-36); len = 36; }
        sid = uuid_to_sid(token, remote_cn);
        if(sid == 0) goto bad_row;
        DECLARE_PARAM_INT(sid); /* sid */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* state */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* availability */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* duration */
        PROCESS_LAST_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* status */
        break;
      default:
        goto bad_row;
    }
    parsing = 0; /* the parameter list is complete */
  }

  /* Now execute the query */
  switch(type) {
    case 'n':
      GET_QUERY(config_insert);
      PG_EXEC(config_insert);
      PQclear(d->res);
      break;
    case 'C':
      GET_QUERY(check_insert);
      PG_TM_EXEC(check_insert, d->whence);
      PQclear(d->res);
      break;
    case 'S':
      GET_QUERY(status_insert);
      PG_TM_EXEC(status_insert, d->whence);
      PQclear(d->res);
      break;
    case 'M':
      switch(d->metric_type) {
        case METRIC_INT32:
        case METRIC_UINT32:
        case METRIC_INT64:
        case METRIC_UINT64:
        case METRIC_DOUBLE:
          GET_QUERY(metric_insert_numeric);
          PG_TM_EXEC(metric_insert_numeric, d->whence);
          PQclear(d->res);
          break;
        case METRIC_STRING:
          GET_QUERY(metric_insert_text);
          PG_TM_EXEC(metric_insert_text, d->whence);
          PQclear(d->res);
          break;
        default:
          goto bad_row;
      }
      break;
    default:
      /* should never get here */
      goto bad_row;
  }
  return DS_EXEC_SUCCESS;
 bad_row:
  if(parsing) {
    /* Parsing stopped part way: drop the partial parameter list so a later
     * call neither executes with it nor skips re-parsing. */
    free_params((ds_single_detail *)d);
    d->nparams = 0;
  }
  return DS_EXEC_ROW_FAILED;
}
802 static int
803 stratcon_database_post_connect(conn_q *cq) {
804   int rv = 0;
805   ds_single_detail _d = { 0 }, *d = &_d;
806   if(cq->fqdn) {
807     char *remote_str, *remote_cn;
808     /* This is the silly way we get null's in through our declare_param_str */
809     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
810     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
811     /* This is a storage node, it gets the storage node post_connect */
812     GET_QUERY(storage_post_connect);
813     rv = -1; /* now we're serious */
814     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
815     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
816     PG_EXEC(storage_post_connect);
817     PQclear(d->res);
818     rv = 0;
819   }
820   else {
821     /* Metanode post_connect */
822     GET_QUERY(metanode_post_connect);
823     rv = -1; /* now we're serious */
824     PG_EXEC(metanode_post_connect);
825     PQclear(d->res);
826     rv = 0;
827   }
828  bad_row:
829   free_params(d);
830   if(rv == -1) {
831     /* Post-connect intentions are serious and fatal */
832     PQfinish(cq->dbh);
833     cq->dbh = NULL;
834   }
835   return rv;
836 }
837 static int
838 stratcon_database_connect(conn_q *cq) {
839   char *dsn, dsn_meta[512];
840   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
841   const char *k, *v;
842   int klen;
843   noit_hash_table *t;
844
845   dsn_meta[0] = '\0';
846   if(!cq->dsn) {
847     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
848     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
849       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
850       strlcat(dsn_meta, k, sizeof(dsn_meta));
851       strlcat(dsn_meta, "=", sizeof(dsn_meta));
852       strlcat(dsn_meta, v, sizeof(dsn_meta));
853     }
854     noit_hash_destroy(t, free, free);
855     free(t);
856     dsn = dsn_meta;
857   }
858   else {
859     char options[32];
860     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta));
861     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user",
862                                options, sizeof(options))) {
863       strlcat(dsn_meta, " ", sizeof(dsn_meta));
864       strlcat(dsn_meta, "user", sizeof(dsn_meta));
865       strlcat(dsn_meta, "=", sizeof(dsn_meta));
866       strlcat(dsn_meta, options, sizeof(dsn_meta));
867     }
868     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password",
869                                options, sizeof(options))) {
870       strlcat(dsn_meta, " ", sizeof(dsn_meta));
871       strlcat(dsn_meta, "password", sizeof(dsn_meta));
872       strlcat(dsn_meta, "=", sizeof(dsn_meta));
873       strlcat(dsn_meta, options, sizeof(dsn_meta));
874     }
875     dsn = dsn_meta;
876   }
877
878   if(cq->dbh) {
879     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
880     PQreset(cq->dbh);
881     if(PQstatus(cq->dbh) != CONNECTION_OK) {
882       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
883             dsn, PQerrorMessage(cq->dbh));
884       return -1;
885     }
886     if(stratcon_database_post_connect(cq)) return -1;
887     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
888     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
889           dsn, PQerrorMessage(cq->dbh));
890     return -1;
891   }
892
893   cq->dbh = PQconnectdb(dsn);
894   if(!cq->dbh) return -1;
895   if(PQstatus(cq->dbh) != CONNECTION_OK) {
896     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
897           dsn, PQerrorMessage(cq->dbh));
898     return -1;
899   }
900   if(stratcon_database_post_connect(cq)) return -1;
901   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
902   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
903         dsn, PQerrorMessage(cq->dbh));
904   return -1;
905 }
906 static int
907 stratcon_ingest_savepoint_op(conn_q *cq, const char *p,
908                              const char *name) {
909   int rv = -1;
910   PGresult *res;
911   char cmd[128];
912   strlcpy(cmd, p, sizeof(cmd));
913   strlcat(cmd, name, sizeof(cmd));
914   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
915   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
916   PQclear(res);
917   return rv;
918 }
919 static int
920 stratcon_ingest_do(conn_q *cq, const char *cmd) {
921   PGresult *res;
922   int rv = -1;
923   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
924   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
925   PQclear(res);
926   return rv;
927 }
/*
 * Transaction-control helpers for stratcon_ingest_asynch_execute().
 * They expand inside that function and rely on its locals `cq`, `current`
 * and `last_sp`, and on its `full_monty` retry label.
 */

/* BUSTED: drop the connection and jump back to `full_monty` to retry. */
#define BUSTED(conn) do { \
  PQfinish((conn)->dbh); \
  (conn)->dbh = NULL; \
  goto full_monty; \
} while(0)

/* SAVEPOINT: create savepoint `sp` and record `current` as the replay point. */
#define SAVEPOINT(sp) do { \
  if(stratcon_ingest_savepoint_op(cq, "SAVEPOINT ", (sp)) != 0) BUSTED(cq); \
  last_sp = current; \
} while(0)

/* ROLLBACK_TO_SAVEPOINT: undo work since savepoint `sp`; clears last_sp. */
#define ROLLBACK_TO_SAVEPOINT(sp) do { \
  if(stratcon_ingest_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", (sp)) != 0) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)

/* RELEASE_SAVEPOINT: release savepoint `sp`; clears last_sp. */
#define RELEASE_SAVEPOINT(sp) do { \
  if(stratcon_ingest_savepoint_op(cq, "RELEASE SAVEPOINT ", (sp)) != 0) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
947
948 int
949 stratcon_ingest_asynch_lookup(eventer_t e, int mask, void *closure,
950                               struct timeval *now) {
951   ds_rt_detail *dsjd = closure;
952   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
953   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
954
955   assert(dsjd->rt);
956   stratcon_ingest_find(dsjd);
957   if(dsjd->completion_event)
958     eventer_add(dsjd->completion_event);
959
960   free_params((ds_single_detail *)dsjd);
961   free(dsjd);
962   return 0;
963 }
964 static void
965 stratcon_ingestor_submit_lookup(struct realtime_tracker *rt,
966                                 eventer_t completion) {
967   eventer_t e;
968   conn_pool *cpool;
969   ds_rt_detail *rtdetail;
970
971   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
972   rtdetail = calloc(1, sizeof(*rtdetail));
973   rtdetail->rt = rt;
974   rtdetail->completion_event = completion;
975   e = eventer_alloc();
976   e->mask = EVENTER_ASYNCH;
977   e->callback = stratcon_ingest_asynch_lookup;
978   e->closure = rtdetail;
979   eventer_add_asynch(cpool->jobq, e);
980 }
981 static const char *
982 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
983   void *vinfo;
984   char *dsn = NULL, *fqdn = NULL;
985   int found = 0;
986   storagenode_info *info = NULL;
987   pthread_mutex_lock(&storagenode_to_info_cache_lock);
988   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
989                         &vinfo)) {
990     found = 1;
991     info = vinfo;
992   }
993   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
994   if(found) {
995     if(fqdn_out) *fqdn_out = info->fqdn;
996     return info->dsn;
997   }
998
999   if(!found && can_use_db) {
1000     ds_single_detail *d;
1001     conn_q *cq;
1002     int row_count;
1003     /* Look it up and store it */
1004     d = calloc(1, sizeof(*d));
1005     cq = get_conn_q_for_metanode();
1006     GET_QUERY(find_storage);
1007     DECLARE_PARAM_INT(id);
1008     PG_EXEC(find_storage);
1009     row_count = PQntuples(d->res);
1010     if(row_count) {
1011       PG_GET_STR_COL(dsn, 0, "dsn");
1012       PG_GET_STR_COL(fqdn, 0, "fqdn");
1013       fqdn = fqdn ? strdup(fqdn) : NULL;
1014       dsn = dsn ? strdup(dsn) : NULL;
1015     }
1016     PQclear(d->res);
1017    bad_row:
1018     free_params(d);
1019     free(d);
1020     release_conn_q(cq);
1021   }
1022   if(fqdn) {
1023     info = calloc(1, sizeof(*info));
1024     info->fqdn = fqdn;
1025     if(fqdn_out) *fqdn_out = info->fqdn;
1026     info->dsn = dsn;
1027     info->storagenode_id = id;
1028     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1029     noit_hash_store(&storagenode_to_info_cache,
1030                     (void *)&info->storagenode_id, sizeof(int), info);
1031     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1032   }
1033   return info ? info->dsn : NULL;
1034 }
1035 static ds_line_detail *
1036 build_insert_batch(pg_interim_journal_t *ij) {
1037   int rv;
1038   off_t len;
1039   const char *buff, *cp, *lcp;
1040   struct stat st;
1041   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1042
1043   if(ij->fd < 0) {
1044     ij->fd = open(ij->filename, O_RDONLY);
1045     if(ij->fd < 0) {
1046       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
1047             ij->filename, strerror(errno));
1048       assert(ij->fd >= 0);
1049     }
1050   }
1051   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1052   if(rv == -1) {
1053       noitL(noit_error, "Cannot stat interim journal '%s': %s\n",
1054             ij->filename, strerror(errno));
1055     assert(rv != -1);
1056   }
1057   len = st.st_size;
1058   if(len > 0) {
1059     buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1060     if(buff == (void *)-1) {
1061       noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1062             ij->filename, strerror(errno));
1063       assert(buff != (void *)-1);
1064     }
1065     lcp = buff;
1066     while(lcp < (buff + len) &&
1067           NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1068       next = calloc(1, sizeof(*next));
1069       next->data = malloc(cp - lcp + 1);
1070       memcpy(next->data, lcp, cp - lcp);
1071       next->data[cp - lcp] = '\0';
1072       if(!head) head = next;
1073       if(last) last->next = next;
1074       last = next;
1075       lcp = cp + 1;
1076     }
1077     munmap((void *)buff, len);
1078   }
1079   close(ij->fd);
1080   return head;
1081 }
1082 static void
1083 pg_interim_journal_remove(pg_interim_journal_t *ij) {
1084   unlink(ij->filename);
1085   if(ij->filename) free(ij->filename);
1086   if(ij->remote_str) free(ij->remote_str);
1087   if(ij->remote_cn) free(ij->remote_cn);
1088   if(ij->fqdn) free(ij->fqdn);
1089   free(ij);
1090 }
1091 static int
1092 stratcon_ingest_asynch_execute(eventer_t e, int mask, void *closure,
1093                                struct timeval *now) {
1094   int i, total, success, sp_total, sp_success;
1095   pg_interim_journal_t *ij;
1096   ds_line_detail *head = NULL, *current, *last_sp;
1097   const char *dsn;
1098   conn_q *cq;
1099   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1100   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1101
1102   ij = closure;
1103   if(ij->fqdn == NULL) {
1104     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1105     if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1106   }
1107   else {
1108     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, NULL);
1109   }
1110   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1111                              ij->fqdn, dsn);
1112   noitL(ds_deb, "stratcon_ingest_asynch_execute[%s,%s,%s]\n",
1113         ij->remote_str, ij->remote_cn, ij->fqdn);
1114  full_monty:
1115   /* Make sure we have a connection */
1116   i = 1;
1117   while(stratcon_database_connect(cq)) {
1118     noitL(noit_error, "Error connecting to database: %s\n",
1119           ij->fqdn ? ij->fqdn : "(null)");
1120     sleep(i);
1121     i *= 2;
1122     i = MIN(i, 16);
1123   }
1124
1125   if(head == NULL) head = build_insert_batch(ij);
1126   noitL(ds_deb, "Starting batch from %s/%s to %s\n",
1127         ij->remote_str ? ij->remote_str : "(null)",
1128         ij->remote_cn ? ij->remote_cn : "(null)",
1129         ij->fqdn ? ij->fqdn : "(null)");
1130   current = head;
1131   last_sp = NULL;
1132   total = success = sp_total = sp_success = 0;
1133   if(stratcon_ingest_do(cq, "BEGIN")) BUSTED(cq);
1134   while(current) {
1135     execute_outcome_t rv;
1136     if(current->data) {
1137       if(!last_sp) {
1138         SAVEPOINT("batch");
1139         sp_success = success;
1140         sp_total = total;
1141       }
1142  
1143       if(current->problematic) {
1144         RELEASE_SAVEPOINT("batch");
1145         current = current->next;
1146         total++;
1147         continue;
1148       }
1149       rv = stratcon_ingest_execute(cq, cq->remote_str, cq->remote_cn,
1150                                    current);
1151       switch(rv) {
1152         case DS_EXEC_SUCCESS:
1153           total++;
1154           success++;
1155           current = current->next;
1156           break;
1157         case DS_EXEC_ROW_FAILED:
1158           /* rollback to savepoint, mark this record as bad and start again */
1159           if(current->data[0] != 'n')
1160             noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1161           current->problematic = 1;
1162           current = last_sp;
1163           success = sp_success;
1164           total = sp_total;
1165           ROLLBACK_TO_SAVEPOINT("batch");
1166           break;
1167         case DS_EXEC_TXN_FAILED:
1168           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1169           BUSTED(cq);
1170       }
1171     }
1172   }
1173   if(last_sp) RELEASE_SAVEPOINT("batch");
1174   if(stratcon_ingest_do(cq, "COMMIT")) {
1175     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1176     BUSTED(cq);
1177   }
1178   /* Cleanup the mess */
1179   while(head) {
1180     ds_line_detail *tofree;
1181     tofree = head;
1182     head = head->next;
1183     if(tofree->data) free(tofree->data);
1184     free_params((ds_single_detail *)tofree);
1185     free(tofree);
1186   }
1187   noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1188         ij->remote_str ? ij->remote_str : "(null)",
1189         ij->remote_cn ? ij->remote_cn : "(null)",
1190         ij->fqdn ? ij->fqdn : "(null)", success, total);
1191   pg_interim_journal_remove(ij);
1192   release_conn_q(cq);
1193   return 0;
1194 }
1195 static int
1196 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1197                           int *sid_out, int *storagenode_id_out,
1198                           const char **remote_cn_out,
1199                           const char **fqdn_out, const char **dsn_out) {
1200   /* only called from the main thread -- no safety issues */
1201   void *vuuidinfo, *vinfo;
1202   uuid_info *uuidinfo;
1203   storagenode_info *info = NULL;
1204   char *fqdn = NULL;
1205   char *dsn = NULL;
1206   char *new_remote_cn = NULL;
1207   int storagenode_id = 0, sid = 0;
1208   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1209                          &vuuidinfo)) {
1210     int row_count = 0;
1211     char *tmpint;
1212     ds_single_detail *d;
1213     conn_q *cq;
1214
1215     /* We can't do a database lookup without the remote_cn */
1216     if(!remote_cn) {
1217       if(stratcon_datastore_get_enabled()) {
1218         /* We have an authoritatively maintained cache, we don't do lookups */
1219         return -1;
1220       }
1221       else
1222         remote_cn = "[[null]]";
1223     }
1224
1225     d = calloc(1, sizeof(*d));
1226     cq = get_conn_q_for_metanode();
1227     if(stratcon_database_connect(cq) == 0) {
1228       /* Blocking call to service the cache miss */
1229       GET_QUERY(check_map);
1230       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1231       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1232       PG_EXEC(check_map);
1233       row_count = PQntuples(d->res);
1234       if(row_count != 1) {
1235         PQclear(d->res);
1236         goto bad_row;
1237       }
1238       PG_GET_STR_COL(tmpint, 0, "sid");
1239       if(!tmpint) {
1240         row_count = 0;
1241         PQclear(d->res);
1242         goto bad_row;
1243       }
1244       sid = atoi(tmpint);
1245       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1246       if(tmpint) storagenode_id = atoi(tmpint);
1247       PG_GET_STR_COL(fqdn, 0, "fqdn");
1248       PG_GET_STR_COL(dsn, 0, "dsn");
1249       PG_GET_STR_COL(new_remote_cn, 0, "remote_cn");
1250       fqdn = fqdn ? strdup(fqdn) : NULL;
1251       dsn = dsn ? strdup(dsn) : NULL;
1252       new_remote_cn = new_remote_cn ? strdup(new_remote_cn) : NULL;
1253       PQclear(d->res);
1254     }
1255    bad_row:
1256     free_params((ds_single_detail *)d);
1257     free(d);
1258     release_conn_q(cq);
1259     if(row_count != 1) {
1260       return -1;
1261     }
1262     /* Place in cache */
1263     uuidinfo = calloc(1, sizeof(*uuidinfo));
1264     uuidinfo->sid = sid;
1265     uuidinfo->uuid_str = strdup(uuid_str);
1266     uuidinfo->storagenode_id = storagenode_id;
1267     uuidinfo->remote_cn = new_remote_cn ? strdup(new_remote_cn) : strdup(remote_cn);
1268     noit_hash_store(&uuid_to_info_cache,
1269                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1270     /* Also, we may have just witnessed a new storage node, store it */
1271     if(storagenode_id) {
1272       int needs_free = 0;
1273       info = calloc(1, sizeof(*info));
1274       info->storagenode_id = storagenode_id;
1275       info->dsn = dsn ? strdup(dsn) : NULL;
1276       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1277       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1278       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1279                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1280         /* hack to save memory -- we *never* remove from these caches,
1281            so we can use the same fqdn value in the above cache for the key
1282            in the cache below -- (no strdup) */
1283         noit_hash_store(&storagenode_to_info_cache,
1284                         (void *)&info->storagenode_id, sizeof(int), info);
1285       }
1286       else needs_free = 1;
1287       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1288       if(needs_free) {
1289         if(info->dsn) free(info->dsn);
1290         if(info->fqdn) free(info->fqdn);
1291         free(info);
1292       }
1293     }
1294   }
1295   else
1296     uuidinfo = vuuidinfo;
1297
1298   if(uuidinfo && uuidinfo->storagenode_id) {
1299     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1300       /* we don't have dsn and we actually want it */
1301       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1302       if(noit_hash_retrieve(&storagenode_to_info_cache,
1303                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1304                             &vinfo))
1305         info = vinfo;
1306       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1307     }
1308   }
1309
1310   if(fqdn_out) *fqdn_out = info ? info->fqdn : NULL;
1311   if(dsn_out) *dsn_out = info ? info->dsn : NULL;
1312   assert(uuidinfo);
1313   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1314   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1315   if(sid_out) *sid_out = uuidinfo->sid;
1316   if(fqdn) free(fqdn);
1317   if(dsn) free(dsn);
1318   if(new_remote_cn) free(new_remote_cn);
1319   return 0;
1320 }
1321 static int
1322 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1323   char uuid_str[UUID_STR_LEN+1];
1324   int sid = 0;
1325   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1326   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1327   return sid;
1328 }
1329
1330 static int
1331 stratcon_ingest_saveconfig() {
1332   int rv = -1;
1333   char *buff;
1334   ds_single_detail _d = { 0 }, *d = &_d;
1335   conn_q *cq;
1336   char ipv4_str[32];
1337   struct in_addr r, l;
1338
1339   r.s_addr = htonl((4 << 24) || (2 << 16) || (2 << 8) || 1);
1340   memset(&l, 0, sizeof(l));
1341   noit_getip_ipv4(r, &l);
1342   /* Ignore the error.. what are we going to do anyway */
1343   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
1344     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));
1345
1346   cq = get_conn_q_for_metanode();
1347
1348   if(stratcon_database_connect(cq) == 0) {
1349     char time_as_str[20];
1350     size_t len;
1351     buff = noit_conf_xml_in_mem(&len);
1352     if(!buff) goto bad_row;
1353
1354     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1355     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str));
1356     DECLARE_PARAM_STR("", 0);
1357     DECLARE_PARAM_STR("stratcond", 9);
1358     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1359     DECLARE_PARAM_STR(buff, len);
1360     free(buff);
1361
1362     GET_QUERY(config_insert);
1363     PG_EXEC(config_insert);
1364     PQclear(d->res);
1365     rv = 0;
1366
1367     bad_row:
1368       free_params(d);
1369   }
1370   release_conn_q(cq);
1371   return rv;
1372 }
1373
1374 static void
1375 stratcon_ingest_launch_file_ingestion(const char *path,
1376                                       const char *remote_str,
1377                                       const char *remote_cn,
1378                                       const char *id_str) {
1379   pg_interim_journal_t *ij;
1380   eventer_t ingest;
1381
1382   ij = calloc(1, sizeof(*ij));
1383   ij->fd = open(path, O_RDONLY);
1384   if(ij->fd < 0) {
1385     noitL(noit_error, "cannot open journal '%s': %s\n",
1386           path, strerror(errno));
1387     free(ij);
1388     return;
1389   }
1390   close(ij->fd);
1391   ij->fd = -1;
1392   ij->filename = strdup(path);
1393   ij->remote_str = strdup(remote_str);
1394   ij->remote_cn = strdup(remote_cn);
1395   ij->storagenode_id = atoi(id_str);
1396   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1397                                        ij->fqdn);
1398   noitL(noit_error, "ingesting old payload: %s\n", ij->filename);
1399   ingest = eventer_alloc();
1400   ingest->mask = EVENTER_ASYNCH;
1401   ingest->callback = stratcon_ingest_asynch_execute;
1402   ingest->closure = ij;
1403   eventer_add_asynch(ij->cpool->jobq, ingest);
1404 }
1405 static void
1406 stratcon_ingest_sweep_journals_int(char *first, char *second, char *third) {
1407   char path[PATH_MAX];
1408   DIR *root;
1409   struct dirent *de, *entry;
1410   int i = 0, cnt = 0;
1411   char **entries;
1412   int size = 0;
1413
1414   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
1415            first ? "/" : "", first ? first : "",
1416            second ? "/" : "", second ? second : "",
1417            third ? "/" : "", third ? third : "");
1418 #ifdef _PC_NAME_MAX
1419   size = pathconf(path, _PC_NAME_MAX);
1420 #endif
1421   size = MIN(size, PATH_MAX + 128);
1422   de = alloca(size);
1423   root = opendir(path);
1424   if(!root) return;
1425   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) cnt++;
1426   closedir(root);
1427   root = opendir(path);
1428   if(!root) return;
1429   entries = malloc(sizeof(*entries) * cnt);
1430   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) {
1431     if(i < cnt) {
1432       entries[i++] = strdup(entry->d_name);
1433     }
1434   }
1435   closedir(root);
1436   cnt = i; /* could have changed, directories are fickle */
1437   qsort(entries, i, sizeof(*entries),
1438         (int (*)(const void *, const void *))strcasecmp);
1439   for(i=0; i<cnt; i++) {
1440     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
1441     noitL(ds_deb, "Processing L%d entry '%s'\n",
1442           third ? 4 : second ? 3 : first ? 2 : 1, entries[i]);
1443     if(!first)
1444       stratcon_ingest_sweep_journals_int(entries[i], NULL, NULL);
1445     else if(!second)
1446       stratcon_ingest_sweep_journals_int(first, entries[i], NULL);
1447     else if(!third)
1448       stratcon_ingest_sweep_journals_int(first, second, entries[i]);
1449     else if(strlen(entries[i]) == 16) {
1450       char fullpath[PATH_MAX];
1451       snprintf(fullpath, sizeof(fullpath), "%s/%s/%s/%s/%s", basejpath,
1452                first,second,third,entries[i]);
1453       stratcon_ingest_launch_file_ingestion(fullpath,first,second,third);
1454     }
1455   }
1456   for(i=0; i<cnt; i++)
1457     free(entries[i]);
1458   free(entries);
1459 }
1460 static void
1461 stratcon_ingest_sweep_journals() {
1462   stratcon_ingest_sweep_journals_int(NULL,NULL,NULL);
1463 }
1464
1465 int
1466 stratcon_ingest_all_storagenode_info() {
1467   int i, cnt = 0;
1468   ds_single_detail _d = { 0 }, *d = &_d;
1469   conn_q *cq;
1470   cq = get_conn_q_for_metanode();
1471
1472   while(stratcon_database_connect(cq)) {
1473     noitL(noit_error, "Error connecting to database\n");
1474     sleep(1);
1475   }
1476
1477   GET_QUERY(all_storage);
1478   PG_EXEC(all_storage);
1479   cnt = PQntuples(d->res);
1480   for(i=0; i<cnt; i++) {
1481     void *vinfo;
1482     char *tmpint, *fqdn, *dsn;
1483     int storagenode_id;
1484     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1485     storagenode_id = atoi(tmpint);
1486     PG_GET_STR_COL(fqdn, i, "fqdn");
1487     PG_GET_STR_COL(dsn, i, "dsn");
1488     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1489     storagenode_id = tmpint ? atoi(tmpint) : 0;
1490
1491     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1492                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1493       storagenode_info *info;
1494       info = calloc(1, sizeof(*info));
1495       info->storagenode_id = storagenode_id;
1496       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1497       info->dsn = dsn ? strdup(dsn) : NULL;
1498       noit_hash_store(&storagenode_to_info_cache,
1499                       (void *)&info->storagenode_id, sizeof(int), info);
1500       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1501             info->storagenode_id,
1502             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1503     }
1504   }
1505   PQclear(d->res);
1506  bad_row:
1507   free_params(d);
1508
1509   release_conn_q(cq);
1510   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1511   return cnt;
1512 }
1513 int
1514 stratcon_ingest_all_check_info() {
1515   int i, cnt, loaded = 0;
1516   ds_single_detail _d = { 0 }, *d = &_d;
1517   conn_q *cq;
1518   cq = get_conn_q_for_metanode();
1519
1520   while(stratcon_database_connect(cq)) {
1521     noitL(noit_error, "Error connecting to database\n");
1522     sleep(1);
1523   }
1524
1525   GET_QUERY(check_mapall);
1526   PG_EXEC(check_mapall);
1527   cnt = PQntuples(d->res);
1528   for(i=0; i<cnt; i++) {
1529     void *vinfo;
1530     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1531     int sid, storagenode_id;
1532     uuid_info *uuidinfo;
1533     PG_GET_STR_COL(uuid_str, i, "id");
1534     if(!uuid_str) continue;
1535     PG_GET_STR_COL(tmpint, i, "sid");
1536     if(!tmpint) continue;
1537     sid = atoi(tmpint);
1538     PG_GET_STR_COL(fqdn, i, "fqdn");
1539     PG_GET_STR_COL(dsn, i, "dsn");
1540     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1541     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1542     storagenode_id = tmpint ? atoi(tmpint) : 0;
1543
1544     uuidinfo = calloc(1, sizeof(*uuidinfo));
1545     uuidinfo->uuid_str = strdup(uuid_str);
1546     uuidinfo->remote_cn = strdup(remote_cn);
1547     uuidinfo->storagenode_id = storagenode_id;
1548     uuidinfo->sid = sid;
1549     noit_hash_store(&uuid_to_info_cache,
1550                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1551     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1552           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1553     loaded++;
1554     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1555                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1556       storagenode_info *info;
1557       info = calloc(1, sizeof(*info));
1558       info->storagenode_id = storagenode_id;
1559       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1560       info->dsn = dsn ? strdup(dsn) : NULL;
1561       noit_hash_store(&storagenode_to_info_cache,
1562                       (void *)&info->storagenode_id, sizeof(int), info);
1563       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1564             info->storagenode_id,
1565             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1566     }
1567   }
1568   PQclear(d->res);
1569  bad_row:
1570   free_params(d);
1571
1572   release_conn_q(cq);
1573   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1574   return loaded;
1575 }
1576
1577 static int
1578 rest_get_noit_config(noit_http_rest_closure_t *restc,
1579                      int npats, char **pats) {
1580   noit_http_session_ctx *ctx = restc->http_ctx;
1581   ds_single_detail *d;
1582   int row_count = 0;
1583   const char *xml = NULL;
1584   conn_q *cq = NULL;
1585
1586   if(npats != 0) {
1587     noit_http_response_server_error(ctx, "text/xml");
1588     noit_http_response_end(ctx);
1589     return 0;
1590   }
1591   d = calloc(1, sizeof(*d));
1592   GET_QUERY(config_get);
1593   cq = get_conn_q_for_metanode();
1594   if(!cq) {
1595     noit_http_response_server_error(ctx, "text/xml");
1596     goto bad_row;
1597   }
1598
1599   DECLARE_PARAM_STR(restc->remote_cn,
1600                     restc->remote_cn ? strlen(restc->remote_cn) : 0);
1601   PG_EXEC(config_get);
1602   row_count = PQntuples(d->res);
1603   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1604
1605   if(xml == NULL) {
1606     char buff[1024];
1607     snprintf(buff, sizeof(buff), "<error><remote_cn>%s</remote_cn>"
1608                                  "<row_count>%d</row_count></error>\n",
1609              restc->remote_cn, row_count);
1610     noit_http_response_append(ctx, buff, strlen(buff));
1611     noit_http_response_not_found(ctx, "text/xml");
1612   }
1613   else {
1614     noit_http_response_append(ctx, xml, strlen(xml));
1615     noit_http_response_ok(ctx, "text/xml");
1616   }
1617  bad_row:
1618   free_params((ds_single_detail *)d);
1619   d->nparams = 0;
1620   if(cq) release_conn_q(cq);
1621
1622   noit_http_response_end(ctx);
1623   return 0;
1624 }
1625
1626 static ingestor_api_t postgres_ingestor_api = {
1627   .launch_file_ingestion = stratcon_ingest_launch_file_ingestion,
1628   .iep_check_preload = stratcon_ingest_iep_check_preload,
1629   .storage_node_lookup = storage_node_quick_lookup,
1630   .submit_realtime_lookup = stratcon_ingestor_submit_lookup,
1631   .get_noit_config = NULL,
1632   .save_config = stratcon_ingest_saveconfig
1633 };
1634
1635 static int postgres_ingestor_config(noit_module_generic_t *self, noit_hash_table *o) {
1636   return 0;
1637 }
1638 static int postgres_ingestor_onload(noit_image_t *self) {
1639   return 0;
1640 }
1641 static int postgres_ingestor_init(noit_module_generic_t *self) {
1642   pthread_mutex_init(&ds_conns_lock, NULL);
1643   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1644   ds_err = noit_log_stream_find("error/datastore");
1645   ds_deb = noit_log_stream_find("debug/datastore");
1646   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
1647   ingest_err = noit_log_stream_find("error/ingest");
1648   if(!ds_err) ds_err = noit_error;
1649   if(!ingest_err) ingest_err = noit_error;
1650   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1651                            &basejpath)) {
1652     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1653     exit(-1);
1654   }
1655   stratcon_ingest_all_check_info();
1656   stratcon_ingest_all_storagenode_info();
1657   stratcon_ingest_sweep_journals();
1658   return stratcon_datastore_set_ingestor(&postgres_ingestor_api);
1659 }
1660
1661 noit_module_generic_t postgres_ingestor = {
1662   {
1663     NOIT_GENERIC_MAGIC,
1664     NOIT_GENERIC_ABI_VERSION,
1665     "postgres_ingestor",
1666     "postgres drive for data ingestion",
1667     postgres_ingestor_xml_description,
1668     postgres_ingestor_onload,
1669   }, 
1670   postgres_ingestor_config,
1671   postgres_ingestor_init
1672 };
Note: See TracBrowser for help on using the browser.