root/src/modules/postgres_ingestor.c

Revision 4ed37cf09ac9817ced9312616da97a3a1e90c6b3, 51.7 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 10 months ago)

cleanup of modules, verbose structure setting

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2011, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "noit_module.h"
35 #include "eventer/eventer.h"
36 #include "utils/noit_log.h"
37 #include "utils/noit_b64.h"
38 #include "utils/noit_str.h"
39 #include "utils/noit_mkdir.h"
40 #include "utils/noit_getip.h"
41 #include "utils/noit_watchdog.h"
42 #include "stratcon_datastore.h"
43 #include "stratcon_realtime_http.h"
44 #include "stratcon_iep.h"
45 #include "noit_conf.h"
46 #include "noit_check.h"
47 #include "noit_check_log_helpers.h"
48 #include "noit_rest.h"
49 #include <unistd.h>
50 #include <fcntl.h>
51 #include <netinet/in.h>
52 #include <sys/un.h>
53 #include <dirent.h>
54 #include <arpa/inet.h>
55 #include <sys/mman.h>
56 #include <libpq-fe.h>
57 #include <zlib.h>
58 #include <assert.h>
59 #include <errno.h>
60 #include "postgres_ingestor.xmlh"
61 #include "bundle.pb-c.h"
62
/*
 * DECL_STMT(codename, confname) declares two file-scope variables:
 *   codename       the statement text, NULL until it has been loaded
 *   codename_conf  the configuration path the statement is read from,
 *                  "/stratcon/database/statements/" followed by confname
 */
#define DECL_STMT(codename, confname)        \
  static char *codename = NULL;              \
  static const char *codename##_conf =       \
    "/stratcon/database/statements/" #confname

/* Statements run right after a connection is established. */
DECL_STMT(storage_post_connect,  storagepostconnect);
DECL_STMT(metanode_post_connect, metanodepostconnect);

/* Storage node statements. */
DECL_STMT(find_storage,          findstoragenode);
DECL_STMT(all_storage,           allstoragenodes);

/* Check statements. */
DECL_STMT(check_map,             mapchecktostoragenode);
DECL_STMT(check_mapall,          mapallchecks);
DECL_STMT(check_loadall,         allchecks);
DECL_STMT(check_find,            findcheck);

/* Insert statements. */
DECL_STMT(check_insert,          check);
DECL_STMT(status_insert,         status);
DECL_STMT(metric_insert_numeric, metric_numeric);
DECL_STMT(metric_insert_text,    metric_text);

/* Config statements. */
DECL_STMT(config_insert,         config);
DECL_STMT(config_get,            findconfig);
81
82 static noit_log_stream_t ds_err = NULL;
83 static noit_log_stream_t ds_deb = NULL;
84 static noit_log_stream_t ds_pool_deb = NULL;
85 static noit_log_stream_t ingest_err = NULL;
86
/*
 * GET_QUERY(a): make sure statement `a` (declared with DECL_STMT) is loaded.
 * While `a` is still NULL it is fetched with noit_conf_get_string() from the
 * path stored in `a_conf`; when that call returns zero, control jumps to the
 * `bad_row` label of the enclosing function.
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && \
     !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
92
93 struct conn_q;
94
95 typedef struct {
96   char            *queue_name; /* the key fqdn+remote_sn */
97   eventer_jobq_t  *jobq;
98   struct conn_q   *head;
99   pthread_mutex_t  lock;
100   pthread_cond_t   cv;
101   int              ttl;
102   int              in_pool;
103   int              outstanding;
104   int              max_allocated;
105   int              max_in_pool;
106 } conn_pool;
107 typedef struct conn_q {
108   time_t           last_use;
109   char            *dsn;        /* Pg connect string */
110   char            *remote_str; /* the IP of the noit*/
111   char            *remote_cn;  /* the Cert CN of the noit */
112   char            *fqdn;       /* the fqdn of the storage node */
113   conn_pool       *pool;
114   struct conn_q   *next;
115   /* Postgres specific stuff */
116   PGconn          *dbh;
117 } conn_q;
118
119
/* Capacity of the per-statement parameter arrays below. */
#define MAX_PARAMS 8

/*
 * Members placed at the start of every ds_*_detail structure: the last
 * result and its status, the row timestamp, and the bound parameters
 * (value, length, format and an "was allocated" flag per slot).
 * The expansion already ends in ';', so it is used without one.
 */
#define POSTGRES_PARTS \
  PGresult *res; \
  int rv; \
  time_t whence; \
  int nparams; \
  int metric_type; \
  char *paramValues[MAX_PARAMS]; \
  int paramLengths[MAX_PARAMS]; \
  int paramFormats[MAX_PARAMS]; \
  int paramAllocd[MAX_PARAMS];
