root/src/modules/postgres_ingestor.c

Revision 9d92fd6e7f517697099fa55c5aa32bd67d1d3e9e, 51.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 2 years ago)

hookup get_noit_config function

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2011, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "noit_module.h"
35 #include "eventer/eventer.h"
36 #include "utils/noit_log.h"
37 #include "utils/noit_b64.h"
38 #include "utils/noit_str.h"
39 #include "utils/noit_mkdir.h"
40 #include "utils/noit_getip.h"
41 #include "utils/noit_watchdog.h"
42 #include "stratcon_datastore.h"
43 #include "stratcon_realtime_http.h"
44 #include "stratcon_iep.h"
45 #include "noit_conf.h"
46 #include "noit_check.h"
47 #include "noit_check_log_helpers.h"
48 #include "noit_rest.h"
49 #include <unistd.h>
50 #include <fcntl.h>
51 #include <netinet/in.h>
52 #include <sys/un.h>
53 #include <dirent.h>
54 #include <arpa/inet.h>
55 #include <sys/mman.h>
56 #include <libpq-fe.h>
57 #include <zlib.h>
58 #include <assert.h>
59 #include <errno.h>
60 #include "postgres_ingestor.xmlh"
61 #include "bundle.pb-c.h"
62
/*
 * DECL_STMT(codename, confname) emits two file-scope statics:
 *   codename        the statement text; NULL until something fills it in
 *   codename_conf   the configuration path of the statement, i.e.
 *                   "/stratcon/database/statements/" followed by confname
 */
#define DECL_STMT(codename,confname) \
  static char *codename = NULL; \
  static const char *codename##_conf = \
    "/stratcon/database/statements/" #confname

/* Statements run right after a connection is established */
DECL_STMT(storage_post_connect, storagepostconnect);
DECL_STMT(metanode_post_connect, metanodepostconnect);

/* Storage node lookups */
DECL_STMT(find_storage, findstoragenode);
DECL_STMT(all_storage, allstoragenodes);

/* Check lookups and mappings */
DECL_STMT(check_map, mapchecktostoragenode);
DECL_STMT(check_mapall, mapallchecks);
DECL_STMT(check_loadall, allchecks);
DECL_STMT(check_find, findcheck);

/* Inserts, one per log line type */
DECL_STMT(check_insert, check);
DECL_STMT(status_insert, status);
DECL_STMT(metric_insert_numeric, metric_numeric);
DECL_STMT(metric_insert_text, metric_text);

/* noit configuration storage and retrieval */
DECL_STMT(config_insert, config);
DECL_STMT(config_get, findconfig);
81
82 static noit_log_stream_t ds_err = NULL;
83 static noit_log_stream_t ds_deb = NULL;
84 static noit_log_stream_t ds_pool_deb = NULL;
85 static noit_log_stream_t ingest_err = NULL;
86
/*
 * GET_QUERY(a): make sure statement `a` (declared with DECL_STMT) is set.
 * When it is still NULL it is read from the configuration path a_conf;
 * if that read fails, control jumps to the caller's `bad_row` label.
 * `a` must be a plain identifier because it is token-pasted.
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && \
     !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
92
93 struct conn_q;
94
95 typedef struct {
96   char            *queue_name; /* the key fqdn+remote_sn */
97   eventer_jobq_t  *jobq;
98   struct conn_q   *head;
99   pthread_mutex_t  lock;
100   pthread_cond_t   cv;
101   int              ttl;
102   int              in_pool;
103   int              outstanding;
104   int              max_allocated;
105   int              max_in_pool;
106 } conn_pool;
107 typedef struct conn_q {
108   time_t           last_use;
109   char            *dsn;        /* Pg connect string */
110   char            *remote_str; /* the IP of the noit*/
111   char            *remote_cn;  /* the Cert CN of the noit */
112   char            *fqdn;       /* the fqdn of the storage node */
113   conn_pool       *pool;
114   struct conn_q   *next;
115   /* Postgres specific stuff */
116   PGconn          *dbh;
117 } conn_q;
118
119
120 #define MAX_PARAMS 8
121 #define POSTGRES_PARTS \
122   PGresult *res; \
123   int rv; \
124   time_t whence; \
125   int nparams; \
126   int metric_type; \
127   char *paramValues[MAX_PARAMS]; \
128   int paramLengths[MAX_PARAMS]; \
129   int paramFormats[MAX_PARAMS]; \
130   int paramAllocd[MAX_PARAMS];
131
132 typedef struct ds_single_detail {
133   POSTGRES_PARTS
134 } ds_single_detail;
135 typedef struct {
136   /* Postgres specific stuff */
137   POSTGRES_PARTS
138   struct realtime_tracker *rt;
139   conn_q *cq; /* connection on which to perform this job */
140   eventer_t completion_event; /* This event should be registered if non NULL */
141 } ds_rt_detail;
142 typedef struct ds_line_detail {
143   /* Postgres specific stuff */
144   POSTGRES_PARTS
145   char *data;
146   int problematic;
147   struct ds_line_detail *next;
148 } ds_line_detail;
149
150 typedef struct {
151   char *remote_str;
152   char *remote_cn;
153   char *fqdn;
154   int storagenode_id;
155   int fd;
156   char *filename;
157   conn_pool *cpool;
158 } pg_interim_journal_t;
159
160 static int stratcon_database_connect(conn_q *cq);
161 static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
162 static int storage_node_quick_lookup(const char *uuid_str,
163                                      const char *remote_cn,
164                                      int *sid_out, int *storagenode_id_out,
165                                      const char **remote_cn_out,
166                                      const char **fqdn_out,
167                                      const char **dsn_out);
168
169 static void
170 free_params(ds_single_detail *d) {
171   int i;
172   for(i=0; i<d->nparams; i++)
173     if(d->paramAllocd[i] && d->paramValues[i])
174       free(d->paramValues[i]);
175 }
176
177 static char *basejpath = NULL;
178 static pthread_mutex_t ds_conns_lock;
179 static noit_hash_table ds_conns;
180
181 /* the fqdn cache needs to be thread safe */
182 typedef struct {
183   char *uuid_str;
184   char *remote_cn;
185   int storagenode_id;
186   int sid;
187 } uuid_info;
188 typedef struct {
189   int storagenode_id;
190   char *fqdn;
191   char *dsn;
192 } storagenode_info;
193 noit_hash_table uuid_to_info_cache;
194 pthread_mutex_t storagenode_to_info_cache_lock;
195 noit_hash_table storagenode_to_info_cache;
196
197 /* Thread-safe connection pools */
198
199 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
200 static void
201 release_conn_q_forceable(conn_q *cq, int forcefree) {
202   int putback = 0;
203   cq->last_use = time(NULL);
204   pthread_mutex_lock(&cq->pool->lock);
205   cq->pool->outstanding--;
206   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
207     putback = 1;
208     cq->next = cq->pool->head;
209     cq->pool->head = cq;
210     cq->pool->in_pool++;
211   }
212   pthread_mutex_unlock(&cq->pool->lock);
213   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(),
214         putback ? "to pool" : "and destroy", cq->pool->queue_name);
215   pthread_cond_signal(&cq->pool->cv);
216   if(putback) return;
217
218   /* Not put back, release it */
219   if(cq->dbh) PQfinish(cq->dbh);
220   if(cq->remote_str) free(cq->remote_str);
221   if(cq->remote_cn) free(cq->remote_cn);
222   if(cq->fqdn) free(cq->fqdn);
223   if(cq->dsn) free(cq->dsn);
224   free(cq);
225 }
226 static void
227 ttl_purge_conn_pool(conn_pool *pool) {
228   int old_cnt, new_cnt;
229   time_t now = time(NULL);
230   conn_q *cq, *prev = NULL, *iter;
231   /* because we always replace on the head and update the last_use time when
232      doing so, we know they are ordered LRU on the end.  So, once we hit an
233      old one, we know all the others are old too.
234    */
235   if(!pool->head) return; /* hack short circuit for no locks */
236   pthread_mutex_lock(&pool->lock);
237   old_cnt = pool->in_pool;
238   cq = pool->head;
239   while(cq) {
240     if(cq->last_use + cq->pool->ttl < now) {
241       if(prev) prev->next = NULL;
242       else pool->head = NULL;
243       break;
244     }
245     prev = cq;
246     cq = cq->next;
247   }
248   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
249   /* Fix accounting */
250   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
251   new_cnt = pool->in_pool;
252   pthread_mutex_unlock(&pool->lock);
253
254   /* Force release these without holding the lock */
255   while(cq) {
256     cq = cq->next;
257     release_conn_q_forceable(cq, 1);
258   }
259   if(old_cnt != new_cnt)
260     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
261           pool->queue_name);
262 }
263 static void
264 release_conn_q(conn_q *cq) {
265   ttl_purge_conn_pool(cq->pool);
266   release_conn_q_forceable(cq, 0);
267 }
268 static conn_pool *
269 get_conn_pool_for_remote(const char *remote_str,
270                          const char *remote_cn, const char *fqdn) {
271   void *vcpool;
272   conn_pool *cpool = NULL;
273   char queue_name[256] = "datastore_";
274   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
275            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
276            fqdn ? fqdn : "default",
277            remote_cn ? remote_cn : "default");
278   pthread_mutex_lock(&ds_conns_lock);
279   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
280                         strlen(queue_name), &vcpool))
281     cpool = vcpool;
282   pthread_mutex_unlock(&ds_conns_lock);
283   if(!cpool) {
284     vcpool = cpool = calloc(1, sizeof(*cpool));
285     cpool->queue_name = strdup(queue_name);
286     pthread_mutex_init(&cpool->lock, NULL);
287     pthread_cond_init(&cpool->cv, NULL);
288     cpool->in_pool = 0;
289     cpool->outstanding = 0;
290     cpool->max_in_pool = 1;
291     cpool->max_allocated = 1;
292     pthread_mutex_lock(&ds_conns_lock);
293     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
294                         cpool)) {
295       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
296                          strlen(queue_name), &vcpool);
297     }
298     pthread_mutex_unlock(&ds_conns_lock);
299     if(vcpool != cpool) {
300       /* someone beat us to it */
301       free(cpool->queue_name);
302       pthread_mutex_destroy(&cpool->lock);
303       pthread_cond_destroy(&cpool->cv);
304       free(cpool);
305     }
306     else {
307       int i;
308       /* Our job to setup the pool */
309       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
310       eventer_jobq_init(cpool->jobq, queue_name);
311       cpool->jobq->backq = eventer_default_backq();
312       /* Add one thread */
313       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
314         eventer_jobq_increase_concurrency(cpool->jobq);
315     }
316     cpool = vcpool;
317   }
318   return cpool;
319 }
320 static conn_q *
321 get_conn_q_for_remote(const char *remote_str,
322                       const char *remote_cn, const char *fqdn,
323                       const char *dsn) {
324   conn_pool *cpool;
325   conn_q *cq;
326   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
327   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(),
328         cpool->queue_name);
329   pthread_mutex_lock(&cpool->lock);
330  again:
331   if(cpool->head) {
332     assert(cpool->in_pool > 0);
333     cq = cpool->head;
334     cpool->head = cq->next;
335     cpool->in_pool--;
336     cpool->outstanding++;
337     cq->next = NULL;
338     pthread_mutex_unlock(&cpool->lock);
339     return cq;
340   }
341   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
342     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
343           (void *)pthread_self(), cpool->queue_name);
344     pthread_cond_wait(&cpool->cv, &cpool->lock);
345     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
346           (void *)pthread_self(), cpool->queue_name);
347     goto again;
348   }
349   else {
350     cpool->outstanding++;
351     pthread_mutex_unlock(&cpool->lock);
352   }
353  
354   cq = calloc(1, sizeof(*cq));
355   cq->pool = cpool;
356   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
357   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
358   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
359   cq->dsn = dsn ? strdup(dsn) : NULL;
360   return cq;
361 }
362 static conn_q *
363 get_conn_q_for_metanode() {
364   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
365 }
366
/* Result codes reported by the statement execution routines. */
typedef enum {
  DS_EXEC_SUCCESS    = 0,
  DS_EXEC_ROW_FAILED = 1,
  DS_EXEC_TXN_FAILED = 2
} execute_outcome_t;
372
/*
 * DECLARE_PARAM_STR(str, len): append a text parameter to the detail `d`
 * that is in scope.  The value is a private copy of the first `len` bytes
 * of `str`.  The literal "[[null]]" is the in-band marker for SQL NULL: it
 * is stored as a NULL value of length 0 that is not heap owned.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  int pslot_ = d->nparams; \
  d->paramValues[pslot_] = noit__strndup(str, len); \
  d->paramLengths[pslot_] = len; \
  d->paramFormats[pslot_] = 0; \
  if(strcmp(d->paramValues[pslot_], "[[null]]") == 0) { \
    free(d->paramValues[pslot_]); \
    d->paramValues[pslot_] = NULL; \
    d->paramLengths[pslot_] = 0; \
    d->paramAllocd[pslot_] = 0; \
  } \
  else { \
    d->paramAllocd[pslot_] = 1; \
  } \
  d->nparams = pslot_ + 1; \
} while(0)

/*
 * DECLARE_PARAM_INT(i): append the decimal rendering of the int `i` as a
 * text parameter via DECLARE_PARAM_STR.
 */