131
132 typedef struct ds_single_detail {
133   POSTGRES_PARTS
134 } ds_single_detail;
135 typedef struct {
136   /* Postgres specific stuff */
137   POSTGRES_PARTS
138   struct realtime_tracker *rt;
139   conn_q *cq; /* connection on which to perform this job */
140   eventer_t completion_event; /* This event should be registered if non NULL */
141 } ds_rt_detail;
142 typedef struct ds_line_detail {
143   /* Postgres specific stuff */
144   POSTGRES_PARTS
145   char *data;
146   int problematic;
147   struct ds_line_detail *next;
148 } ds_line_detail;
149
150 typedef struct {
151   char *remote_str;
152   char *remote_cn;
153   char *fqdn;
154   int storagenode_id;
155   int fd;
156   char *filename;
157   conn_pool *cpool;
158 } pg_interim_journal_t;
159
160 static int stratcon_database_connect(conn_q *cq);
161 static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
162 static int storage_node_quick_lookup(const char *uuid_str,
163                                      const char *remote_cn,
164                                      int *sid_out, int *storagenode_id_out,
165                                      const char **remote_cn_out,
166                                      const char **fqdn_out,
167                                      const char **dsn_out);
168
169 static void
170 free_params(ds_single_detail *d) {
171   int i;
172   for(i=0; i<d->nparams; i++)
173     if(d->paramAllocd[i] && d->paramValues[i])
174       free(d->paramValues[i]);
175 }
176
177 static char *basejpath = NULL;
178 static pthread_mutex_t ds_conns_lock;
179 static noit_hash_table ds_conns;
180
181 /* the fqdn cache needs to be thread safe */
182 typedef struct {
183   char *uuid_str;
184   char *remote_cn;
185   int storagenode_id;
186   int sid;
187 } uuid_info;
188 typedef struct {
189   int storagenode_id;
190   char *fqdn;
191   char *dsn;
192 } storagenode_info;
193 noit_hash_table uuid_to_info_cache;
194 pthread_mutex_t storagenode_to_info_cache_lock;
195 noit_hash_table storagenode_to_info_cache;
196
197 /* Thread-safe connection pools */
198
199 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
200 static void
201 release_conn_q_forceable(conn_q *cq, int forcefree) {
202   int putback = 0;
203   cq->last_use = time(NULL);
204   pthread_mutex_lock(&cq->pool->lock);
205   cq->pool->outstanding--;
206   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
207     putback = 1;
208     cq->next = cq->pool->head;
209     cq->pool->head = cq;
210     cq->pool->in_pool++;
211   }
212   pthread_mutex_unlock(&cq->pool->lock);
213   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)(vpsized_int)pthread_self(),
214         putback ? "to pool" : "and destroy", cq->pool->queue_name);
215   pthread_cond_signal(&cq->pool->cv);
216   if(putback) return;
217
218   /* Not put back, release it */
219   if(cq->dbh) PQfinish(cq->dbh);
220   if(cq->remote_str) free(cq->remote_str);
221   if(cq->remote_cn) free(cq->remote_cn);
222   if(cq->fqdn) free(cq->fqdn);
223   if(cq->dsn) free(cq->dsn);
224   free(cq);
225 }
226 static void
227 ttl_purge_conn_pool(conn_pool *pool) {
228   int old_cnt, new_cnt;
229   time_t now = time(NULL);
230   conn_q *cq, *prev = NULL, *iter;
231   /* because we always replace on the head and update the last_use time when
232      doing so, we know they are ordered LRU on the end.  So, once we hit an
233      old one, we know all the others are old too.
234    */
235   if(!pool->head) return; /* hack short circuit for no locks */
236   pthread_mutex_lock(&pool->lock);
237   old_cnt = pool->in_pool;
238   cq = pool->head;
239   while(cq) {
240     if(cq->last_use + cq->pool->ttl < now) {
241       if(prev) prev->next = NULL;
242       else pool->head = NULL;
243       break;
244     }
245     prev = cq;
246     cq = cq->next;
247   }
248   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
249   /* Fix accounting */
250   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
251   new_cnt = pool->in_pool;
252   pthread_mutex_unlock(&pool->lock);
253
254   /* Force release these without holding the lock */
255   while(cq) {
256     prev = cq;
257     cq = cq->next;
258     release_conn_q_forceable(prev, 1);
259   }
260   if(old_cnt != new_cnt)
261     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
262           pool->queue_name);
263 }
264 static void
265 release_conn_q(conn_q *cq) {
266   ttl_purge_conn_pool(cq->pool);
267   release_conn_q_forceable(cq, 0);
268 }
269 static conn_pool *
270 get_conn_pool_for_remote(const char *remote_str,
271                          const char *remote_cn, const char *fqdn) {
272   void *vcpool;
273   conn_pool *cpool = NULL;
274   char queue_name[256] = "datastore_";
275   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
276            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
277            fqdn ? fqdn : "default",
278            remote_cn ? remote_cn : "default");
279   pthread_mutex_lock(&ds_conns_lock);
280   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
281                         strlen(queue_name), &vcpool))
282     cpool = vcpool;
283   pthread_mutex_unlock(&ds_conns_lock);
284   if(!cpool) {
285     vcpool = cpool = calloc(1, sizeof(*cpool));
286     cpool->queue_name = strdup(queue_name);
287     pthread_mutex_init(&cpool->lock, NULL);
288     pthread_cond_init(&cpool->cv, NULL);
289     cpool->in_pool = 0;
290     cpool->outstanding = 0;
291     cpool->max_in_pool = 1;
292     cpool->max_allocated = 1;
293     pthread_mutex_lock(&ds_conns_lock);
294     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
295                         cpool)) {
296       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
297                          strlen(queue_name), &vcpool);
298     }
299     pthread_mutex_unlock(&ds_conns_lock);
300     if(vcpool != cpool) {
301       /* someone beat us to it */
302       free(cpool->queue_name);
303       pthread_mutex_destroy(&cpool->lock);
304       pthread_cond_destroy(&cpool->cv);
305       free(cpool);
306     }
307     else {
308       int i;
309       /* Our job to setup the pool */
310       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
311       eventer_jobq_init(cpool->jobq, queue_name);
312       cpool->jobq->backq = eventer_default_backq();
313       /* Add one thread */
314       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
315         eventer_jobq_increase_concurrency(cpool->jobq);
316     }
317     cpool = vcpool;
318   }
319   return cpool;
320 }
321 static conn_q *
322 get_conn_q_for_remote(const char *remote_str,
323                       const char *remote_cn, const char *fqdn,
324                       const char *dsn) {
325   conn_pool *cpool;
326   conn_q *cq;
327   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
328   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)(vpsized_int)pthread_self(),
329         cpool->queue_name);
330   pthread_mutex_lock(&cpool->lock);
331  again:
332   if(cpool->head) {
333     assert(cpool->in_pool > 0);
334     cq = cpool->head;
335     cpool->head = cq->next;
336     cpool->in_pool--;
337     cpool->outstanding++;
338     cq->next = NULL;
339     pthread_mutex_unlock(&cpool->lock);
340     return cq;
341   }
342   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
343     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
344           (void *)(vpsized_int)pthread_self(), cpool->queue_name);
345     pthread_cond_wait(&cpool->cv, &cpool->lock);
346     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
347           (void *)(vpsized_int)pthread_self(), cpool->queue_name);
348     goto again;
349   }
350   else {
351     cpool->outstanding++;
352     pthread_mutex_unlock(&cpool->lock);
353   }
354  
355   cq = calloc(1, sizeof(*cq));
356   cq->pool = cpool;
357   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
358   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
359   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
360   cq->dsn = dsn ? strdup(dsn) : NULL;
361   return cq;
362 }
363 static conn_q *
364 get_conn_q_for_metanode() {
365   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
366 }
367
/* Result codes returned by the statement execution functions. */
typedef enum {
  DS_EXEC_SUCCESS    = 0,
  DS_EXEC_ROW_FAILED = 1,
  DS_EXEC_TXN_FAILED = 2
} execute_outcome_t;
373
/*
 * DECLARE_PARAM_STR(str, len): append a text parameter to `d`.
 *
 * The first `len` bytes of `str` are copied with noit__strndup() into the
 * next free slot and flagged as allocated.  The literal "[[null]]" is the
 * marker for SQL NULL: it is stored as a NULL pointer of length 0 that is
 * not flagged as allocated.  A failed copy is stored the same way.
 *
 * Both arguments are evaluated exactly once.  Appending beyond MAX_PARAMS
 * trips the assert instead of writing past the parameter arrays.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  int dps_len_ = (int)(len); \
  char *dps_val_; \
  assert(d->nparams < MAX_PARAMS); \
  dps_val_ = noit__strndup((str), dps_len_); \
  d->paramFormats[d->nparams] = 0; \
  if(dps_val_ == NULL || !strcmp(dps_val_, "[[null]]")) { \
    free(dps_val_); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  else { \
    d->paramValues[d->nparams] = dps_val_; \
    d->paramLengths[d->nparams] = dps_len_; \
    d->paramAllocd[d->nparams] = 1; \
  } \
  d->nparams++; \
} while(0)
/*
 * DECLARE_PARAM_INT(i): append an int parameter to `d`, rendered as
 * decimal text through DECLARE_PARAM_STR.
 */