#define DECLARE_PARAM_INT(i) do { \
  char numtxt_[32]; \
  int numtxt_len_; \
  snprintf(numtxt_, sizeof(numtxt_), "%d", (i)); \
  numtxt_len_ = strlen(numtxt_); \
  DECLARE_PARAM_STR(numtxt_, numtxt_len_); \
} while(0)
393
/*
 * PG_GET_STR_COL(dest, row, name): point `dest` at the text of column
 * `name` in row `row` of d->res.  `dest` becomes NULL when the column does
 * not exist or the value is SQL NULL.  The text belongs to d->res.
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int colidx_ = PQfnumber(d->res, name); \
  if(colidx_ < 0 || PQgetisnull(d->res, row, colidx_)) \
    dest = NULL; \
  else \
    dest = PQgetvalue(d->res, row, colidx_); \
} while(0)
401
/*
 * PG_EXEC(cmd): run `cmd` on cq->dbh with the parameters collected in `d`,
 * leaving the result in d->res and its status in d->rv.  Any status other
 * than PGRES_COMMAND_OK / PGRES_TUPLES_OK logs the first line of the error
 * text, clears the result and jumps to the caller's `bad_row` label.  On
 * success the caller owns d->res.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(!(d->rv == PGRES_COMMAND_OK || d->rv == PGRES_TUPLES_OK)) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_nl = strchr(pgerr, '\n'); \
    int pgerr_len = pgerr_nl ? (int)(pgerr_nl - pgerr) \
                             : (int)strlen(pgerr); \
    noitL(ds_err, "[%s] stratcon_datasource.c:%d bad (%d): %.*s\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
          pgerr_len, pgerr); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
419
/*
 * PG_TM_EXEC(cmd, whence): like PG_EXEC, but `cmd` is first expanded with
 * strftime() against the UTC broken-down form of `whence`.
 *
 * `whence` is evaluated exactly once.  If the time cannot be broken down or
 * the expansion does not fit the 4096-byte buffer (strftime() returns 0
 * and leaves the buffer unspecified), nothing is executed, d->res is left
 * untouched and control jumps to the caller's `bad_row` label.  A failed
 * statement logs the first line of the error text, clears the result and
 * jumps to `bad_row` as well.  On success the caller owns d->res.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t tm_when_ = (whence); \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&tm_when_, &tbuf); \
  if(!tm || strftime(cmdbuf, sizeof(cmdbuf), cmd, tm) == 0) { \
    noitL(ds_err, "stratcon_datasource.c:%d bad statement expansion time: %llu\n", \
          __LINE__, (long long unsigned)tm_when_); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "stratcon_datasource.c:%d bad (%d): %.*s time: %llu\n", \
          __LINE__, d->rv, (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)tm_when_); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
442
443 static void *
444 stratcon_ingest_check_loadall(void *vsn) {
445   storagenode_info *sn = vsn;
446   ds_single_detail *d;
447   int i, row_count = 0, good = 0;
448   char buff[1024];
449   conn_q *cq = NULL;
450
451   d = calloc(1, sizeof(*d));
452   GET_QUERY(check_loadall);
453   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
454   i = 0;
455   while(stratcon_database_connect(cq)) {
456     if(i++ > 4) {
457       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
458       release_conn_q(cq);
459       return (void *)(vpsized_int)good;
460     }
461     sleep(1);
462   }
463   PG_EXEC(check_loadall);
464   row_count = PQntuples(d->res);
465  
466   for(i=0; i<row_count; i++) {
467     int rv;
468     int8_t family;
469     struct sockaddr *sin;
470     struct sockaddr_in sin4 = { .sin_family = AF_INET };
471     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
472     char *remote, *id, *target, *module, *name;
473     PG_GET_STR_COL(remote, i, "remote_address");
474     PG_GET_STR_COL(id, i, "id");
475     PG_GET_STR_COL(target, i, "target");
476     PG_GET_STR_COL(module, i, "module");
477     PG_GET_STR_COL(name, i, "name");
478     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
479
480     family = AF_INET;
481     sin = (struct sockaddr *)&sin4;
482     rv = inet_pton(family, remote, &sin4.sin_addr);
483     if(rv != 1) {
484       family = AF_INET6;
485       sin = (struct sockaddr *)&sin6;
486       rv = inet_pton(family, remote, &sin6.sin6_addr);
487       if(rv != 1) {
488         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
489         sin = NULL;
490       }
491     }
492
493     /* stratcon_iep_line_processor takes an allocated operand and frees it */
494     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
495     good++;
496   }
497   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
498         good, row_count, sn->fqdn);
499  bad_row:
500   free_params((ds_single_detail *)d);
501   free(d);
502   if(cq) release_conn_q(cq);
503   return (void *)(vpsized_int)good;
504 }
505 static int
506 stratcon_ingest_asynch_drive_iep(eventer_t e, int mask, void *closure,
507                                  struct timeval *now) {
508   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
509   pthread_t *jobs = NULL;
510   int nodes, i = 0, tcnt = 0;
511   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
512   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
513
514   pthread_mutex_lock(&storagenode_to_info_cache_lock);
515   nodes = storagenode_to_info_cache.size;
516   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
517   sns = calloc(MAX(1,nodes), sizeof(*sns));
518   if(nodes == 0) sns[nodes++] = &self;
519   else {
520     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
521     const char *k;
522     void *v;
523     int klen;
524     while(noit_hash_next(&storagenode_to_info_cache,
525                          &iter, &k, &klen, &v)) {
526       sns[i++] = (storagenode_info *)v;
527     }
528   }
529   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
530
531   for(i=0; i<nodes; i++) {
532     if(pthread_create(&jobs[i], NULL,
533                       stratcon_ingest_check_loadall, sns[i]) != 0) {
534       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
535     }
536   }
537   for(i=0; i<nodes; i++) {
538     void *good;
539     pthread_join(jobs[i], &good);
540     tcnt += (int)(vpsized_int)good;
541   }
542   free(jobs);
543   free(sns);
544   noitL(noit_error, "Loaded all %d check states.\n", tcnt);
545   return 0;
546 }
547 static void
548 stratcon_ingest_iep_check_preload() {
549   eventer_t e;
550   conn_pool *cpool;
551
552   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
553   e = eventer_alloc();
554   e->mask = EVENTER_ASYNCH;
555   e->callback = stratcon_ingest_asynch_drive_iep;
556   e->closure = NULL;
557   eventer_add_asynch(cpool->jobq, e);
558 }
559 execute_outcome_t
560 stratcon_ingest_find(ds_rt_detail *d) {
561   conn_q *cq;
562   char *val;
563   int row_count;
564   struct realtime_tracker *node;
565
566   for(node = d->rt; node; node = node->next) {
567     char uuid_str[UUID_STR_LEN+1];
568     const char *fqdn, *dsn, *remote_cn;
569     char remote_ip[32];
570     int storagenode_id;
571
572     uuid_unparse_lower(node->checkid, uuid_str);
573     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
574                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
575       continue;
576
577     noitL(noit_debug, "stratcon_ingest_find <- (%d, %s) @ %s\n",
578           node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)");
579
580     /* We might be able to find the IP from our config if someone has
581      * specified the expected cn in the noit definition.
582      */
583     if(stratcon_find_noit_ip_by_cn(remote_cn,
584                                    remote_ip, sizeof(remote_ip)) == 0) {
585       node->noit = strdup(remote_ip);
586       noitL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit);
587       continue;
588     }
589
590     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
591     stratcon_database_connect(cq);
592
593     GET_QUERY(check_find);
594     DECLARE_PARAM_INT(node->sid);
595     PG_EXEC(check_find);
596     row_count = PQntuples(d->res);
597     if(row_count != 1) {
598       noitL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid);
599       PQclear(d->res);
600       goto bad_row;
601     }
602
603     /* Get the remote_address (which noit owns this) */
604     PG_GET_STR_COL(val, 0, "remote_address");
605     if(!val) {
606       noitL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn);
607       PQclear(d->res);
608       goto bad_row;
609     }
610     node->noit = strdup(val);
611     noitL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit);
612    bad_row:
613     free_params((ds_single_detail *)d);
614     d->nparams = 0;
615     release_conn_q(cq);
616   }
617   return DS_EXEC_SUCCESS;
618 }
619 execute_outcome_t
620 stratcon_ingest_execute(conn_q *cq, const char *r, const char *remote_cn,
621                         ds_line_detail *d) {
622   int type, len, sid;
623   char *final_buff;
624   uLong final_len, actual_final_len;
625   char *token;
626   char raddr_blank[1] = "";
627   const char *raddr;
628
629   type = d->data[0];
630   raddr = r ? r : raddr_blank;
631
632   /* Parse the log line, but only if we haven't already */
633   if(!d->nparams) {
634     char *scp, *ecp;
635
636     scp = d->data;
637 #define PROCESS_NEXT_FIELD(t,l) do { \
638   if(!*scp) goto bad_row; \
639   ecp = strchr(scp, '\t'); \
640   if(!ecp) goto bad_row; \
641   token = scp; \
642   len = (ecp-scp); \
643   scp = ecp + 1; \
644 } while(0)
645 #define PROCESS_LAST_FIELD(t,l) do { \
646   if(!*scp) ecp = scp; \
647   else { \
648     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
649     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
650   } \
651   t = scp; \
652   l = (ecp-scp); \
653 } while(0)
654
655     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
656     switch(type) {
657       /* See noit_check_log.c for log description */
658       case 'n':
659         DECLARE_PARAM_STR(raddr, strlen(raddr));
660         DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
661         DECLARE_PARAM_STR("noitd",5); /* node_type */
662         PROCESS_NEXT_FIELD(token,len);
663         d->whence = (time_t)strtoul(token, NULL, 10);
664         DECLARE_PARAM_STR(token,len); /* timestamp */
665
666         /* This is the expected uncompressed len */
667         PROCESS_NEXT_FIELD(token,len);
668         final_len = atoi(token);
669         final_buff = malloc(final_len);
670         if(!final_buff) goto bad_row;
671  
672         /* The last token is b64 endoded and compressed.
673          * we need to decode it, declare it and then free it.
674          */
675         PROCESS_LAST_FIELD(token, len);
676         /* We can in-place decode this */
677         len = noit_b64_decode((char *)token, len,
678                               (unsigned char *)token, len);
679         if(len <= 0) {
680           noitL(noit_error, "noitd config base64 decoding error.\n");
681           free(final_buff);
682           goto bad_row;
683         }
684         actual_final_len = final_len;
685         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
686                               (unsigned char *)token, len)) {
687           noitL(noit_error, "noitd config decompression failure.\n");
688           free(final_buff);
689           goto bad_row;
690         }
691         if(final_len != actual_final_len) {
692           noitL(noit_error, "noitd config decompression error.\n");
693           free(final_buff);
694           goto bad_row;
695         }
696         DECLARE_PARAM_STR(final_buff, final_len);
697         free(final_buff);
698         break;
699       case 'D':
700         break;
701       case 'C':
702         DECLARE_PARAM_STR(raddr, strlen(raddr));
703         PROCESS_NEXT_FIELD(token,len);
704         DECLARE_PARAM_STR(token,len); /* timestamp */
705         d->whence = (time_t)strtoul(token, NULL, 10);
706         PROCESS_NEXT_FIELD(token, len);
707         /* uuid is last 36 bytes */
708         if(len > 36) { token += (len-36); len = 36; }
709         sid = uuid_to_sid(token, remote_cn);
710         if(sid == 0) goto bad_row;
711         DECLARE_PARAM_INT(sid); /* sid */
712         DECLARE_PARAM_STR(token,len); /* uuid */
713         PROCESS_NEXT_FIELD(token, len);
714         DECLARE_PARAM_STR(token,len); /* target */
715         PROCESS_NEXT_FIELD(token, len);
716         DECLARE_PARAM_STR(token,len); /* module */
717         PROCESS_LAST_FIELD(token, len);
718         DECLARE_PARAM_STR(token,len); /* name */
719         break;
720       case 'M':
721         PROCESS_NEXT_FIELD(token,len);
722         DECLARE_PARAM_STR(token,len); /* timestamp */
723         d->whence = (time_t)strtoul(token, NULL, 10);
724         PROCESS_NEXT_FIELD(token, len);
725         /* uuid is last 36 bytes */
726         if(len > 36) { token += (len-36); len = 36; }
727         sid = uuid_to_sid(token, remote_cn);
728         if(sid == 0) goto bad_row;
729         DECLARE_PARAM_INT(sid); /* sid */
730         PROCESS_NEXT_FIELD(token, len);
731         DECLARE_PARAM_STR(token,len); /* name */
732         PROCESS_NEXT_FIELD(token,len);
733         d->metric_type = *token;
734         PROCESS_LAST_FIELD(token,len);
735         DECLARE_PARAM_STR(token,len); /* value */
736         break;
737       case 'S':
738         PROCESS_NEXT_FIELD(token,len);
739         DECLARE_PARAM_STR(token,len); /* timestamp */
740         d->whence = (time_t)strtoul(token, NULL, 10);
741         PROCESS_NEXT_FIELD(token, len);
742         /* uuid is last 36 bytes */
743         if(len > 36) { token += (len-36); len = 36; }
744         sid = uuid_to_sid(token, remote_cn);
745         if(sid == 0) goto bad_row;
746         DECLARE_PARAM_INT(sid); /* sid */
747         PROCESS_NEXT_FIELD(token, len);
748         DECLARE_PARAM_STR(token,len); /* state */
749         PROCESS_NEXT_FIELD(token, len);
750         DECLARE_PARAM_STR(token,len); /* availability */
751         PROCESS_NEXT_FIELD(token, len);
752         DECLARE_PARAM_STR(token,len); /* duration */
753         PROCESS_LAST_FIELD(token,len);
754         DECLARE_PARAM_STR(token,len); /* status */
755         break;
756       default:
757         goto bad_row;
758     }
759
760   }
761
762   /* Now execute the query */
763   switch(type) {
764     case 'n':
765       GET_QUERY(config_insert);
766       PG_EXEC(config_insert);
767       PQclear(d->res);
768       break;
769     case 'C':
770       GET_QUERY(check_insert);
771       PG_TM_EXEC(check_insert, d->whence);
772       PQclear(d->res);
773       break;
774     case 'S':
775       GET_QUERY(status_insert);
776       PG_TM_EXEC(status_insert, d->whence);
777       PQclear(d->res);
778       break;
779     case 'D':
780       break;
781     case 'M':
782       switch(d->metric_type) {
783         case METRIC_INT32:
784         case METRIC_UINT32:
785         case METRIC_INT64:
786         case METRIC_UINT64:
787         case METRIC_DOUBLE:
788           GET_QUERY(metric_insert_numeric);
789           PG_TM_EXEC(metric_insert_numeric, d->whence);
790           PQclear(d->res);
791           break;
792         case METRIC_STRING:
793           GET_QUERY(metric_insert_text);
794           PG_TM_EXEC(metric_insert_text, d->whence);
795           PQclear(d->res);
796           break;
797         default:
798           goto bad_row;
799       }
800       break;
801     default:
802       /* should never get here */
803       goto bad_row;
804   }
805   return DS_EXEC_SUCCESS;
806  bad_row:
807   return DS_EXEC_ROW_FAILED;
808 }
809 static int
810 stratcon_database_post_connect(conn_q *cq) {
811   int rv = 0;
812   ds_single_detail _d = { 0 }, *d = &_d;
813   if(cq->fqdn) {
814     char *remote_str, *remote_cn;
815     /* This is the silly way we get null's in through our declare_param_str */
816     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
817     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
818     /* This is a storage node, it gets the storage node post_connect */
819     GET_QUERY(storage_post_connect);
820     rv = -1; /* now we're serious */
821     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
822     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
823     PG_EXEC(storage_post_connect);
824     PQclear(d->res);
825     rv = 0;
826   }
827   else {
828     /* Metanode post_connect */
829     GET_QUERY(metanode_post_connect);
830     rv = -1; /* now we're serious */
831     PG_EXEC(metanode_post_connect);
832     PQclear(d->res);
833     rv = 0;
834   }
835  bad_row:
836   free_params(d);
837   if(rv == -1) {
838     /* Post-connect intentions are serious and fatal */
839     PQfinish(cq->dbh);
840     cq->dbh = NULL;
841   }
842   return rv;
843 }
844 static int
845 stratcon_database_connect(conn_q *cq) {
846   char *dsn, dsn_meta[512];
847   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
848   const char *k, *v;
849   int klen;
850   noit_hash_table *t;
851
852   dsn_meta[0] = '\0';
853   if(!cq->dsn) {
854     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
855     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
856       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
857       strlcat(dsn_meta, k, sizeof(dsn_meta));
858       strlcat(dsn_meta, "=", sizeof(dsn_meta));
859       strlcat(dsn_meta, v, sizeof(dsn_meta));
860     }
861     noit_hash_destroy(t, free, free);
862     free(t);
863     dsn = dsn_meta;
864   }
865   else {
866     char options[32];
867     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta));
868     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user",
869                                options, sizeof(options))) {
870       strlcat(dsn_meta, " ", sizeof(dsn_meta));
871       strlcat(dsn_meta, "user", sizeof(dsn_meta));
872       strlcat(dsn_meta, "=", sizeof(dsn_meta));
873       strlcat(dsn_meta, options, sizeof(dsn_meta));
874     }
875     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password",
876                                options, sizeof(options))) {
877       strlcat(dsn_meta, " ", sizeof(dsn_meta));
878       strlcat(dsn_meta, "password", sizeof(dsn_meta));
879       strlcat(dsn_meta, "=", sizeof(dsn_meta));
880       strlcat(dsn_meta, options, sizeof(dsn_meta));
881     }
882     dsn = dsn_meta;
883   }
884
885   if(cq->dbh) {
886     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
887     PQreset(cq->dbh);
888     if(PQstatus(cq->dbh) != CONNECTION_OK) {
889       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
890             dsn, PQerrorMessage(cq->dbh));
891       return -1;
892     }
893     if(stratcon_database_post_connect(cq)) return -1;
894     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
895     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
896           dsn, PQerrorMessage(cq->dbh));
897     return -1;
898   }
899
900   cq->dbh = PQconnectdb(dsn);
901   if(!cq->dbh) return -1;
902   if(PQstatus(cq->dbh) != CONNECTION_OK) {
903     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
904           dsn, PQerrorMessage(cq->dbh));
905     return -1;
906   }
907   if(stratcon_database_post_connect(cq)) return -1;
908   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
909   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
910         dsn, PQerrorMessage(cq->dbh));
911   return -1;
912 }
913 static int
914 stratcon_ingest_savepoint_op(conn_q *cq, const char *p,
915                              const char *name) {
916   int rv = -1;
917   PGresult *res;
918   char cmd[128];
919   strlcpy(cmd, p, sizeof(cmd));
920   strlcat(cmd, name, sizeof(cmd));
921   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
922   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
923   PQclear(res);
924   return rv;
925 }
926 static int
927 stratcon_ingest_do(conn_q *cq, const char *cmd) {
928   PGresult *res;
929   int rv = -1;
930   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
931   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
932   PQclear(res);
933   return rv;
934 }
/*
 * Helper macros for the batch ingestion loop.  They expect the expanding
 * function to provide a `full_monty` label plus the locals `cq`,
 * `current` and `last_sp`.
 */

/* Drop the connection entirely and restart the batch from full_monty. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)

/* Issue "<verb><name>"; on failure bail out via BUSTED, otherwise record
 * `sp_after` as the new value of last_sp. */
#define SAVEPOINT_OP_OR_BUST(verb, name, sp_after) do { \
  if(stratcon_ingest_savepoint_op(cq, verb, name)) BUSTED(cq); \
  last_sp = (sp_after); \
} while(0)

/* Open a savepoint and remember the record it was opened at. */
#define SAVEPOINT(name) \
  SAVEPOINT_OP_OR_BUST("SAVEPOINT ", name, current)

/* Undo back to the savepoint; no savepoint is considered pending after. */
#define ROLLBACK_TO_SAVEPOINT(name) \
  SAVEPOINT_OP_OR_BUST("ROLLBACK TO SAVEPOINT ", name, NULL)

/* Release the savepoint; no savepoint is considered pending after. */
#define RELEASE_SAVEPOINT(name) \
  SAVEPOINT_OP_OR_BUST("RELEASE SAVEPOINT ", name, NULL)
954
955 int
956 stratcon_ingest_asynch_lookup(eventer_t e, int mask, void *closure,
957                               struct timeval *now) {
958   ds_rt_detail *dsjd = closure;
959   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
960   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
961
962   assert(dsjd->rt);
963   stratcon_ingest_find(dsjd);
964   if(dsjd->completion_event)
965     eventer_add(dsjd->completion_event);
966
967   free_params((ds_single_detail *)dsjd);
968   free(dsjd);
969   return 0;
970 }
971 static void
972 stratcon_ingestor_submit_lookup(struct realtime_tracker *rt,
973                                 eventer_t completion) {
974   eventer_t e;
975   conn_pool *cpool;
976   ds_rt_detail *rtdetail;
977
978   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
979   rtdetail = calloc(1, sizeof(*rtdetail));
980   rtdetail->rt = rt;
981   rtdetail->completion_event = completion;
982   e = eventer_alloc();
983   e->mask = EVENTER_ASYNCH;
984   e->callback = stratcon_ingest_asynch_lookup;
985   e->closure = rtdetail;
986   eventer_add_asynch(cpool->jobq, e);
987 }
988 static const char *
989 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
990   void *vinfo;
991   char *dsn = NULL, *fqdn = NULL;
992   int found = 0;
993   storagenode_info *info = NULL;
994   pthread_mutex_lock(&storagenode_to_info_cache_lock);
995   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
996                         &vinfo)) {
997     found = 1;
998     info = vinfo;
999   }
1000   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1001   if(found) {
1002     if(fqdn_out) *fqdn_out = info->fqdn;
1003     return info->dsn;
1004   }
1005
1006   if(!found && can_use_db) {
1007     ds_single_detail *d;
1008     conn_q *cq;
1009     int row_count;
1010     /* Look it up and store it */
1011     d = calloc(1, sizeof(*d));
1012     cq = get_conn_q_for_metanode();
1013     GET_QUERY(find_storage);
1014     DECLARE_PARAM_INT(id);
1015     PG_EXEC(find_storage);
1016     row_count = PQntuples(d->res);
1017     if(row_count) {
1018       PG_GET_STR_COL(dsn, 0, "dsn");
1019       PG_GET_STR_COL(fqdn, 0, "fqdn");
1020       fqdn = fqdn ? strdup(fqdn) : NULL;
1021       dsn = dsn ? strdup(dsn) : NULL;
1022     }
1023     PQclear(d->res);
1024    bad_row:
1025     free_params(d);
1026     free(d);
1027     release_conn_q(cq);
1028   }
1029   if(fqdn) {
1030     info = calloc(1, sizeof(*info));
1031     info->fqdn = fqdn;
1032     if(fqdn_out) *fqdn_out = info->fqdn;
1033     info->dsn = dsn;
1034     info->storagenode_id = id;
1035     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1036     noit_hash_store(&storagenode_to_info_cache,
1037                     (void *)&info->storagenode_id, sizeof(int), info);
1038     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1039   }
1040   return info ? info->dsn : NULL;
1041 }
1042 static void
1043 expand_b_record(ds_line_detail **head, ds_line_detail **last,
1044                 const char *line, int len) {
1045   char **outrows;
1046   int i, cnt;
1047   ds_line_detail *next;
1048
1049   cnt = noit_check_log_b_to_sm(line, len, &outrows);
1050   for(i=0;i<cnt;i++) {
1051     if(outrows[i] == NULL) continue;
1052     next = calloc(sizeof(*next), 1);
1053     next->data = outrows[i];
1054     if(!*head) *head = next;
1055     if(*last) (*last)->next = next;
1056     *last = next;
1057   }
1058   if(outrows) free(outrows);
1059 }
1060 static ds_line_detail *
1061 build_insert_batch(pg_interim_journal_t *ij) {
1062   int rv;
1063   off_t len;
1064   const char *buff, *cp, *lcp;
1065   struct stat st;
1066   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1067
1068   if(ij->fd < 0) {
1069     ij->fd = open(ij->filename, O_RDONLY);
1070     if(ij->fd < 0) {
1071       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
1072             ij->filename, strerror(errno));
1073       assert(ij->fd >= 0);
1074     }
1075   }
1076   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1077   if(rv == -1) {
1078       noitL(noit_error, "Cannot stat interim journal '%s': %s\n",
1079             ij->filename, strerror(errno));
1080     assert(rv != -1);
1081   }
1082   len = st.st_size;
1083   if(len > 0) {
1084     buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1085     if(buff == (void *)-1) {
1086       noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1087             ij->filename, strerror(errno));
1088       assert(buff != (void *)-1);
1089     }
1090     lcp = buff;
1091     while(lcp < (buff + len) &&
1092           NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1093       if(lcp[0] == 'B' && lcp[1] != '\0' && lcp[2] == '\t') {
1094       /* Bundle records are special and need to be expanded into
1095        * traditional records here
1096        */
1097         noit_compression_type_t ctype = NOIT_COMPRESS_NONE;
1098         switch(lcp[1]) {
1099           case '1': /* version 1 */
1100             ctype = NOIT_COMPRESS_ZLIB; /*no break fall through */
1101           case '2': /* version 2 */
1102               expand_b_record(&head, &last, lcp, cp - lcp);
1103             break;
1104           default:
1105             noitL(noit_error, "unknown bundle version %c\n", lcp[1]);
1106         }
1107       }
1108       else {
1109         next = calloc(1, sizeof(*next));
1110         next->data = malloc(cp - lcp + 1);
1111         memcpy(next->data, lcp, cp - lcp);
1112         next->data[cp - lcp] = '\0';
1113         if(!head) head = next;
1114         if(last) last->next = next;
1115         last = next;
1116       }
1117       lcp = cp + 1;
1118     }
1119     munmap((void *)buff, len);
1120   }
1121   close(ij->fd);
1122   return head;
1123 }
1124 static void
1125 pg_interim_journal_remove(pg_interim_journal_t *ij) {
1126   unlink(ij->filename);
1127   if(ij->filename) free(ij->filename);
1128   if(ij->remote_str) free(ij->remote_str);
1129   if(ij->remote_cn) free(ij->remote_cn);
1130   if(ij->fqdn) free(ij->fqdn);
1131   free(ij);
1132 }
1133 static int
1134 stratcon_ingest_asynch_execute(eventer_t e, int mask, void *closure,
1135                                struct timeval *now) {
1136   int i, total, success, sp_total, sp_success;
1137   pg_interim_journal_t *ij;
1138   ds_line_detail *head = NULL, *current, *last_sp;
1139   const char *dsn;
1140   conn_q *cq;
1141   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1142   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1143
1144   ij = closure;
1145   if(ij->fqdn == NULL) {
1146     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1147     if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1148   }
1149   else {
1150     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, NULL);
1151   }
1152   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1153                              ij->fqdn, dsn);
1154   noitL(ds_deb, "stratcon_ingest_asynch_execute[%s,%s,%s]\n",
1155         ij->remote_str, ij->remote_cn, ij->fqdn);
1156  full_monty:
1157   /* Make sure we have a connection */
1158   i = 1;
1159   while(stratcon_database_connect(cq)) {
1160     noitL(noit_error, "Error connecting to database: %s\n",
1161           ij->fqdn ? ij->fqdn : "(null)");
1162     sleep(i);
1163     i *= 2;
1164     i = MIN(i, 16);
1165   }
1166
1167   if(head == NULL) head = build_insert_batch(ij);
1168   noitL(ds_deb, "Starting batch from %s/%s to %s\n",
1169         ij->remote_str ? ij->remote_str : "(null)",
1170         ij->remote_cn ? ij->remote_cn : "(null)",
1171         ij->fqdn ? ij->fqdn : "(null)");
1172   current = head;
1173   last_sp = NULL;
1174   total = success = sp_total = sp_success = 0;
1175   if(stratcon_ingest_do(cq, "BEGIN")) BUSTED(cq);
1176   while(current) {
1177     execute_outcome_t rv;
1178     if(current->data) {
1179       if(!last_sp) {
1180         SAVEPOINT("batch");
1181         sp_success = success;
1182         sp_total = total;
1183       }
1184  
1185       if(current->problematic) {
1186         RELEASE_SAVEPOINT("batch");
1187         current = current->next;
1188         total++;
1189         continue;
1190       }
1191       rv = stratcon_ingest_execute(cq, cq->remote_str, cq->remote_cn,
1192                                    current);
1193       switch(rv) {
1194         case DS_EXEC_SUCCESS:
1195           total++;
1196           success++;
1197           current = current->next;
1198           break;
1199         case DS_EXEC_ROW_FAILED:
1200           /* rollback to savepoint, mark this record as bad and start again */
1201           if(current->data[0] != 'n')
1202             noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1203           current->problematic = 1;
1204           current = last_sp;
1205           success = sp_success;
1206           total = sp_total;
1207           ROLLBACK_TO_SAVEPOINT("batch");
1208           break;
1209         case DS_EXEC_TXN_FAILED:
1210           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1211           BUSTED(cq);
1212       }
1213     }
1214   }
1215   if(last_sp) RELEASE_SAVEPOINT("batch");
1216   if(stratcon_ingest_do(cq, "COMMIT")) {
1217     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1218     BUSTED(cq);
1219   }
1220   /* Cleanup the mess */
1221   while(head) {
1222     ds_line_detail *tofree;
1223     tofree = head;
1224     head = head->next;
1225     if(tofree->data) free(tofree->data);
1226     free_params((ds_single_detail *)tofree);
1227     free(tofree);
1228   }
1229   noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1230         ij->remote_str ? ij->remote_str : "(null)",
1231         ij->remote_cn ? ij->remote_cn : "(null)",
1232         ij->fqdn ? ij->fqdn : "(null)", success, total);
1233   pg_interim_journal_remove(ij);
1234   release_conn_q(cq);
1235   return 0;
1236 }
1237 static int
1238 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1239                           int *sid_out, int *storagenode_id_out,
1240                           const char **remote_cn_out,
1241                           const char **fqdn_out, const char **dsn_out) {
1242   /* only called from the main thread -- no safety issues */
1243   void *vuuidinfo, *vinfo;
1244   uuid_info *uuidinfo;
1245   storagenode_info *info = NULL;
1246   char *fqdn = NULL;
1247   char *dsn = NULL;
1248   char *new_remote_cn = NULL;
1249   int storagenode_id = 0, sid = 0;
1250   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1251                          &vuuidinfo)) {
1252     int row_count = 0;
1253     char *tmpint;
1254     ds_single_detail *d;
1255     conn_q *cq;
1256
1257     /* We can't do a database lookup without the remote_cn */
1258     if(!remote_cn) {
1259       if(stratcon_datastore_get_enabled()) {
1260         /* We have an authoritatively maintained cache, we don't do lookups */
1261         return -1;
1262       }
1263       else
1264         remote_cn = "[[null]]";
1265     }
1266
1267     d = calloc(1, sizeof(*d));
1268     cq = get_conn_q_for_metanode();
1269     if(stratcon_database_connect(cq) == 0) {
1270       /* Blocking call to service the cache miss */
1271       GET_QUERY(check_map);
1272       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1273       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1274       PG_EXEC(check_map);
1275       row_count = PQntuples(d->res);
1276       if(row_count != 1) {
1277         PQclear(d->res);
1278         goto bad_row;
1279       }
1280       PG_GET_STR_COL(tmpint, 0, "sid");
1281       if(!tmpint) {
1282         row_count = 0;
1283         PQclear(d->res);
1284         goto bad_row;
1285       }
1286       sid = atoi(tmpint);
1287       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1288       if(tmpint) storagenode_id = atoi(tmpint);
1289       PG_GET_STR_COL(fqdn, 0, "fqdn");
1290       PG_GET_STR_COL(dsn, 0, "dsn");
1291       PG_GET_STR_COL(new_remote_cn, 0, "remote_cn");
1292       fqdn = fqdn ? strdup(fqdn) : NULL;
1293       dsn = dsn ? strdup(dsn) : NULL;
1294       new_remote_cn = new_remote_cn ? strdup(new_remote_cn) : NULL;
1295       PQclear(d->res);
1296     }
1297    bad_row:
1298     free_params((ds_single_detail *)d);
1299     free(d);
1300     release_conn_q(cq);
1301     if(row_count != 1) {
1302       return -1;
1303     }
1304     /* Place in cache */
1305     uuidinfo = calloc(1, sizeof(*uuidinfo));
1306     uuidinfo->sid = sid;
1307     uuidinfo->uuid_str = strdup(uuid_str);
1308     uuidinfo->storagenode_id = storagenode_id;
1309     uuidinfo->remote_cn = new_remote_cn ? strdup(new_remote_cn) : strdup(remote_cn);
1310     noit_hash_store(&uuid_to_info_cache,
1311                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1312     /* Also, we may have just witnessed a new storage node, store it */
1313     if(storagenode_id) {
1314       int needs_free = 0;
1315       info = calloc(1, sizeof(*info));
1316       info->storagenode_id = storagenode_id;
1317       info->dsn = dsn ? strdup(dsn) : NULL;
1318       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1319       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1320       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1321                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1322         /* hack to save memory -- we *never* remove from these caches,
1323            so we can use the same fqdn value in the above cache for the key
1324            in the cache below -- (no strdup) */
1325         noit_hash_store(&storagenode_to_info_cache,
1326                         (void *)&info->storagenode_id, sizeof(int), info);
1327       }
1328       else needs_free = 1;
1329       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1330       if(needs_free) {
1331         if(info->dsn) free(info->dsn);
1332         if(info->fqdn) free(info->fqdn);
1333         free(info);
1334       }
1335     }
1336   }
1337   else
1338     uuidinfo = vuuidinfo;
1339
1340   if(uuidinfo && uuidinfo->storagenode_id) {
1341     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1342       /* we don't have dsn and we actually want it */
1343       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1344       if(noit_hash_retrieve(&storagenode_to_info_cache,
1345                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1346                             &vinfo))
1347         info = vinfo;
1348       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1349     }
1350   }
1351
1352   if(fqdn_out) *fqdn_out = info ? info->fqdn : NULL;
1353   if(dsn_out) *dsn_out = info ? info->dsn : NULL;
1354   assert(uuidinfo);
1355   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1356   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1357   if(sid_out) *sid_out = uuidinfo->sid;
1358   if(fqdn) free(fqdn);
1359   if(dsn) free(dsn);
1360   if(new_remote_cn) free(new_remote_cn);
1361   return 0;
1362 }
1363 static int
1364 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1365   char uuid_str[UUID_STR_LEN+1];
1366   int sid = 0;
1367   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1368   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1369   return sid;
1370 }
1371
1372 static int
1373 stratcon_ingest_saveconfig() {
1374   int rv = -1;
1375   char *buff;
1376   ds_single_detail _d = { 0 }, *d = &_d;
1377   conn_q *cq;
1378   char ipv4_str[32];
1379   struct in_addr r, l;
1380
1381   r.s_addr = htonl((4 << 24) || (2 << 16) || (2 << 8) || 1);
1382   memset(&l, 0, sizeof(l));
1383   noit_getip_ipv4(r, &l);
1384   /* Ignore the error.. what are we going to do anyway */
1385   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
1386     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));
1387
1388   cq = get_conn_q_for_metanode();
1389
1390   if(stratcon_database_connect(cq) == 0) {
1391     char time_as_str[20];
1392     size_t len;
1393     buff = noit_conf_xml_in_mem(&len);
1394     if(!buff) goto bad_row;
1395
1396     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1397     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str));
1398     DECLARE_PARAM_STR("", 0);
1399     DECLARE_PARAM_STR("stratcond", 9);
1400     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1401     DECLARE_PARAM_STR(buff, len);
1402     free(buff);
1403
1404     GET_QUERY(config_insert);
1405     PG_EXEC(config_insert);
1406     PQclear(d->res);
1407     rv = 0;
1408
1409     bad_row:
1410       free_params(d);
1411   }
1412   release_conn_q(cq);
1413   return rv;
1414 }
1415
1416 static int
1417 stratcon_ingest_launch_file_ingestion(const char *path,
1418                                       const char *remote_str,
1419                                       const char *remote_cn,
1420                                       const char *id_str) {
1421   pg_interim_journal_t *ij;
1422   char pgfile[PATH_MAX];
1423   eventer_t ingest;
1424
1425   if(strcmp(path + strlen(path) - 3, ".pg")) {
1426     snprintf(pgfile, sizeof(pgfile), "%s.pg", path);
1427     if(link(path, pgfile) < 0 && errno != EEXIST) {
1428       noitL(noit_error, "cannot link journal %s: %s\n", path, strerror(errno));
1429       free(ij);
1430       return -1;
1431     }
1432   }
1433   else
1434     strlcpy(pgfile, path, sizeof(pgfile));
1435   ij = calloc(1, sizeof(*ij));
1436   ij->fd = open(pgfile, O_RDONLY);
1437   if(ij->fd < 0) {
1438     noitL(noit_error, "cannot open journal '%s': %s\n",
1439           pgfile, strerror(errno));
1440     free(ij);
1441     return -1;
1442   }
1443   close(ij->fd);
1444   ij->fd = -1;
1445   ij->filename = strdup(pgfile);
1446   ij->remote_str = strdup(remote_str);
1447   ij->remote_cn = strdup(remote_cn);
1448   ij->storagenode_id = atoi(id_str);
1449   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1450                                        ij->fqdn);
1451   noitL(noit_debug, "ingesting payload: %s\n", ij->filename);
1452   ingest = eventer_alloc();
1453   ingest->mask = EVENTER_ASYNCH;
1454   ingest->callback = stratcon_ingest_asynch_execute;
1455   ingest->closure = ij;
1456   eventer_add_asynch(ij->cpool->jobq, ingest);
1457   return 0;
1458 }
1459
1460 int
1461 stratcon_ingest_all_storagenode_info() {
1462   int i, cnt = 0;
1463   ds_single_detail _d = { 0 }, *d = &_d;
1464   conn_q *cq;
1465   cq = get_conn_q_for_metanode();
1466
1467   while(stratcon_database_connect(cq)) {
1468     noitL(noit_error, "Error connecting to database\n");
1469     sleep(1);
1470   }
1471
1472   GET_QUERY(all_storage);
1473   PG_EXEC(all_storage);
1474   cnt = PQntuples(d->res);
1475   for(i=0; i<cnt; i++) {
1476     void *vinfo;
1477     char *tmpint, *fqdn, *dsn;
1478     int storagenode_id;
1479     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1480     storagenode_id = atoi(tmpint);
1481     PG_GET_STR_COL(fqdn, i, "fqdn");
1482     PG_GET_STR_COL(dsn, i, "dsn");
1483     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1484     storagenode_id = tmpint ? atoi(tmpint) : 0;
1485
1486     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1487                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1488       storagenode_info *info;
1489       info = calloc(1, sizeof(*info));
1490       info->storagenode_id = storagenode_id;
1491       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1492       info->dsn = dsn ? strdup(dsn) : NULL;
1493       noit_hash_store(&storagenode_to_info_cache,
1494                       (void *)&info->storagenode_id, sizeof(int), info);
1495       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1496             info->storagenode_id,
1497             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1498     }
1499     noit_watchdog_child_heartbeat();
1500   }
1501   PQclear(d->res);
1502  bad_row:
1503   free_params(d);
1504
1505   release_conn_q(cq);
1506   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1507   return cnt;
1508 }
1509 int
1510 stratcon_ingest_all_check_info() {
1511   int i, cnt, loaded = 0;
1512   ds_single_detail _d = { 0 }, *d = &_d;
1513   conn_q *cq;
1514   cq = get_conn_q_for_metanode();
1515
1516   while(stratcon_database_connect(cq)) {
1517     noitL(noit_error, "Error connecting to database\n");
1518     sleep(1);
1519   }
1520
1521   GET_QUERY(check_mapall);
1522   PG_EXEC(check_mapall);
1523   cnt = PQntuples(d->res);
1524   for(i=0; i<cnt; i++) {
1525     void *vinfo;
1526     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1527     int sid, storagenode_id;
1528     uuid_info *uuidinfo;
1529     PG_GET_STR_COL(uuid_str, i, "id");
1530     if(!uuid_str) continue;
1531     PG_GET_STR_COL(tmpint, i, "sid");
1532     if(!tmpint) continue;
1533     sid = atoi(tmpint);
1534     PG_GET_STR_COL(fqdn, i, "fqdn");
1535     PG_GET_STR_COL(dsn, i, "dsn");
1536     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1537     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1538     storagenode_id = tmpint ? atoi(tmpint) : 0;
1539
1540     uuidinfo = calloc(1, sizeof(*uuidinfo));
1541     uuidinfo->uuid_str = strdup(uuid_str);
1542     uuidinfo->remote_cn = strdup(remote_cn);
1543     uuidinfo->storagenode_id = storagenode_id;
1544     uuidinfo->sid = sid;
1545     noit_hash_store(&uuid_to_info_cache,
1546                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1547     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1548           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1549     loaded++;
1550     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1551                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1552       storagenode_info *info;
1553       info = calloc(1, sizeof(*info));
1554       info->storagenode_id = storagenode_id;
1555       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1556       info->dsn = dsn ? strdup(dsn) : NULL;
1557       noit_hash_store(&storagenode_to_info_cache,
1558                       (void *)&info->storagenode_id, sizeof(int), info);
1559       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1560             info->storagenode_id,
1561             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1562     }
1563     noit_watchdog_child_heartbeat();
1564   }
1565   PQclear(d->res);
1566  bad_row:
1567   free_params(d);
1568
1569   release_conn_q(cq);
1570   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1571   return loaded;
1572 }
1573
1574 static char *
1575 stratcon_get_noit_config(const char *cn) {
1576   ds_single_detail *d;
1577   int row_count = 0;
1578   const char *xml = NULL;
1579   char *xmlcopy = NULL;
1580   conn_q *cq = NULL;
1581
1582   d = calloc(1, sizeof(*d));
1583   GET_QUERY(config_get);
1584   cq = get_conn_q_for_metanode();
1585   if(!cq) goto bad_row;
1586
1587   DECLARE_PARAM_STR(cn, cn ? strlen(cn) : 0);
1588   PG_EXEC(config_get);
1589   row_count = PQntuples(d->res);
1590   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1591
1592   if(xml) xmlcopy = strdup(xml);
1593
1594  bad_row:
1595   free_params((ds_single_detail *)d);
1596   d->nparams = 0;
1597   if(cq) release_conn_q(cq);
1598
1599   return xmlcopy;
1600 }
1601
1602 static ingestor_api_t postgres_ingestor_api = {
1603   .launch_file_ingestion = stratcon_ingest_launch_file_ingestion,
1604   .iep_check_preload = stratcon_ingest_iep_check_preload,
1605   .storage_node_lookup = storage_node_quick_lookup,
1606   .submit_realtime_lookup = stratcon_ingestor_submit_lookup,
1607   .get_noit_config = stratcon_get_noit_config,
1608   .save_config = stratcon_ingest_saveconfig
1609 };
1610
1611 static int postgres_ingestor_config(noit_module_generic_t *self, noit_hash_table *o) {
1612   return 0;
1613 }
1614 static int postgres_ingestor_onload(noit_image_t *self) {
1615   return 0;
1616 }
/*
 * Journal sweep filter: returns 1 for names that are exactly 19
 * characters long and end in ".pg", 0 otherwise.  The watchdog heartbeat
 * is signalled on every call.
 */