#define DECLARE_PARAM_INT(i) do { \
  char pint_text_[32]; \
  int pint_len_ = snprintf(pint_text_, sizeof(pint_text_), "%d", (i)); \
  DECLARE_PARAM_STR(pint_text_, pint_len_); \
} while(0)
394
/*
 * PG_GET_STR_COL(dest, row, name): point `dest` at the text of column
 * `name` in row `row` of d->res.  `dest` becomes NULL when the column is
 * not part of the result or the value is SQL NULL.  The pointer is the one
 * returned by PQgetvalue(); nothing is copied.
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int pg_col_ = PQfnumber(d->res, name); \
  if(pg_col_ < 0 || PQgetisnull(d->res, row, pg_col_)) \
    dest = NULL; \
  else \
    dest = PQgetvalue(d->res, row, pg_col_); \
} while(0)
402
/*
 * PG_EXEC(cmd): run `cmd` on cq->dbh with the parameters bound in `d`.
 * The result is left in d->res and its status in d->rv.  Any status other
 * than PGRES_COMMAND_OK / PGRES_TUPLES_OK logs the first line of the error
 * text, clears the result and jumps to `bad_row`.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(!(d->rv == PGRES_COMMAND_OK || d->rv == PGRES_TUPLES_OK)) { \
    const char *pg_msg_ = PQresultErrorMessage(d->res); \
    int pg_msg_len_ = (int)strcspn(pg_msg_, "\n"); \
    noitL(ds_err, "[%s] stratcon_datasource.c:%d bad (%d): %.*s\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
          pg_msg_len_, pg_msg_); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
420
/*
 * PG_TM_EXEC(cmd, whence): like PG_EXEC, but `cmd` is first passed through
 * strftime() using the UTC breakdown of `whence`.
 *
 * `whence` is evaluated once.  If the time cannot be broken down, or the
 * expanded statement does not fit the 4096 byte buffer (strftime returns 0
 * and leaves the buffer contents unspecified), nothing is sent to the
 * database: the problem is logged and control jumps to `bad_row`.
 * A failed statement logs the first line of the error text, clears the
 * result and jumps to `bad_row` as well.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t tm_whence_ = (whence); \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&tm_whence_, &tbuf); \
  if(tm == NULL || strftime(cmdbuf, sizeof(cmdbuf), cmd, tm) == 0) { \
    noitL(ds_err, "stratcon_datasource.c:%d cannot expand statement, time: %llu\n", \
          __LINE__, (long long unsigned)tm_whence_); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "stratcon_datasource.c:%d bad (%d): %.*s time: %llu\n", \
          __LINE__, d->rv, (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)tm_whence_); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
443
444 static void *
445 stratcon_ingest_check_loadall(void *vsn) {
446   storagenode_info *sn = vsn;
447   ds_single_detail *d;
448   int i, row_count = 0, good = 0;
449   char buff[1024];
450   conn_q *cq = NULL;
451
452   d = calloc(1, sizeof(*d));
453   GET_QUERY(check_loadall);
454   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
455   i = 0;
456   while(stratcon_database_connect(cq)) {
457     if(i++ > 4) {
458       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
459       release_conn_q(cq);
460       free(d);
461       return (void *)(vpsized_int)good;
462     }
463     sleep(1);
464   }
465   PG_EXEC(check_loadall);
466   row_count = PQntuples(d->res);
467  
468   for(i=0; i<row_count; i++) {
469     int rv;
470     int8_t family;
471     struct sockaddr *sin;
472     struct sockaddr_in sin4 = { .sin_family = AF_INET };
473     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
474     char *remote, *id, *target, *module, *name;
475     PG_GET_STR_COL(remote, i, "remote_address");
476     PG_GET_STR_COL(id, i, "id");
477     PG_GET_STR_COL(target, i, "target");
478     PG_GET_STR_COL(module, i, "module");
479     PG_GET_STR_COL(name, i, "name");
480     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
481
482     family = AF_INET;
483     sin = (struct sockaddr *)&sin4;
484     rv = inet_pton(family, remote, &sin4.sin_addr);
485     if(rv != 1) {
486       family = AF_INET6;
487       sin = (struct sockaddr *)&sin6;
488       rv = inet_pton(family, remote, &sin6.sin6_addr);
489       if(rv != 1) {
490         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
491         sin = NULL;
492       }
493     }
494
495     /* stratcon_iep_line_processor takes an allocated operand and frees it */
496     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
497     good++;
498   }
499   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
500         good, row_count, sn->fqdn);
501  bad_row:
502   free_params((ds_single_detail *)d);
503   free(d);
504   if(cq) release_conn_q(cq);
505   return (void *)(vpsized_int)good;
506 }
507 static int
508 stratcon_ingest_asynch_drive_iep(eventer_t e, int mask, void *closure,
509                                  struct timeval *now) {
510   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
511   pthread_t *jobs = NULL;
512   int nodes, i = 0, tcnt = 0;
513   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
514   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
515
516   pthread_mutex_lock(&storagenode_to_info_cache_lock);
517   nodes = storagenode_to_info_cache.size;
518   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
519   sns = calloc(MAX(1,nodes), sizeof(*sns));
520   if(nodes == 0) sns[nodes++] = &self;
521   else {
522     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
523     const char *k;
524     void *v;
525     int klen;
526     while(noit_hash_next(&storagenode_to_info_cache,
527                          &iter, &k, &klen, &v)) {
528       sns[i++] = (storagenode_info *)v;
529     }
530   }
531   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
532
533   for(i=0; i<nodes; i++) {
534     if(pthread_create(&jobs[i], NULL,
535                       stratcon_ingest_check_loadall, sns[i]) != 0) {
536       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
537     }
538   }
539   for(i=0; i<nodes; i++) {
540     void *good;
541     pthread_join(jobs[i], &good);
542     tcnt += (int)(vpsized_int)good;
543   }
544   free(jobs);
545   free(sns);
546   noitL(noit_error, "Loaded all %d check states.\n", tcnt);
547   return 0;
548 }
549 static void
550 stratcon_ingest_iep_check_preload() {
551   eventer_t e;
552   conn_pool *cpool;
553
554   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
555   e = eventer_alloc();
556   e->mask = EVENTER_ASYNCH;
557   e->callback = stratcon_ingest_asynch_drive_iep;
558   e->closure = NULL;
559   eventer_add_asynch(cpool->jobq, e);
560 }
561 execute_outcome_t
562 stratcon_ingest_find(ds_rt_detail *d) {
563   conn_q *cq;
564   char *val;
565   int row_count;
566   struct realtime_tracker *node;
567
568   for(node = d->rt; node; node = node->next) {
569     char uuid_str[UUID_STR_LEN+1];
570     const char *fqdn, *dsn, *remote_cn;
571     char remote_ip[32];
572     int storagenode_id;
573
574     uuid_unparse_lower(node->checkid, uuid_str);
575     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
576                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
577       continue;
578
579     noitL(noit_debug, "stratcon_ingest_find <- (%d, %s) @ %s\n",
580           node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)");
581
582     /* We might be able to find the IP from our config if someone has
583      * specified the expected cn in the noit definition.
584      */
585     if(stratcon_find_noit_ip_by_cn(remote_cn,
586                                    remote_ip, sizeof(remote_ip)) == 0) {
587       node->noit = strdup(remote_ip);
588       noitL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit);
589       continue;
590     }
591
592     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
593     if(stratcon_database_connect(cq) != 0) goto bad_row;
594
595     GET_QUERY(check_find);
596     DECLARE_PARAM_INT(node->sid);
597     PG_EXEC(check_find);
598     row_count = PQntuples(d->res);
599     if(row_count != 1) {
600       noitL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid);
601       PQclear(d->res);
602       goto bad_row;
603     }
604
605     /* Get the remote_address (which noit owns this) */
606     PG_GET_STR_COL(val, 0, "remote_address");
607     if(!val) {
608       noitL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn);
609       PQclear(d->res);
610       goto bad_row;
611     }
612     node->noit = strdup(val);
613     noitL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit);
614    bad_row:
615     free_params((ds_single_detail *)d);
616     d->nparams = 0;
617     release_conn_q(cq);
618   }
619   return DS_EXEC_SUCCESS;
620 }
621 execute_outcome_t
622 stratcon_ingest_execute(conn_q *cq, const char *r, const char *remote_cn,
623                         ds_line_detail *d) {
624   int type, len, sid;
625   char *final_buff;
626   uLong final_len, actual_final_len;
627   char *token;
628   char raddr_blank[1] = "";
629   const char *raddr;
630
631   type = d->data[0];
632   raddr = r ? r : raddr_blank;
633
634   /* Parse the log line, but only if we haven't already */
635   if(!d->nparams) {
636     char *scp, *ecp;
637
638     scp = d->data;
639 #define PROCESS_NEXT_FIELD(t,l) do { \
640   if(!*scp) goto bad_row; \
641   ecp = strchr(scp, '\t'); \
642   if(!ecp) goto bad_row; \
643   token = scp; \
644   len = (ecp-scp); \
645   scp = ecp + 1; \
646 } while(0)
647 #define PROCESS_LAST_FIELD(t,l) do { \
648   if(!*scp) ecp = scp; \
649   else { \
650     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
651     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
652   } \
653   t = scp; \
654   l = (ecp-scp); \
655 } while(0)
656
657     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
658     switch(type) {
659       /* See noit_check_log.c for log description */
660       case 'n':
661         DECLARE_PARAM_STR(raddr, strlen(raddr));
662         DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
663         DECLARE_PARAM_STR("noitd",5); /* node_type */
664         PROCESS_NEXT_FIELD(token,len);
665         d->whence = (time_t)strtoul(token, NULL, 10);
666         DECLARE_PARAM_STR(token,len); /* timestamp */
667
668         /* This is the expected uncompressed len */
669         PROCESS_NEXT_FIELD(token,len);
670         final_len = atoi(token);
671         final_buff = malloc(final_len);
672         if(!final_buff) goto bad_row;
673  
674         /* The last token is b64 endoded and compressed.
675          * we need to decode it, declare it and then free it.
676          */
677         PROCESS_LAST_FIELD(token, len);
678         /* We can in-place decode this */
679         len = noit_b64_decode((char *)token, len,
680                               (unsigned char *)token, len);
681         if(len <= 0) {
682           noitL(noit_error, "noitd config base64 decoding error.\n");
683           free(final_buff);
684           goto bad_row;
685         }
686         actual_final_len = final_len;
687         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
688                               (unsigned char *)token, len)) {
689           noitL(noit_error, "noitd config decompression failure.\n");
690           free(final_buff);
691           goto bad_row;
692         }
693         if(final_len != actual_final_len) {
694           noitL(noit_error, "noitd config decompression error.\n");
695           free(final_buff);
696           goto bad_row;
697         }
698         DECLARE_PARAM_STR(final_buff, final_len);
699         free(final_buff);
700         break;
701       case 'D':
702         break;
703       case 'C':
704         DECLARE_PARAM_STR(raddr, strlen(raddr));
705         PROCESS_NEXT_FIELD(token,len);
706         DECLARE_PARAM_STR(token,len); /* timestamp */
707         d->whence = (time_t)strtoul(token, NULL, 10);
708         PROCESS_NEXT_FIELD(token, len);
709         /* uuid is last 36 bytes */
710         if(len > 36) { token += (len-36); len = 36; }
711         sid = uuid_to_sid(token, remote_cn);
712         if(sid == 0) goto bad_row;
713         DECLARE_PARAM_INT(sid); /* sid */
714         DECLARE_PARAM_STR(token,len); /* uuid */
715         PROCESS_NEXT_FIELD(token, len);
716         DECLARE_PARAM_STR(token,len); /* target */
717         PROCESS_NEXT_FIELD(token, len);
718         DECLARE_PARAM_STR(token,len); /* module */
719         PROCESS_LAST_FIELD(token, len);
720         DECLARE_PARAM_STR(token,len); /* name */
721         break;
722       case 'M':
723         PROCESS_NEXT_FIELD(token,len);
724         DECLARE_PARAM_STR(token,len); /* timestamp */
725         d->whence = (time_t)strtoul(token, NULL, 10);
726         PROCESS_NEXT_FIELD(token, len);
727         /* uuid is last 36 bytes */
728         if(len > 36) { token += (len-36); len = 36; }
729         sid = uuid_to_sid(token, remote_cn);
730         if(sid == 0) goto bad_row;
731         DECLARE_PARAM_INT(sid); /* sid */
732         PROCESS_NEXT_FIELD(token, len);
733         DECLARE_PARAM_STR(token,len); /* name */
734         PROCESS_NEXT_FIELD(token,len);
735         d->metric_type = *token;
736         PROCESS_LAST_FIELD(token,len);
737         DECLARE_PARAM_STR(token,len); /* value */
738         break;
739       case 'S':
740         PROCESS_NEXT_FIELD(token,len);
741         DECLARE_PARAM_STR(token,len); /* timestamp */
742         d->whence = (time_t)strtoul(token, NULL, 10);
743         PROCESS_NEXT_FIELD(token, len);
744         /* uuid is last 36 bytes */
745         if(len > 36) { token += (len-36); len = 36; }
746         sid = uuid_to_sid(token, remote_cn);
747         if(sid == 0) goto bad_row;
748         DECLARE_PARAM_INT(sid); /* sid */
749         PROCESS_NEXT_FIELD(token, len);
750         DECLARE_PARAM_STR(token,len); /* state */
751         PROCESS_NEXT_FIELD(token, len);
752         DECLARE_PARAM_STR(token,len); /* availability */
753         PROCESS_NEXT_FIELD(token, len);
754         DECLARE_PARAM_STR(token,len); /* duration */
755         PROCESS_LAST_FIELD(token,len);
756         DECLARE_PARAM_STR(token,len); /* status */
757         break;
758       default:
759         goto bad_row;
760     }
761
762   }
763
764   /* Now execute the query */
765   switch(type) {
766     case 'n':
767       GET_QUERY(config_insert);
768       PG_EXEC(config_insert);
769       PQclear(d->res);
770       break;
771     case 'C':
772       GET_QUERY(check_insert);
773       PG_TM_EXEC(check_insert, d->whence);
774       PQclear(d->res);
775       break;
776     case 'S':
777       GET_QUERY(status_insert);
778       PG_TM_EXEC(status_insert, d->whence);
779       PQclear(d->res);
780       break;
781     case 'D':
782       break;
783     case 'M':
784       switch(d->metric_type) {
785         case METRIC_INT32:
786         case METRIC_UINT32:
787         case METRIC_INT64:
788         case METRIC_UINT64:
789         case METRIC_DOUBLE:
790           GET_QUERY(metric_insert_numeric);
791           PG_TM_EXEC(metric_insert_numeric, d->whence);
792           PQclear(d->res);
793           break;
794         case METRIC_STRING:
795           GET_QUERY(metric_insert_text);
796           PG_TM_EXEC(metric_insert_text, d->whence);
797           PQclear(d->res);
798           break;
799         default:
800           goto bad_row;
801       }
802       break;
803     default:
804       /* should never get here */
805       goto bad_row;
806   }
807   return DS_EXEC_SUCCESS;
808  bad_row:
809   return DS_EXEC_ROW_FAILED;
810 }
811 static int
812 stratcon_database_post_connect(conn_q *cq) {
813   int rv = 0;
814   ds_single_detail _d = { 0 }, *d = &_d;
815   if(cq->fqdn) {
816     char *remote_str, *remote_cn;
817     /* This is the silly way we get null's in through our declare_param_str */
818     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
819     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
820     /* This is a storage node, it gets the storage node post_connect */
821     GET_QUERY(storage_post_connect);
822     rv = -1; /* now we're serious */
823     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
824     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
825     PG_EXEC(storage_post_connect);
826     PQclear(d->res);
827     rv = 0;
828   }
829   else {
830     /* Metanode post_connect */
831     GET_QUERY(metanode_post_connect);
832     rv = -1; /* now we're serious */
833     PG_EXEC(metanode_post_connect);
834     PQclear(d->res);
835     rv = 0;
836   }
837  bad_row:
838   free_params(d);
839   if(rv == -1) {
840     /* Post-connect intentions are serious and fatal */
841     PQfinish(cq->dbh);
842     cq->dbh = NULL;
843   }
844   return rv;
845 }
846 static int
847 stratcon_database_connect(conn_q *cq) {
848   char *dsn, dsn_meta[512];
849   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
850   const char *k, *v;
851   int klen;
852   noit_hash_table *t;
853
854   dsn_meta[0] = '\0';
855   if(!cq->dsn) {
856     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
857     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
858       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
859       strlcat(dsn_meta, k, sizeof(dsn_meta));
860       strlcat(dsn_meta, "=", sizeof(dsn_meta));
861       strlcat(dsn_meta, v, sizeof(dsn_meta));
862     }
863     noit_hash_destroy(t, free, free);
864     free(t);
865     dsn = dsn_meta;
866   }
867   else {
868     char options[32];
869     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta));
870     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user",
871                                options, sizeof(options))) {
872       strlcat(dsn_meta, " ", sizeof(dsn_meta));
873       strlcat(dsn_meta, "user", sizeof(dsn_meta));
874       strlcat(dsn_meta, "=", sizeof(dsn_meta));
875       strlcat(dsn_meta, options, sizeof(dsn_meta));
876     }
877     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password",
878                                options, sizeof(options))) {
879       strlcat(dsn_meta, " ", sizeof(dsn_meta));
880       strlcat(dsn_meta, "password", sizeof(dsn_meta));
881       strlcat(dsn_meta, "=", sizeof(dsn_meta));
882       strlcat(dsn_meta, options, sizeof(dsn_meta));
883     }
884     dsn = dsn_meta;
885   }
886
887   if(cq->dbh) {
888     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
889     PQreset(cq->dbh);
890     if(PQstatus(cq->dbh) != CONNECTION_OK) {
891       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
892             dsn, PQerrorMessage(cq->dbh));
893       return -1;
894     }
895     if(stratcon_database_post_connect(cq)) return -1;
896     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
897     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
898           dsn, PQerrorMessage(cq->dbh));
899     return -1;
900   }
901
902   cq->dbh = PQconnectdb(dsn);
903   if(!cq->dbh) return -1;
904   if(PQstatus(cq->dbh) != CONNECTION_OK) {
905     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
906           dsn, PQerrorMessage(cq->dbh));
907     return -1;
908   }
909   if(stratcon_database_post_connect(cq)) return -1;
910   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
911   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
912         dsn, PQerrorMessage(cq->dbh));
913   return -1;
914 }
915 static int
916 stratcon_ingest_savepoint_op(conn_q *cq, const char *p,
917                              const char *name) {
918   int rv = -1;
919   PGresult *res;
920   char cmd[128];
921   strlcpy(cmd, p, sizeof(cmd));
922   strlcat(cmd, name, sizeof(cmd));
923   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
924   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
925   PQclear(res);
926   return rv;
927 }
928 static int
929 stratcon_ingest_do(conn_q *cq, const char *cmd) {
930   PGresult *res;
931   int rv = -1;
932   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
933   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
934   PQclear(res);
935   return rv;
936 }
/*
 * Transaction helpers for the batch ingestion loop.
 *
 * All of these expand to code that relies on the enclosing function: BUSTED
 * jumps to a label named `full_monty`, and the savepoint macros use local
 * variables named `cq`, `current` and `last_sp`.
 */