static int
is_postgres_ingestor_file(const char *file) {
  noit_watchdog_child_heartbeat();
  if(strlen(file) != 19) return 0;
  /* 16 leading characters followed by the three-character suffix. */
  return strcmp(file + 16, ".pg") == 0;
}
1621 static int postgres_ingestor_init(noit_module_generic_t *self) {
1622   stratcon_datastore_core_init();
1623   pthread_mutex_init(&ds_conns_lock, NULL);
1624   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1625   ds_err = noit_log_stream_find("error/datastore");
1626   ds_deb = noit_log_stream_find("debug/datastore");
1627   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
1628   ingest_err = noit_log_stream_find("error/ingest");
1629   if(!ds_err) ds_err = noit_error;
1630   if(!ingest_err) ingest_err = noit_error;
1631   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1632                            &basejpath)) {
1633     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1634     exit(-1);
1635   }
1636   stratcon_ingest_all_check_info();
1637   stratcon_ingest_all_storagenode_info();
1638   stratcon_ingest_sweep_journals(is_postgres_ingestor_file,
1639                                  stratcon_ingest_launch_file_ingestion);
1640   return stratcon_datastore_set_ingestor(&postgres_ingestor_api);
1641 }
1642
1643 noit_module_generic_t postgres_ingestor = {
1644   {
1645     NOIT_GENERIC_MAGIC,
1646     NOIT_GENERIC_ABI_VERSION,
1647     "postgres_ingestor",
1648     "postgres drive for data ingestion",
1649     postgres_ingestor_xml_description,
1650     postgres_ingestor_onload,
1651   }, 
1652   postgres_ingestor_config,
1653   postgres_ingestor_init
1654 };
Note: See TracBrowser for help on using the browser.