/* Close the connection, forget the handle and restart from full_monty. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)

/* Open savepoint `name` and remember the current entry as its start. */
#define SAVEPOINT(name) do { \
  if(stratcon_ingest_savepoint_op(cq, "SAVEPOINT ", name) != 0) { \
    BUSTED(cq); \
  } \
  last_sp = current; \
} while(0)

/* Undo everything since savepoint `name`; no savepoint is tracked after. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  if(stratcon_ingest_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name) != 0) { \
    BUSTED(cq); \
  } \
  last_sp = NULL; \
} while(0)

/* Keep the work done since savepoint `name` and stop tracking it. */
#define RELEASE_SAVEPOINT(name) do { \
  if(stratcon_ingest_savepoint_op(cq, "RELEASE SAVEPOINT ", name) != 0) { \
    BUSTED(cq); \
  } \
  last_sp = NULL; \
} while(0)
956
957 int
958 stratcon_ingest_asynch_lookup(eventer_t e, int mask, void *closure,
959                               struct timeval *now) {
960   ds_rt_detail *dsjd = closure;
961   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
962   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
963
964   assert(dsjd->rt);
965   stratcon_ingest_find(dsjd);
966   if(dsjd->completion_event)
967     eventer_add(dsjd->completion_event);
968
969   free_params((ds_single_detail *)dsjd);
970   free(dsjd);
971   return 0;
972 }
973 static void
974 stratcon_ingestor_submit_lookup(struct realtime_tracker *rt,
975                                 eventer_t completion) {
976   eventer_t e;
977   conn_pool *cpool;
978   ds_rt_detail *rtdetail;
979
980   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
981   rtdetail = calloc(1, sizeof(*rtdetail));
982   rtdetail->rt = rt;
983   rtdetail->completion_event = completion;
984   e = eventer_alloc();
985   e->mask = EVENTER_ASYNCH;
986   e->callback = stratcon_ingest_asynch_lookup;
987   e->closure = rtdetail;
988   eventer_add_asynch(cpool->jobq, e);
989 }
990 static const char *
991 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
992   void *vinfo;
993   char *dsn = NULL, *fqdn = NULL;
994   int found = 0;
995   storagenode_info *info = NULL;
996   pthread_mutex_lock(&storagenode_to_info_cache_lock);
997   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
998                         &vinfo)) {
999     found = 1;
1000     info = vinfo;
1001   }
1002   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1003   if(found) {
1004     if(fqdn_out) *fqdn_out = info->fqdn;
1005     return info->dsn;
1006   }
1007
1008   if(!found && can_use_db) {
1009     ds_single_detail *d;
1010     conn_q *cq;
1011     int row_count;
1012     /* Look it up and store it */
1013     d = calloc(1, sizeof(*d));
1014     cq = get_conn_q_for_metanode();
1015     GET_QUERY(find_storage);
1016     DECLARE_PARAM_INT(id);
1017     PG_EXEC(find_storage);
1018     row_count = PQntuples(d->res);
1019     if(row_count) {
1020       PG_GET_STR_COL(dsn, 0, "dsn");
1021       PG_GET_STR_COL(fqdn, 0, "fqdn");
1022       fqdn = fqdn ? strdup(fqdn) : NULL;
1023       dsn = dsn ? strdup(dsn) : NULL;
1024     }
1025     PQclear(d->res);
1026    bad_row:
1027     free_params(d);
1028     free(d);
1029     release_conn_q(cq);
1030   }
1031   if(fqdn) {
1032     info = calloc(1, sizeof(*info));
1033     info->fqdn = fqdn;
1034     if(fqdn_out) *fqdn_out = info->fqdn;
1035     info->dsn = dsn;
1036     info->storagenode_id = id;
1037     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1038     noit_hash_store(&storagenode_to_info_cache,
1039                     (void *)&info->storagenode_id, sizeof(int), info);
1040     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1041   }
1042   return info ? info->dsn : NULL;
1043 }
1044 static void
1045 expand_b_record(ds_line_detail **head, ds_line_detail **last,
1046                 const char *line, int len) {
1047   char **outrows;
1048   int i, cnt;
1049   ds_line_detail *next;
1050
1051   cnt = noit_check_log_b_to_sm(line, len, &outrows);
1052   for(i=0;i<cnt;i++) {
1053     if(outrows[i] == NULL) continue;
1054     next = calloc(sizeof(*next), 1);
1055     next->data = outrows[i];
1056     if(!*head) *head = next;
1057     if(*last) (*last)->next = next;
1058     *last = next;
1059   }
1060   if(outrows) free(outrows);
1061 }
1062 static ds_line_detail *
1063 build_insert_batch(pg_interim_journal_t *ij) {
1064   int rv;
1065   off_t len;
1066   const char *buff, *cp, *lcp;
1067   struct stat st;
1068   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1069
1070   if(ij->fd < 0) {
1071     ij->fd = open(ij->filename, O_RDONLY);
1072     if(ij->fd < 0) {
1073       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
1074             ij->filename, strerror(errno));
1075       assert(ij->fd >= 0);
1076     }
1077   }
1078   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1079   if(rv == -1) {
1080       noitL(noit_error, "Cannot stat interim journal '%s': %s\n",
1081             ij->filename, strerror(errno));
1082     assert(rv != -1);
1083   }
1084   len = st.st_size;
1085   if(len > 0) {
1086     buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1087     if(buff == (void *)-1) {
1088       noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1089             ij->filename, strerror(errno));
1090       assert(buff != (void *)-1);
1091     }
1092     lcp = buff;
1093     while(lcp < (buff + len) &&
1094           NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1095       if(lcp[0] == 'B' && lcp[1] != '\0' && lcp[2] == '\t') {
1096       /* Bundle records are special and need to be expanded into
1097        * traditional records here
1098        */
1099         noit_compression_type_t ctype = NOIT_COMPRESS_NONE;
1100         switch(lcp[1]) {
1101           case '1': /* version 1 */
1102             ctype = NOIT_COMPRESS_ZLIB; /*no break fall through */
1103           case '2': /* version 2 */
1104               expand_b_record(&head, &last, lcp, cp - lcp);
1105             break;
1106           default:
1107             noitL(noit_error, "unknown bundle version %c\n", lcp[1]);
1108         }
1109       }
1110       else {
1111         next = calloc(1, sizeof(*next));
1112         next->data = malloc(cp - lcp + 1);
1113         memcpy(next->data, lcp, cp - lcp);
1114         next->data[cp - lcp] = '\0';
1115         if(!head) head = next;
1116         if(last) last->next = next;
1117         last = next;
1118       }
1119       lcp = cp + 1;
1120     }
1121     munmap((void *)buff, len);
1122   }
1123   close(ij->fd);
1124   return head;
1125 }
1126 static void
1127 pg_interim_journal_remove(pg_interim_journal_t *ij) {
1128   unlink(ij->filename);
1129   free(ij->filename);
1130   if(ij->remote_str) free(ij->remote_str);
1131   if(ij->remote_cn) free(ij->remote_cn);
1132   if(ij->fqdn) free(ij->fqdn);
1133   free(ij);
1134 }
1135 static int
1136 stratcon_ingest_asynch_execute(eventer_t e, int mask, void *closure,
1137                                struct timeval *now) {
1138   int i, total, success, sp_total, sp_success;
1139   pg_interim_journal_t *ij;
1140   ds_line_detail *head = NULL, *current, *last_sp;
1141   const char *dsn;
1142   conn_q *cq;
1143   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1144   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1145
1146   ij = closure;
1147   if(ij->fqdn == NULL) {
1148     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1149     if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1150   }
1151   else {
1152     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, NULL);
1153   }
1154   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1155                              ij->fqdn, dsn);
1156   noitL(ds_deb, "stratcon_ingest_asynch_execute[%s,%s,%s]\n",
1157         ij->remote_str, ij->remote_cn, ij->fqdn);
1158  full_monty:
1159   /* Make sure we have a connection */
1160   i = 1;
1161   while(stratcon_database_connect(cq)) {
1162     noitL(noit_error, "Error connecting to database: %s\n",
1163           ij->fqdn ? ij->fqdn : "(null)");
1164     sleep(i);
1165     i *= 2;
1166     i = MIN(i, 16);
1167   }
1168
1169   if(head == NULL) head = build_insert_batch(ij);
1170   noitL(ds_deb, "Starting batch from %s/%s to %s\n",
1171         ij->remote_str ? ij->remote_str : "(null)",
1172         ij->remote_cn ? ij->remote_cn : "(null)",
1173         ij->fqdn ? ij->fqdn : "(null)");
1174   current = head;
1175   last_sp = NULL;
1176   total = success = sp_total = sp_success = 0;
1177   if(stratcon_ingest_do(cq, "BEGIN")) BUSTED(cq);
1178   while(current) {
1179     execute_outcome_t rv;
1180     if(current->data) {
1181       if(!last_sp) {
1182         SAVEPOINT("batch");
1183         sp_success = success;
1184         sp_total = total;
1185       }
1186  
1187       if(current->problematic) {
1188         RELEASE_SAVEPOINT("batch");
1189         current = current->next;
1190         total++;
1191         continue;
1192       }
1193       rv = stratcon_ingest_execute(cq, cq->remote_str, cq->remote_cn,
1194                                    current);
1195       switch(rv) {
1196         case DS_EXEC_SUCCESS:
1197           total++;
1198           success++;
1199           current = current->next;
1200           break;
1201         case DS_EXEC_ROW_FAILED:
1202           /* rollback to savepoint, mark this record as bad and start again */
1203           if(current->data[0] != 'n')
1204             noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1205           current->problematic = 1;
1206           current = last_sp;
1207           success = sp_success;
1208           total = sp_total;
1209           ROLLBACK_TO_SAVEPOINT("batch");
1210           break;
1211         case DS_EXEC_TXN_FAILED:
1212           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1213           BUSTED(cq);
1214       }
1215     }
1216   }
1217   if(last_sp) RELEASE_SAVEPOINT("batch");
1218   if(stratcon_ingest_do(cq, "COMMIT")) {
1219     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1220     BUSTED(cq);
1221   }
1222   /* Cleanup the mess */
1223   while(head) {
1224     ds_line_detail *tofree;
1225     tofree = head;
1226     head = head->next;
1227     if(tofree->data) free(tofree->data);
1228     free_params((ds_single_detail *)tofree);
1229     free(tofree);
1230   }
1231   noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1232         ij->remote_str ? ij->remote_str : "(null)",
1233         ij->remote_cn ? ij->remote_cn : "(null)",
1234         ij->fqdn ? ij->fqdn : "(null)", success, total);
1235   pg_interim_journal_remove(ij);
1236   release_conn_q(cq);
1237   return 0;
1238 }
1239 static int
1240 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1241                           int *sid_out, int *storagenode_id_out,
1242                           const char **remote_cn_out,
1243                           const char **fqdn_out, const char **dsn_out) {
1244   /* only called from the main thread -- no safety issues */
1245   void *vuuidinfo, *vinfo;
1246   uuid_info *uuidinfo;
1247   storagenode_info *info = NULL;
1248   char *fqdn = NULL;
1249   char *dsn = NULL;
1250   char *new_remote_cn = NULL;
1251   int storagenode_id = 0, sid = 0;
1252   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1253                          &vuuidinfo)) {
1254     int row_count = 0;
1255     char *tmpint;
1256     ds_single_detail *d;
1257     conn_q *cq;
1258
1259     /* We can't do a database lookup without the remote_cn */
1260     if(!remote_cn) {
1261       if(stratcon_datastore_get_enabled()) {
1262         /* We have an authoritatively maintained cache, we don't do lookups */
1263         return -1;
1264       }
1265       else
1266         remote_cn = "[[null]]";
1267     }
1268
1269     d = calloc(1, sizeof(*d));
1270     cq = get_conn_q_for_metanode();
1271     if(stratcon_database_connect(cq) == 0) {
1272       /* Blocking call to service the cache miss */
1273       GET_QUERY(check_map);
1274       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1275       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1276       PG_EXEC(check_map);
1277       row_count = PQntuples(d->res);
1278       if(row_count != 1) {
1279         PQclear(d->res);
1280         goto bad_row;
1281       }
1282       PG_GET_STR_COL(tmpint, 0, "sid");
1283       if(!tmpint) {
1284         row_count = 0;
1285         PQclear(d->res);
1286         goto bad_row;
1287       }
1288       sid = atoi(tmpint);
1289       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1290       if(tmpint) storagenode_id = atoi(tmpint);
1291       PG_GET_STR_COL(fqdn, 0, "fqdn");
1292       PG_GET_STR_COL(dsn, 0, "dsn");
1293       PG_GET_STR_COL(new_remote_cn, 0, "remote_cn");
1294       fqdn = fqdn ? strdup(fqdn) : NULL;
1295       dsn = dsn ? strdup(dsn) : NULL;
1296       new_remote_cn = new_remote_cn ? strdup(new_remote_cn) : NULL;
1297       PQclear(d->res);
1298     }
1299    bad_row:
1300     free_params((ds_single_detail *)d);
1301     free(d);
1302     release_conn_q(cq);
1303     if(row_count != 1) {
1304       return -1;
1305     }
1306     /* Place in cache */
1307     uuidinfo = calloc(1, sizeof(*uuidinfo));
1308     uuidinfo->sid = sid;
1309     uuidinfo->uuid_str = strdup(uuid_str);
1310     uuidinfo->storagenode_id = storagenode_id;
1311     uuidinfo->remote_cn = new_remote_cn ? strdup(new_remote_cn) : strdup(remote_cn);
1312     noit_hash_store(&uuid_to_info_cache,
1313                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1314     /* Also, we may have just witnessed a new storage node, store it */
1315     if(storagenode_id) {
1316       int needs_free = 0;
1317       info = calloc(1, sizeof(*info));
1318       info->storagenode_id = storagenode_id;
1319       info->dsn = dsn ? strdup(dsn) : NULL;
1320       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1321       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1322       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1323                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1324         /* hack to save memory -- we *never* remove from these caches,
1325            so we can use the same fqdn value in the above cache for the key
1326            in the cache below -- (no strdup) */
1327         noit_hash_store(&storagenode_to_info_cache,
1328                         (void *)&info->storagenode_id, sizeof(int), info);
1329       }
1330       else needs_free = 1;
1331       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1332       if(needs_free) {
1333         if(info->dsn) free(info->dsn);
1334         if(info->fqdn) free(info->fqdn);
1335         free(info);
1336       }
1337     }
1338   }
1339   else
1340     uuidinfo = vuuidinfo;
1341
1342   if(uuidinfo && uuidinfo->storagenode_id) {
1343     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1344       /* we don't have dsn and we actually want it */
1345       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1346       if(noit_hash_retrieve(&storagenode_to_info_cache,
1347                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1348                             &vinfo))
1349         info = vinfo;
1350       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1351     }
1352   }
1353
1354   if(fqdn_out) *fqdn_out = info ? info->fqdn : NULL;
1355   if(dsn_out) *dsn_out = info ? info->dsn : NULL;
1356   assert(uuidinfo);
1357   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1358   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1359   if(sid_out) *sid_out = uuidinfo->sid;
1360   if(fqdn) free(fqdn);
1361   if(dsn) free(dsn);
1362   if(new_remote_cn) free(new_remote_cn);
1363   return 0;
1364 }
1365 static int
1366 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1367   char uuid_str[UUID_STR_LEN+1];
1368   int sid = 0;
1369   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1370   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1371   return sid;
1372 }
1373
1374 static int
1375 stratcon_ingest_saveconfig() {
1376   int rv = -1;
1377   char *buff;
1378   ds_single_detail _d = { 0 }, *d = &_d;
1379   conn_q *cq;
1380   char ipv4_str[32];
1381   struct in_addr r, l;
1382
1383   r.s_addr = htonl((4 << 24) | (2 << 16) | (2 << 8) | 1);
1384   memset(&l, 0, sizeof(l));
1385   noit_getip_ipv4(r, &l);
1386   /* Ignore the error.. what are we going to do anyway */
1387   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
1388     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));
1389
1390   cq = get_conn_q_for_metanode();
1391
1392   if(stratcon_database_connect(cq) == 0) {
1393     char time_as_str[20];
1394     size_t len;
1395     buff = noit_conf_xml_in_mem(&len);
1396     if(!buff) goto bad_row;
1397
1398     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1399     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str));
1400     DECLARE_PARAM_STR("", 0);
1401     DECLARE_PARAM_STR("stratcond", 9);
1402     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1403     DECLARE_PARAM_STR(buff, len);
1404     free(buff);
1405
1406     GET_QUERY(config_insert);
1407     PG_EXEC(config_insert);
1408     PQclear(d->res);
1409     rv = 0;
1410
1411     bad_row:
1412       free_params(d);
1413   }
1414   release_conn_q(cq);
1415   return rv;
1416 }
1417
1418 static int
1419 stratcon_ingest_launch_file_ingestion(const char *path,
1420                                       const char *remote_str,
1421                                       const char *remote_cn,
1422                                       const char *id_str) {
1423   pg_interim_journal_t *ij;
1424   char pgfile[PATH_MAX];
1425   eventer_t ingest;
1426
1427   if(strcmp(path + strlen(path) - 3, ".pg")) {
1428     snprintf(pgfile, sizeof(pgfile), "%s.pg", path);
1429     if(link(path, pgfile) < 0 && errno != EEXIST) {
1430       noitL(noit_error, "cannot link journal %s: %s\n", path, strerror(errno));
1431       return -1;
1432     }
1433   }
1434   else
1435     strlcpy(pgfile, path, sizeof(pgfile));
1436   ij = calloc(1, sizeof(*ij));
1437   ij->fd = open(pgfile, O_RDONLY);
1438   if(ij->fd < 0) {
1439     noitL(noit_error, "cannot open journal '%s': %s\n",
1440           pgfile, strerror(errno));
1441     free(ij);
1442     return -1;
1443   }
1444   close(ij->fd);
1445   ij->fd = -1;
1446   ij->filename = strdup(pgfile);
1447   ij->remote_str = strdup(remote_str);
1448   ij->remote_cn = strdup(remote_cn);
1449   ij->storagenode_id = atoi(id_str);
1450   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1451                                        ij->fqdn);
1452   noitL(noit_debug, "ingesting payload: %s\n", ij->filename);
1453   ingest = eventer_alloc();
1454   ingest->mask = EVENTER_ASYNCH;
1455   ingest->callback = stratcon_ingest_asynch_execute;
1456   ingest->closure = ij;
1457   eventer_add_asynch(ij->cpool->jobq, ingest);
1458   return 0;
1459 }
1460
1461 int
1462 stratcon_ingest_all_storagenode_info() {
1463   int i, cnt = 0;
1464   ds_single_detail _d = { 0 }, *d = &_d;
1465   conn_q *cq;
1466   cq = get_conn_q_for_metanode();
1467
1468   while(stratcon_database_connect(cq)) {
1469     noitL(noit_error, "Error connecting to database\n");
1470     sleep(1);
1471   }
1472
1473   GET_QUERY(all_storage);
1474   PG_EXEC(all_storage);
1475   cnt = PQntuples(d->res);
1476   for(i=0; i<cnt; i++) {
1477     void *vinfo;
1478     char *tmpint, *fqdn, *dsn;
1479     int storagenode_id;
1480     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1481     storagenode_id = atoi(tmpint);
1482     PG_GET_STR_COL(fqdn, i, "fqdn");
1483     PG_GET_STR_COL(dsn, i, "dsn");
1484     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1485     storagenode_id = tmpint ? atoi(tmpint) : 0;
1486
1487     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1488                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1489       storagenode_info *info;
1490       info = calloc(1, sizeof(*info));
1491       info->storagenode_id = storagenode_id;
1492       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1493       info->dsn = dsn ? strdup(dsn) : NULL;
1494       noit_hash_store(&storagenode_to_info_cache,
1495                       (void *)&info->storagenode_id, sizeof(int), info);
1496       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1497             info->storagenode_id,
1498             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1499     }
1500     noit_watchdog_child_heartbeat();
1501   }
1502   PQclear(d->res);
1503  bad_row:
1504   free_params(d);
1505
1506   release_conn_q(cq);
1507   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1508   return cnt;
1509 }
1510 int
1511 stratcon_ingest_all_check_info() {
1512   int i, cnt, loaded = 0;
1513   ds_single_detail _d = { 0 }, *d = &_d;
1514   conn_q *cq;
1515   cq = get_conn_q_for_metanode();
1516
1517   while(stratcon_database_connect(cq)) {
1518     noitL(noit_error, "Error connecting to database\n");
1519     sleep(1);
1520   }
1521
1522   GET_QUERY(check_mapall);
1523   PG_EXEC(check_mapall);
1524   cnt = PQntuples(d->res);
1525   for(i=0; i<cnt; i++) {
1526     void *vinfo;
1527     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1528     int sid, storagenode_id;
1529     uuid_info *uuidinfo;
1530     PG_GET_STR_COL(uuid_str, i, "id");
1531     if(!uuid_str) continue;
1532     PG_GET_STR_COL(tmpint, i, "sid");
1533     if(!tmpint) continue;
1534     sid = atoi(tmpint);
1535     PG_GET_STR_COL(fqdn, i, "fqdn");
1536     PG_GET_STR_COL(dsn, i, "dsn");
1537     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1538     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1539     storagenode_id = tmpint ? atoi(tmpint) : 0;
1540
1541     uuidinfo = calloc(1, sizeof(*uuidinfo));
1542     uuidinfo->uuid_str = strdup(uuid_str);
1543     uuidinfo->remote_cn = strdup(remote_cn);
1544     uuidinfo->storagenode_id = storagenode_id;
1545     uuidinfo->sid = sid;
1546     noit_hash_store(&uuid_to_info_cache,
1547                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1548     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1549           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1550     loaded++;
1551     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1552                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1553       storagenode_info *info;
1554       info = calloc(1, sizeof(*info));
1555       info->storagenode_id = storagenode_id;
1556       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1557       info->dsn = dsn ? strdup(dsn) : NULL;
1558       noit_hash_store(&storagenode_to_info_cache,
1559                       (void *)&info->storagenode_id, sizeof(int), info);
1560       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1561             info->storagenode_id,
1562             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1563     }
1564     noit_watchdog_child_heartbeat();
1565   }
1566   PQclear(d->res);
1567  bad_row:
1568   free_params(d);
1569
1570   release_conn_q(cq);
1571   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1572   return loaded;
1573 }
1574
1575 static char *
1576 stratcon_get_noit_config(const char *cn) {
1577   ds_single_detail *d;
1578   int row_count = 0;
1579   const char *xml = NULL;
1580   char *xmlcopy = NULL;
1581   conn_q *cq = NULL;
1582
1583   d = calloc(1, sizeof(*d));
1584   GET_QUERY(config_get);
1585   cq = get_conn_q_for_metanode();
1586   if(!cq) goto bad_row;
1587
1588   DECLARE_PARAM_STR(cn, cn ? strlen(cn) : 0);
1589   PG_EXEC(config_get);
1590   row_count = PQntuples(d->res);
1591   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1592
1593   if(xml) xmlcopy = strdup(xml);
1594
1595  bad_row:
1596   free_params((ds_single_detail *)d);
1597   d->nparams = 0;
1598   if(cq) release_conn_q(cq);
1599   free(d);
1600
1601   return xmlcopy;
1602 }
1603
1604 static ingestor_api_t postgres_ingestor_api = {
1605   .launch_file_ingestion = stratcon_ingest_launch_file_ingestion,
1606   .iep_check_preload = stratcon_ingest_iep_check_preload,
1607   .storage_node_lookup = storage_node_quick_lookup,
1608   .submit_realtime_lookup = stratcon_ingestor_submit_lookup,
1609   .get_noit_config = stratcon_get_noit_config,
1610   .save_config = stratcon_ingest_saveconfig
1611 };
1612
1613 static int postgres_ingestor_config(noit_module_generic_t *self, noit_hash_table *o) {
1614   return 0;
1615 }
1616 static int postgres_ingestor_onload(noit_image_t *self) {
1617   return 0;
1618 }
/*
 * Journal sweep filter: returns 1 when file is exactly 19 characters long
 * and ends in ".pg", 0 otherwise.  A watchdog heartbeat is emitted on every
 * call.
 */
static int
is_postgres_ingestor_file(const char *file) {
  noit_watchdog_child_heartbeat();
  if(strlen(file) != 19) return 0;
  /* 16 leading characters followed by the 3 character suffix */
  return strcmp(file + 16, ".pg") == 0;
}
1623 static int postgres_ingestor_init(noit_module_generic_t *self) {
1624   stratcon_datastore_core_init();
1625   pthread_mutex_init(&ds_conns_lock, NULL);
1626   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1627   ds_err = noit_log_stream_find("error/datastore");
1628   ds_deb = noit_log_stream_find("debug/datastore");
1629   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
1630   ingest_err = noit_log_stream_find("error/ingest");
1631   if(!ds_err) ds_err = noit_error;
1632   if(!ingest_err) ingest_err = noit_error;
1633   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1634                            &basejpath)) {
1635     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1636     exit(-1);
1637   }
1638   stratcon_ingest_all_check_info();
1639   stratcon_ingest_all_storagenode_info();
1640   stratcon_ingest_sweep_journals(is_postgres_ingestor_file,
1641                                  stratcon_ingest_launch_file_ingestion);
1642   return stratcon_datastore_set_ingestor(&postgres_ingestor_api);
1643 }
1644
1645 noit_module_generic_t postgres_ingestor = {
1646   {
1647     .magic = NOIT_GENERIC_MAGIC,
1648     .version = NOIT_GENERIC_ABI_VERSION,
1649     .name = "postgres_ingestor",
1650     .description = "postgres drive for data ingestion",
1651     .xml_description = postgres_ingestor_xml_description,
1652     .onload = postgres_ingestor_onload,
1653   }, 
1654   postgres_ingestor_config,
1655   postgres_ingestor_init
1656 };
Note: See TracBrowser for help on using the browser.