root/src/modules/postgres_ingestor.c

Revision c57081bfe551401db8874b81cce5b1f06b7dbdfd, 51.9 kB (checked in by Phil Maddox <philip.maddox@circonus.com>, 2 months ago)

Add Some mtev_hash_inits That Were Missing

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2011, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * Copyright (c) 2015, Circonus, Inc. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include <mtev_defines.h>
35
36 #include <unistd.h>
37 #include <fcntl.h>
38 #include <netinet/in.h>
39 #include <sys/un.h>
40 #include <dirent.h>
41 #include <arpa/inet.h>
42 #include <sys/mman.h>
43 #include <libpq-fe.h>
44 #include <zlib.h>
45 #include <errno.h>
46
47 #include <eventer/eventer.h>
48 #include <mtev_log.h>
49 #include <mtev_b64.h>
50 #include <mtev_str.h>
51 #include <mtev_mkdir.h>
52 #include <mtev_getip.h>
53 #include <mtev_watchdog.h>
54 #include <mtev_conf.h>
55 #include <mtev_rest.h>
56
57 #include "noit_mtev_bridge.h"
58 #include "noit_module.h"
59 #include "stratcon_datastore.h"
60 #include "stratcon_ingest.h"
61 #include "stratcon_realtime_http.h"
62 #include "stratcon_iep.h"
63 #include "noit_check.h"
64 #include "noit_check_log_helpers.h"
65 #include "bundle.pb-c.h"
66
67 #include "postgres_ingestor.xmlh"
68
/*
 * DECL_STMT(codename, confname) declares two file-scope statics:
 *   codename        - the SQL text, NULL until loaded from config (GET_QUERY)
 *   codename##_conf - the config path the SQL text is loaded from
 */
#define STMT_CONF_ROOT "/stratcon/database/statements/"
#define DECL_STMT(codename,confname) \
static const char *codename##_conf = STMT_CONF_ROOT #confname; \
static char *codename = NULL

DECL_STMT(storage_post_connect, storagepostconnect);
DECL_STMT(metanode_post_connect, metanodepostconnect);
DECL_STMT(find_storage, findstoragenode);
DECL_STMT(all_storage, allstoragenodes);
DECL_STMT(check_map, mapchecktostoragenode);
DECL_STMT(check_mapall, mapallchecks);
DECL_STMT(check_loadall, allchecks);
DECL_STMT(check_find, findcheck);
DECL_STMT(check_insert, check);
DECL_STMT(status_insert, status);
DECL_STMT(metric_insert_numeric, metric_numeric);
DECL_STMT(metric_insert_text, metric_text);
DECL_STMT(config_insert, config);
DECL_STMT(config_get, findconfig);
87
88 static mtev_log_stream_t ds_err = NULL;
89 static mtev_log_stream_t ds_deb = NULL;
90 static mtev_log_stream_t ds_pool_deb = NULL;
91 static mtev_log_stream_t ingest_err = NULL;
92
/*
 * Lazily load the SQL text for statement `a` from its config path
 * (a##_conf) the first time it is needed. Jumps to the caller's `bad_row`
 * label when the statement is not configured.
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && !mtev_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
98
struct conn_q;

/*
 * A pool of PostgreSQL connections to one destination. Pools are created on
 * demand by get_conn_pool_for_remote() and registered in ds_conns.
 */
typedef struct {
  char            *queue_name; /* ds_conns key and jobq name:
                                  "datastore_<remote_str>_<fqdn>_<remote_cn>" */
  eventer_jobq_t  *jobq;       /* job queue asynch work for this pool is added to */
  struct conn_q   *head;       /* idle connections, most recently released first */
  pthread_mutex_t  lock;       /* guards head and the counters below */
  pthread_cond_t   cv;         /* signalled on every release; waited on while the
                                  pool is at max_allocated */
  int              ttl;        /* seconds an idle connection may sit before
                                  ttl_purge_conn_pool() destroys it.
                                  NOTE(review): get_conn_pool_for_remote() does not
                                  set this, so it is 0 unless assigned elsewhere */
  int              in_pool;    /* number of connections on the idle list */
  int              outstanding; /* number of connections currently checked out */
  int              max_allocated; /* cap on in_pool + outstanding */
  int              max_in_pool; /* cap on the idle list length */
} conn_pool;
/* One PostgreSQL connection plus the identity it was opened for. All string
 * members are heap-owned and freed when the connection is destroyed. */
typedef struct conn_q {
  time_t           last_use;   /* set each time the connection is released */
  char            *dsn;        /* Pg connect string; NULL means build one from
                                  /stratcon/database/dbconfig (metanode) */
  char            *remote_str; /* the IP of the noit*/
  char            *remote_cn;  /* the Cert CN of the noit */
  char            *fqdn;       /* the fqdn of the storage node; NULL = metanode */
  conn_pool       *pool;       /* owning pool */
  struct conn_q   *next;       /* link in pool->head while idle */
  /* Postgres specific stuff */
  PGconn          *dbh;        /* NULL until first connect; reset to NULL when a
                                  post-connect statement fails */
} conn_q;
124
125
/* Capacity of the fixed-size positional parameter arrays below. */
#define MAX_PARAMS 8
/*
 * Members shared by every *_detail struct below. Each struct starts with
 * exactly this sequence, and code such as free_params() is handed any of
 * them through a (ds_single_detail *) cast. Keep it first in every struct.
 *   res          - result of the most recent PQexecParams()
 *   rv           - PQresultStatus() of res
 *   whence       - record timestamp used to expand time-formatted statements
 *   nparams      - number of param* slots in use
 *   metric_type  - metric type byte of an 'M' record
 *   paramValues/paramLengths/paramFormats - arguments for PQexecParams()
 *   paramAllocd  - non-zero when paramValues[i] is heap-owned (free_params)
 */
#define POSTGRES_PARTS \
  PGresult *res; \
  int rv; \
  time_t whence; \
  int nparams; \
  int metric_type; \
  char *paramValues[MAX_PARAMS]; \
  int paramLengths[MAX_PARAMS]; \
  int paramFormats[MAX_PARAMS]; \
  int paramAllocd[MAX_PARAMS];

/* Parameters/result for a single standalone statement. */
typedef struct ds_single_detail {
  POSTGRES_PARTS
} ds_single_detail;
/* Work item for realtime lookups (see stratcon_ingest_find). */
typedef struct {
  /* Postgres specific stuff */
  POSTGRES_PARTS
  struct realtime_tracker *rt; /* linked list of checks to resolve */
  conn_q *cq; /* connection on which to perform this job */
  eventer_t completion_event; /* This event should be registered if non NULL */
} ds_rt_detail;
/* One ingested log line (see stratcon_ingest_execute). */
typedef struct ds_line_detail {
  /* Postgres specific stuff */
  POSTGRES_PARTS
  char *data;              /* raw tab-separated line; first byte is the type */
  int problematic;         /* NOTE(review): not used in this part of the file;
                              presumably marks a line that failed - confirm */
  struct ds_line_detail *next;
} ds_line_detail;
155
/*
 * NOTE(review): not used in this part of the file; presumably tracks an
 * interim journal file of lines bound for one storage node - confirm.
 */
typedef struct {
  char *remote_str;      /* noit address the lines came from */
  char *remote_cn;       /* noit certificate CN */
  char *fqdn;            /* storage node fqdn */
  int storagenode_id;
  int fd;                /* presumably the open journal file descriptor */
  char *filename;
  conn_pool *cpool;      /* pool for the target storage node */
} pg_interim_journal_t;
165
/* Ensure cq->dbh is connected; returns 0 on success, -1 on failure. */
static int stratcon_database_connect(conn_q *cq);
/* Map a check uuid (reported by remote_cn) to its sid; callers treat a
 * return of 0 as failure. */
static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
/* Resolve a check uuid to its sid and owning storage node (id, CN, fqdn,
 * DSN). Callers treat a non-zero return as "not found" and skip the check. */
static int storage_node_quick_lookup(const char *uuid_str,
                                     const char *remote_cn,
                                     int *sid_out, int *storagenode_id_out,
                                     const char **remote_cn_out,
                                     const char **fqdn_out,
                                     const char **dsn_out);
174
175 static void
176 free_params(ds_single_detail *d) {
177   int i;
178   for(i=0; i<d->nparams; i++)
179     if(d->paramAllocd[i] && d->paramValues[i])
180       free(d->paramValues[i]);
181 }
182
183 static char *basejpath = NULL;
184 static pthread_mutex_t ds_conns_lock;
185 static mtev_hash_table ds_conns;
186 static mtev_hash_table uuid_to_info_cache;
187 static pthread_mutex_t storagenode_to_info_cache_lock;
188 static mtev_hash_table storagenode_to_info_cache;
189
/* the fqdn cache needs to be thread safe */
/* Cached identity of one check uuid (presumably the value type of
 * uuid_to_info_cache - confirm). */
typedef struct {
  char *uuid_str;
  char *remote_cn;       /* CN of the noit that reported the check */
  int storagenode_id;    /* storage node holding the check's data */
  int sid;               /* database id of the check */
} uuid_info;
/*
 * A storage node's identity and connection string. Member order matters:
 * stratcon_ingest_asynch_drive_iep() builds one with a positional
 * initializer ({ 0, NULL, NULL } meaning "the metanode").
 */
typedef struct {
  int storagenode_id;
  char *fqdn;            /* NULL selects the metanode */
  char *dsn;             /* Pg connect string; NULL selects dbconfig */
} storagenode_info;
202
203 /* Thread-safe connection pools */
204
205 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
206 static void
207 release_conn_q_forceable(conn_q *cq, int forcefree) {
208   int putback = 0;
209   cq->last_use = time(NULL);
210   pthread_mutex_lock(&cq->pool->lock);
211   cq->pool->outstanding--;
212   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
213     putback = 1;
214     cq->next = cq->pool->head;
215     cq->pool->head = cq;
216     cq->pool->in_pool++;
217   }
218   pthread_mutex_unlock(&cq->pool->lock);
219   mtevL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)(vpsized_int)pthread_self(),
220         putback ? "to pool" : "and destroy", cq->pool->queue_name);
221   pthread_cond_signal(&cq->pool->cv);
222   if(putback) return;
223
224   /* Not put back, release it */
225   if(cq->dbh) PQfinish(cq->dbh);
226   if(cq->remote_str) free(cq->remote_str);
227   if(cq->remote_cn) free(cq->remote_cn);
228   if(cq->fqdn) free(cq->fqdn);
229   if(cq->dsn) free(cq->dsn);
230   free(cq);
231 }
232 static void
233 ttl_purge_conn_pool(conn_pool *pool) {
234   int old_cnt, new_cnt;
235   time_t now = time(NULL);
236   conn_q *cq, *prev = NULL, *iter;
237   /* because we always replace on the head and update the last_use time when
238      doing so, we know they are ordered LRU on the end.  So, once we hit an
239      old one, we know all the others are old too.
240    */
241   if(!pool->head) return; /* hack short circuit for no locks */
242   pthread_mutex_lock(&pool->lock);
243   old_cnt = pool->in_pool;
244   cq = pool->head;
245   while(cq) {
246     if(cq->last_use + cq->pool->ttl < now) {
247       if(prev) prev->next = NULL;
248       else pool->head = NULL;
249       break;
250     }
251     prev = cq;
252     cq = cq->next;
253   }
254   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
255   /* Fix accounting */
256   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
257   new_cnt = pool->in_pool;
258   pthread_mutex_unlock(&pool->lock);
259
260   /* Force release these without holding the lock */
261   while(cq) {
262     prev = cq;
263     cq = cq->next;
264     release_conn_q_forceable(prev, 1);
265   }
266   if(old_cnt != new_cnt)
267     mtevL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
268           pool->queue_name);
269 }
270 static void
271 release_conn_q(conn_q *cq) {
272   ttl_purge_conn_pool(cq->pool);
273   release_conn_q_forceable(cq, 0);
274 }
275 static conn_pool *
276 get_conn_pool_for_remote(const char *remote_str,
277                          const char *remote_cn, const char *fqdn) {
278   void *vcpool;
279   conn_pool *cpool = NULL;
280   char queue_name[256] = "datastore_";
281   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
282            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
283            fqdn ? fqdn : "default",
284            remote_cn ? remote_cn : "default");
285   pthread_mutex_lock(&ds_conns_lock);
286   if(mtev_hash_retrieve(&ds_conns, (const char *)queue_name,
287                         strlen(queue_name), &vcpool))
288     cpool = vcpool;
289   pthread_mutex_unlock(&ds_conns_lock);
290   if(!cpool) {
291     vcpool = cpool = calloc(1, sizeof(*cpool));
292     cpool->queue_name = strdup(queue_name);
293     pthread_mutex_init(&cpool->lock, NULL);
294     pthread_cond_init(&cpool->cv, NULL);
295     cpool->in_pool = 0;
296     cpool->outstanding = 0;
297     cpool->max_in_pool = 1;
298     cpool->max_allocated = 1;
299     pthread_mutex_lock(&ds_conns_lock);
300     if(!mtev_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
301                         cpool)) {
302       mtev_hash_retrieve(&ds_conns, (const char *)queue_name,
303                          strlen(queue_name), &vcpool);
304     }
305     pthread_mutex_unlock(&ds_conns_lock);
306     if(vcpool != cpool) {
307       /* someone beat us to it */
308       free(cpool->queue_name);
309       pthread_mutex_destroy(&cpool->lock);
310       pthread_cond_destroy(&cpool->cv);
311       free(cpool);
312     }
313     else {
314       int i;
315       /* Our job to setup the pool */
316       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
317       eventer_jobq_init(cpool->jobq, queue_name);
318       /* Add one thread */
319       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
320         eventer_jobq_increase_concurrency(cpool->jobq);
321     }
322     cpool = vcpool;
323   }
324   return cpool;
325 }
326 static conn_q *
327 get_conn_q_for_remote(const char *remote_str,
328                       const char *remote_cn, const char *fqdn,
329                       const char *dsn) {
330   conn_pool *cpool;
331   conn_q *cq;
332   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
333   mtevL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)(vpsized_int)pthread_self(),
334         cpool->queue_name);
335   pthread_mutex_lock(&cpool->lock);
336  again:
337   if(cpool->head) {
338     mtevAssert(cpool->in_pool > 0);
339     cq = cpool->head;
340     cpool->head = cq->next;
341     cpool->in_pool--;
342     cpool->outstanding++;
343     cq->next = NULL;
344     pthread_mutex_unlock(&cpool->lock);
345     return cq;
346   }
347   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
348     mtevL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
349           (void *)(vpsized_int)pthread_self(), cpool->queue_name);
350     pthread_cond_wait(&cpool->cv, &cpool->lock);
351     mtevL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
352           (void *)(vpsized_int)pthread_self(), cpool->queue_name);
353     goto again;
354   }
355   else {
356     cpool->outstanding++;
357     pthread_mutex_unlock(&cpool->lock);
358   }
359  
360   cq = calloc(1, sizeof(*cq));
361   cq->pool = cpool;
362   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
363   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
364   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
365   cq->dsn = dsn ? strdup(dsn) : NULL;
366   return cq;
367 }
368 static conn_q *
369 get_conn_q_for_metanode() {
370   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
371 }
372
/* Outcome of applying one ingested row to the database. */
typedef enum {
  DS_EXEC_SUCCESS    = 0, /* row applied */
  DS_EXEC_ROW_FAILED = 1, /* this row could not be parsed or applied */
  DS_EXEC_TXN_FAILED = 2  /* presumably: the enclosing transaction failed */
} execute_outcome_t;
378
/*
 * Append one text-format parameter to `d` (any *_detail in scope as `d`).
 *
 * The first `len` bytes of `str` are copied to the heap (mtev__strndup).
 * The literal value "[[null]]" becomes an SQL NULL: NULL value, length 0,
 * not owned. `len` is evaluated exactly once. The slot count is asserted
 * against MAX_PARAMS because the parameter arrays are fixed-size.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  size_t dp_len__ = (size_t)(len); \
  mtevAssert(d->nparams < MAX_PARAMS); \
  d->paramValues[d->nparams] = mtev__strndup((str), dp_len__); \
  d->paramLengths[d->nparams] = (int)dp_len__; \
  d->paramFormats[d->nparams] = 0; \
  d->paramAllocd[d->nparams] = 1; \
  if(d->paramValues[d->nparams] && \
     !strcmp(d->paramValues[d->nparams], "[[null]]")) { \
    free(d->paramValues[d->nparams]); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  d->nparams++; \
} while(0)
/* Append an integer parameter to `d`, rendered as decimal text. */
#define DECLARE_PARAM_INT(i) do { \
  int buffer__len; \
  char buffer__[32]; \
  snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
  buffer__len = strlen(buffer__); \
  DECLARE_PARAM_STR(buffer__, buffer__len); \
} while(0)
399
/*
 * Set `dest` to the text of column `name` in row `row` of d->res. `dest`
 * becomes NULL when the column does not exist or the value is SQL NULL.
 * The string belongs to d->res and is only valid until it is PQclear()ed.
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int pg_col__ = PQfnumber(d->res, name); \
  if (pg_col__ < 0 || PQgetisnull(d->res, row, pg_col__)) \
    dest = NULL; \
  else \
    dest = PQgetvalue(d->res, row, pg_col__); \
} while(0)
407
/*
 * Run statement `cmd` on cq->dbh with d's positional parameters and store
 * the result in d->res / d->rv. On any status other than COMMAND_OK or
 * TUPLES_OK, the first line of the server error is logged to ds_err, the
 * result is cleared and control jumps to `bad_row`. On success the caller
 * owns d->res and must PQclear() it.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(!(d->rv == PGRES_COMMAND_OK || d->rv == PGRES_TUPLES_OK)) { \
    const char *pg_msg__ = PQresultErrorMessage(d->res); \
    const char *pg_nl__ = strchr(pg_msg__, '\n'); \
    int pg_msglen__ = pg_nl__ ? (int)(pg_nl__ - pg_msg__) \
                              : (int)strlen(pg_msg__); \
    mtevL(ds_err, "[%s] stratcon_datasource.c:%d bad (%d): %.*s\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
          pg_msglen__, pg_msg__); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
425
/*
 * Like PG_EXEC, but `cmd` is first expanded with strftime(3) against the UTC
 * broken-down time of `whence`. Statements may therefore embed time
 * conversions (presumably for time-partitioned tables; confirm against the
 * configured statements). `whence` is evaluated exactly once.
 *
 * Control jumps to `bad_row` in three cases:
 *  - the time cannot be converted (gmtime_r() returns NULL);
 *  - the expansion does not fit in 4096 bytes (strftime() returns 0 and
 *    leaves the buffer contents indeterminate);
 *  - the query fails.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t pg_tm_when__ = (whence); \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&pg_tm_when__, &tbuf); \
  if(tm == NULL || strftime(cmdbuf, sizeof(cmdbuf), cmd, tm) == 0) { \
    mtevL(ds_err, "stratcon_datasource.c:%d cannot expand statement for time: %llu\n", \
          __LINE__, (long long unsigned)pg_tm_when__); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    mtevL(ds_err, "stratcon_datasource.c:%d bad (%d): %.*s time: %llu\n", \
          __LINE__, d->rv, (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)pg_tm_when__); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
448
449 static void *
450 stratcon_ingest_check_loadall(void *vsn) {
451   storagenode_info *sn = vsn;
452   ds_single_detail *d;
453   int i, row_count = 0, good = 0;
454   char buff[1024];
455   conn_q *cq = NULL;
456
457   d = calloc(1, sizeof(*d));
458   GET_QUERY(check_loadall);
459   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
460   i = 0;
461   while(stratcon_database_connect(cq)) {
462     if(i++ > 4) {
463       mtevL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
464       release_conn_q(cq);
465       free(d);
466       return (void *)(vpsized_int)good;
467     }
468     sleep(1);
469   }
470   PG_EXEC(check_loadall);
471   row_count = PQntuples(d->res);
472  
473   for(i=0; i<row_count; i++) {
474     int rv;
475     int8_t family;
476     struct sockaddr *sin;
477     struct sockaddr_in sin4 = { .sin_family = AF_INET };
478     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
479     char *remote, *id, *target, *module, *name;
480     PG_GET_STR_COL(remote, i, "remote_address");
481     PG_GET_STR_COL(id, i, "id");
482     PG_GET_STR_COL(target, i, "target");
483     PG_GET_STR_COL(module, i, "module");
484     PG_GET_STR_COL(name, i, "name");
485     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
486
487     family = AF_INET;
488     sin = (struct sockaddr *)&sin4;
489     rv = inet_pton(family, remote, &sin4.sin_addr);
490     if(rv != 1) {
491       family = AF_INET6;
492       sin = (struct sockaddr *)&sin6;
493       rv = inet_pton(family, remote, &sin6.sin6_addr);
494       if(rv != 1) {
495         mtevL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
496         sin = NULL;
497       }
498     }
499
500     /* stratcon_iep_line_processor takes an allocated operand and frees it */
501     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
502     good++;
503   }
504   mtevL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
505         good, row_count, sn->fqdn);
506  bad_row:
507   free_params((ds_single_detail *)d);
508   free(d);
509   if(cq) release_conn_q(cq);
510   return (void *)(vpsized_int)good;
511 }
512 static int
513 stratcon_ingest_asynch_drive_iep(eventer_t e, int mask, void *closure,
514                                  struct timeval *now) {
515   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
516   pthread_t *jobs = NULL;
517   int nodes, i = 0, tcnt = 0;
518   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
519   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
520
521   pthread_mutex_lock(&storagenode_to_info_cache_lock);
522   nodes = mtev_hash_size(&storagenode_to_info_cache);
523   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
524   sns = calloc(MAX(1,nodes), sizeof(*sns));
525   if(nodes == 0) sns[nodes++] = &self;
526   else {
527     mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
528     const char *k;
529     void *v;
530     int klen;
531     while(mtev_hash_next(&storagenode_to_info_cache,
532                          &iter, &k, &klen, &v)) {
533       sns[i++] = (storagenode_info *)v;
534     }
535   }
536   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
537
538   for(i=0; i<nodes; i++) {
539     if(pthread_create(&jobs[i], NULL,
540                       stratcon_ingest_check_loadall, sns[i]) != 0) {
541       mtevL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
542     }
543   }
544   for(i=0; i<nodes; i++) {
545     void *good;
546     pthread_join(jobs[i], &good);
547     tcnt += (int)(vpsized_int)good;
548   }
549   free(jobs);
550   free(sns);
551   mtevL(noit_error, "Loaded all %d check states.\n", tcnt);
552   return 0;
553 }
554 static void
555 stratcon_ingest_iep_check_preload() {
556   eventer_t e;
557   conn_pool *cpool;
558
559   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
560   e = eventer_alloc();
561   e->mask = EVENTER_ASYNCH;
562   e->callback = stratcon_ingest_asynch_drive_iep;
563   e->closure = NULL;
564   eventer_add_asynch(cpool->jobq, e);
565 }
566 execute_outcome_t
567 stratcon_ingest_find(ds_rt_detail *d) {
568   conn_q *cq;
569   char *val;
570   int row_count;
571   struct realtime_tracker *node;
572
573   for(node = d->rt; node; node = node->next) {
574     char uuid_str[UUID_STR_LEN+1];
575     const char *fqdn, *dsn, *remote_cn;
576     char remote_ip[32];
577     int storagenode_id;
578
579     uuid_unparse_lower(node->checkid, uuid_str);
580     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
581                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
582       continue;
583
584     mtevL(noit_debug, "stratcon_ingest_find <- (%d, %s) @ %s\n",
585           node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)");
586
587     /* We might be able to find the IP from our config if someone has
588      * specified the expected cn in the noit definition.
589      */
590     if(stratcon_find_noit_ip_by_cn(remote_cn,
591                                    remote_ip, sizeof(remote_ip)) == 0) {
592       node->noit = strdup(remote_ip);
593       mtevL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit);
594       continue;
595     }
596
597     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
598     if(stratcon_database_connect(cq) != 0) goto bad_row;
599
600     GET_QUERY(check_find);
601     DECLARE_PARAM_INT(node->sid);
602     PG_EXEC(check_find);
603     row_count = PQntuples(d->res);
604     if(row_count != 1) {
605       mtevL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid);
606       PQclear(d->res);
607       goto bad_row;
608     }
609
610     /* Get the remote_address (which noit owns this) */
611     PG_GET_STR_COL(val, 0, "remote_address");
612     if(!val) {
613       mtevL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn);
614       PQclear(d->res);
615       goto bad_row;
616     }
617     node->noit = strdup(val);
618     mtevL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit);
619    bad_row:
620     free_params((ds_single_detail *)d);
621     d->nparams = 0;
622     release_conn_q(cq);
623   }
624   return DS_EXEC_SUCCESS;
625 }
/*
 * Parse one tab-separated log line (d->data) into positional SQL parameters
 * on d, then run the insert statement matching its record type on cq.
 *
 * The record type is the first byte of the line (see noit_check_log.c):
 *   'n' - noitd config. Params: remote address, remote CN, "noitd",
 *         timestamp, config body. The body arrives base64-encoded and
 *         zlib-compressed, preceded by its expected uncompressed length.
 *   'C' - check. Params: remote address, timestamp, sid, uuid, target,
 *         module, name.
 *   'M' - metric. Params: timestamp, sid, name, value. The metric type byte
 *         selects the numeric or text insert statement.
 *   'S' - status. Params: timestamp, sid, state, availability, duration,
 *         status.
 *   'D' - accepted and ignored (no parameters, no query).
 * The 'C', 'S' and 'M' statements are expanded with strftime(3) against
 * d->whence (PG_TM_EXEC). The 'n' statement is run as-is (PG_EXEC).
 *
 * r         - remote address of the noit; NULL is treated as "".
 * remote_cn - remote certificate CN, used to map check uuids to sids.
 *             NOTE(review): the 'n' branch calls strlen(remote_cn), so it
 *             must not be NULL there - confirm callers guarantee this.
 * d         - the line being ingested; its params are filled in place.
 *             Parsing is skipped when d->nparams is already non-zero, so a
 *             row can be re-executed without re-parsing.
 *             NOTE(review): a parse failure part-way through leaves
 *             d->nparams non-zero. A retry with the same d would then skip
 *             parsing and execute with partial params unless the caller
 *             resets it - confirm.
 *
 * Returns DS_EXEC_SUCCESS, or DS_EXEC_ROW_FAILED for a malformed line, an
 * unknown sid (uuid_to_sid() == 0), an unconfigured statement or a failed
 * query. All helper macros used here jump to bad_row on failure.
 */
execute_outcome_t
stratcon_ingest_execute(conn_q *cq, const char *r, const char *remote_cn,
                        ds_line_detail *d) {
  int type, len, sid;
  char *final_buff;
  uLong final_len, actual_final_len;
  char *token;
  char raddr_blank[1] = "";
  const char *raddr;

  type = d->data[0];
  raddr = r ? r : raddr_blank;

  /* Parse the log line, but only if we haven't already */
  if(!d->nparams) {
    char *scp, *ecp;

    scp = d->data;
/* Consume the next tab-terminated field starting at scp. Its arguments are
 * ignored: the field always lands in `token`/`len`, and scp moves past the
 * tab. Jumps to bad_row when the line ends before a tab is found. */
#define PROCESS_NEXT_FIELD(t,l) do { \
  if(!*scp) goto bad_row; \
  ecp = strchr(scp, '\t'); \
  if(!ecp) goto bad_row; \
  token = scp; \
  len = (ecp-scp); \
  scp = ecp + 1; \
} while(0)
/* Take the rest of the line as the final field (into t/l), excluding one
 * trailing newline if present. An empty remainder yields length 0. */
#define PROCESS_LAST_FIELD(t,l) do { \
  if(!*scp) ecp = scp; \
  else { \
    ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
    if(*(ecp-1) == '\n') ecp--; /* We back up one letter if we ended in \n */ \
  } \
  t = scp; \
  l = (ecp-scp); \
} while(0)

    PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
    switch(type) {
      /* See noit_check_log.c for log description */
      case 'n':
        DECLARE_PARAM_STR(raddr, strlen(raddr));
        DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
        DECLARE_PARAM_STR("noitd",5); /* node_type */
        PROCESS_NEXT_FIELD(token,len);
        d->whence = (time_t)strtoul(token, NULL, 10);
        DECLARE_PARAM_STR(token,len); /* timestamp */

        /* This is the expected uncompressed len */
        /* NOTE(review): atoi() on line data; a bogus length only makes the
         * malloc or the length check below fail. */
        PROCESS_NEXT_FIELD(token,len);
        final_len = atoi(token);
        final_buff = malloc(final_len);
        if(!final_buff) goto bad_row;

        /* The last token is b64 encoded and compressed.
         * we need to decode it, declare it and then free it.
         */
        PROCESS_LAST_FIELD(token, len);
        /* We can in-place decode this */
        /* (decoded output is never longer than the base64 input) */
        len = mtev_b64_decode((char *)token, len,
                              (unsigned char *)token, len);
        if(len <= 0) {
          mtevL(noit_error, "noitd config base64 decoding error.\n");
          free(final_buff);
          goto bad_row;
        }
        actual_final_len = final_len;
        if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
                              (unsigned char *)token, len)) {
          mtevL(noit_error, "noitd config decompression failure.\n");
          free(final_buff);
          goto bad_row;
        }
        /* The advertised length must match exactly. */
        if(final_len != actual_final_len) {
          mtevL(noit_error, "noitd config decompression error.\n");
          free(final_buff);
          goto bad_row;
        }
        DECLARE_PARAM_STR(final_buff, final_len);
        free(final_buff);
        break;
      case 'D':
        break;
      case 'C':
        DECLARE_PARAM_STR(raddr, strlen(raddr));
        PROCESS_NEXT_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* timestamp */
        d->whence = (time_t)strtoul(token, NULL, 10);
        PROCESS_NEXT_FIELD(token, len);
        /* uuid is last 36 bytes */
        if(len > 36) { token += (len-36); len = 36; }
        sid = uuid_to_sid(token, remote_cn);
        if(sid == 0) goto bad_row;
        DECLARE_PARAM_INT(sid); /* sid */
        DECLARE_PARAM_STR(token,len); /* uuid */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* target */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* module */
        PROCESS_LAST_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* name */
        break;
      case 'M':
        PROCESS_NEXT_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* timestamp */
        d->whence = (time_t)strtoul(token, NULL, 10);
        PROCESS_NEXT_FIELD(token, len);
        /* uuid is last 36 bytes */
        if(len > 36) { token += (len-36); len = 36; }
        sid = uuid_to_sid(token, remote_cn);
        if(sid == 0) goto bad_row;
        DECLARE_PARAM_INT(sid); /* sid */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* name */
        PROCESS_NEXT_FIELD(token,len);
        d->metric_type = *token; /* one type byte; selects the statement below */
        PROCESS_LAST_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* value */
        break;
      case 'S':
        PROCESS_NEXT_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* timestamp */
        d->whence = (time_t)strtoul(token, NULL, 10);
        PROCESS_NEXT_FIELD(token, len);
        /* uuid is last 36 bytes */
        if(len > 36) { token += (len-36); len = 36; }
        sid = uuid_to_sid(token, remote_cn);
        if(sid == 0) goto bad_row;
        DECLARE_PARAM_INT(sid); /* sid */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* state */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* availability */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* duration */
        PROCESS_LAST_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* status */
        break;
      default:
        goto bad_row;
    }

  }

  /* Now execute the query */
  switch(type) {
    case 'n':
      GET_QUERY(config_insert);
      PG_EXEC(config_insert);
      PQclear(d->res);
      break;
    case 'C':
      GET_QUERY(check_insert);
      PG_TM_EXEC(check_insert, d->whence);
      PQclear(d->res);
      break;
    case 'S':
      GET_QUERY(status_insert);
      PG_TM_EXEC(status_insert, d->whence);
      PQclear(d->res);
      break;
    case 'D':
      break;
    case 'M':
      switch(d->metric_type) {
        case METRIC_INT32:
        case METRIC_UINT32:
        case METRIC_INT64:
        case METRIC_UINT64:
        case METRIC_DOUBLE:
          GET_QUERY(metric_insert_numeric);
          PG_TM_EXEC(metric_insert_numeric, d->whence);
          PQclear(d->res);
          break;
        case METRIC_STRING:
          GET_QUERY(metric_insert_text);
          PG_TM_EXEC(metric_insert_text, d->whence);
          PQclear(d->res);
          break;
        default:
          goto bad_row;
      }
      break;
    default:
      /* should never get here */
      goto bad_row;
  }
  return DS_EXEC_SUCCESS;
 bad_row:
  return DS_EXEC_ROW_FAILED;
}
816 static int
817 stratcon_database_post_connect(conn_q *cq) {
818   int rv = 0;
819   ds_single_detail _d = { 0 }, *d = &_d;
820   if(cq->fqdn) {
821     char *remote_str, *remote_cn;
822     /* This is the silly way we get null's in through our declare_param_str */
823     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
824     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
825     /* This is a storage node, it gets the storage node post_connect */
826     GET_QUERY(storage_post_connect);
827     rv = -1; /* now we're serious */
828     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
829     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
830     PG_EXEC(storage_post_connect);
831     PQclear(d->res);
832     rv = 0;
833   }
834   else {
835     /* Metanode post_connect */
836     GET_QUERY(metanode_post_connect);
837     rv = -1; /* now we're serious */
838     PG_EXEC(metanode_post_connect);
839     PQclear(d->res);
840     rv = 0;
841   }
842  bad_row:
843   free_params(d);
844   if(rv == -1) {
845     /* Post-connect intentions are serious and fatal */
846     PQfinish(cq->dbh);
847     cq->dbh = NULL;
848   }
849   return rv;
850 }
851 static int
852 stratcon_database_connect(conn_q *cq) {
853   char *dsn, dsn_meta[512];
854   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
855   const char *k, *v;
856   int klen;
857   mtev_hash_table *t;
858
859   dsn_meta[0] = '\0';
860   if(!cq->dsn) {
861     t = mtev_conf_get_hash(NULL, "/stratcon/database/dbconfig");
862     while(mtev_hash_next_str(t, &iter, &k, &klen, &v)) {
863       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
864       strlcat(dsn_meta, k, sizeof(dsn_meta));
865       strlcat(dsn_meta, "=", sizeof(dsn_meta));
866       strlcat(dsn_meta, v, sizeof(dsn_meta));
867     }
868     mtev_hash_destroy(t, free, free);
869     free(t);
870     dsn = dsn_meta;
871   }
872   else {
873     char options[32];
874     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta));
875     if(mtev_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user",
876                                options, sizeof(options))) {
877       strlcat(dsn_meta, " ", sizeof(dsn_meta));
878       strlcat(dsn_meta, "user", sizeof(dsn_meta));
879       strlcat(dsn_meta, "=", sizeof(dsn_meta));
880       strlcat(dsn_meta, options, sizeof(dsn_meta));
881     }
882     if(mtev_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password",
883                                options, sizeof(options))) {
884       strlcat(dsn_meta, " ", sizeof(dsn_meta));
885       strlcat(dsn_meta, "password", sizeof(dsn_meta));
886       strlcat(dsn_meta, "=", sizeof(dsn_meta));
887       strlcat(dsn_meta, options, sizeof(dsn_meta));
888     }
889     dsn = dsn_meta;
890   }
891
892   if(cq->dbh) {
893     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
894     PQreset(cq->dbh);
895     if(PQstatus(cq->dbh) != CONNECTION_OK) {
896       mtevL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
897             dsn, PQerrorMessage(cq->dbh));
898       return -1;
899     }
900     if(stratcon_database_post_connect(cq)) return -1;
901     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
902     mtevL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
903           dsn, PQerrorMessage(cq->dbh));
904     return -1;
905   }
906
907   cq->dbh = PQconnectdb(dsn);
908   if(!cq->dbh) return -1;
909   if(PQstatus(cq->dbh) != CONNECTION_OK) {
910     mtevL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
911           dsn, PQerrorMessage(cq->dbh));
912     return -1;
913   }
914   if(stratcon_database_post_connect(cq)) return -1;
915   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
916   mtevL(noit_error, "Error connection to database: '%s'\nError: %s\n",
917         dsn, PQerrorMessage(cq->dbh));
918   return -1;
919 }
920 static int
921 stratcon_ingest_savepoint_op(conn_q *cq, const char *p,
922                              const char *name) {
923   int rv = -1;
924   PGresult *res;
925   char cmd[128];
926   strlcpy(cmd, p, sizeof(cmd));
927   strlcat(cmd, name, sizeof(cmd));
928   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
929   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
930   PQclear(res);
931   return rv;
932 }
933 static int
934 stratcon_ingest_do(conn_q *cq, const char *cmd) {
935   PGresult *res;
936   int rv = -1;
937   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
938   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
939   PQclear(res);
940   return rv;
941 }
/*
 * Transaction control helpers.  They expand in the caller's scope and use its
 * `cq`, `current` and `last_sp` variables and its `full_monty` label.
 */

/* Close the connection, forget the handle and restart at full_monty. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)

/* Run "<prefix><name>" via stratcon_ingest_savepoint_op; failure => BUSTED. */
#define SAVEPOINT_OP_OR_BUSTED(prefix, name) do { \
  if(stratcon_ingest_savepoint_op(cq, (prefix), (name)) != 0) BUSTED(cq); \
} while(0)

/* Open a savepoint and remember the record it was taken at in last_sp. */
#define SAVEPOINT(name) do { \
  SAVEPOINT_OP_OR_BUSTED("SAVEPOINT ", name); \
  last_sp = current; \
} while(0)

/* Roll back to the savepoint and clear last_sp. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  SAVEPOINT_OP_OR_BUSTED("ROLLBACK TO SAVEPOINT ", name); \
  last_sp = NULL; \
} while(0)

/* Release the savepoint and clear last_sp. */
#define RELEASE_SAVEPOINT(name) do { \
  SAVEPOINT_OP_OR_BUSTED("RELEASE SAVEPOINT ", name); \
  last_sp = NULL; \
} while(0)
961
962 int
963 stratcon_ingest_asynch_lookup(eventer_t e, int mask, void *closure,
964                               struct timeval *now) {
965   ds_rt_detail *dsjd = closure;
966   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
967   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
968
969   mtevAssert(dsjd->rt);
970   stratcon_ingest_find(dsjd);
971   if(dsjd->completion_event)
972     eventer_add(dsjd->completion_event);
973
974   free_params((ds_single_detail *)dsjd);
975   free(dsjd);
976   return 0;
977 }
978 static void
979 stratcon_ingestor_submit_lookup(struct realtime_tracker *rt,
980                                 eventer_t completion) {
981   eventer_t e;
982   conn_pool *cpool;
983   ds_rt_detail *rtdetail;
984
985   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
986   rtdetail = calloc(1, sizeof(*rtdetail));
987   rtdetail->rt = rt;
988   rtdetail->completion_event = completion;
989   e = eventer_alloc();
990   e->mask = EVENTER_ASYNCH;
991   e->callback = stratcon_ingest_asynch_lookup;
992   e->closure = rtdetail;
993   eventer_add_asynch(cpool->jobq, e);
994 }
995 static const char *
996 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
997   void *vinfo;
998   char *dsn = NULL, *fqdn = NULL;
999   int found = 0;
1000   storagenode_info *info = NULL;
1001   pthread_mutex_lock(&storagenode_to_info_cache_lock);
1002   if(mtev_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
1003                         &vinfo)) {
1004     found = 1;
1005     info = vinfo;
1006   }
1007   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1008   if(found) {
1009     if(fqdn_out) *fqdn_out = info->fqdn;
1010     return info->dsn;
1011   }
1012
1013   if(!found && can_use_db) {
1014     ds_single_detail *d;
1015     conn_q *cq;
1016     int row_count;
1017     /* Look it up and store it */
1018     d = calloc(1, sizeof(*d));
1019     cq = get_conn_q_for_metanode();
1020     GET_QUERY(find_storage);
1021     DECLARE_PARAM_INT(id);
1022     PG_EXEC(find_storage);
1023     row_count = PQntuples(d->res);
1024     if(row_count) {
1025       PG_GET_STR_COL(dsn, 0, "dsn");
1026       PG_GET_STR_COL(fqdn, 0, "fqdn");
1027       fqdn = fqdn ? strdup(fqdn) : NULL;
1028       dsn = dsn ? strdup(dsn) : NULL;
1029     }
1030     PQclear(d->res);
1031    bad_row:
1032     free_params(d);
1033     free(d);
1034     release_conn_q(cq);
1035   }
1036   if(fqdn) {
1037     info = calloc(1, sizeof(*info));
1038     info->fqdn = fqdn;
1039     if(fqdn_out) *fqdn_out = info->fqdn;
1040     info->dsn = dsn;
1041     info->storagenode_id = id;
1042     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1043     mtev_hash_store(&storagenode_to_info_cache,
1044                     (void *)&info->storagenode_id, sizeof(int), info);
1045     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1046   }
1047   return info ? info->dsn : NULL;
1048 }
1049 static void
1050 expand_b_record(ds_line_detail **head, ds_line_detail **last,
1051                 const char *line, int len) {
1052   char **outrows;
1053   int i, cnt;
1054   ds_line_detail *next;
1055
1056   cnt = noit_check_log_b_to_sm(line, len, &outrows, 0);
1057   for(i=0;i<cnt;i++) {
1058     if(outrows[i] == NULL) continue;
1059     next = calloc(sizeof(*next), 1);
1060     next->data = outrows[i];
1061     if(!*head) *head = next;
1062     if(*last) (*last)->next = next;
1063     *last = next;
1064   }
1065   if(outrows) free(outrows);
1066 }
1067 static ds_line_detail *
1068 build_insert_batch(pg_interim_journal_t *ij) {
1069   int rv;
1070   off_t len;
1071   const char *buff, *cp, *lcp;
1072   struct stat st;
1073   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1074
1075   if(ij->fd < 0) {
1076     ij->fd = open(ij->filename, O_RDONLY);
1077     if(ij->fd < 0) {
1078       mtevL(noit_error, "Cannot open interim journal '%s': %s\n",
1079             ij->filename, strerror(errno));
1080       mtevAssert(ij->fd >= 0);
1081     }
1082   }
1083   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1084   if(rv == -1) {
1085       mtevL(noit_error, "Cannot stat interim journal '%s': %s\n",
1086             ij->filename, strerror(errno));
1087     mtevAssert(rv != -1);
1088   }
1089   len = st.st_size;
1090   if(len > 0) {
1091     buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1092     if(buff == (void *)-1) {
1093       mtevL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1094             ij->filename, strerror(errno));
1095       mtevAssert(buff != (void *)-1);
1096     }
1097     lcp = buff;
1098     while(lcp < (buff + len) &&
1099           NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1100       if(lcp[0] == 'B' && lcp[1] != '\0' && lcp[2] == '\t') {
1101       /* Bundle records are special and need to be expanded into
1102        * traditional records here
1103        */
1104         noit_compression_type_t ctype = NOIT_COMPRESS_NONE;
1105         switch(lcp[1]) {
1106           case '1': /* version 1 */
1107             ctype = NOIT_COMPRESS_ZLIB; /*no break fall through */
1108           case '2': /* version 2 */
1109               expand_b_record(&head, &last, lcp, cp - lcp);
1110             break;
1111           default:
1112             mtevL(noit_error, "unknown bundle version %c\n", lcp[1]);
1113         }
1114       }
1115       else {
1116         next = calloc(1, sizeof(*next));
1117         next->data = malloc(cp - lcp + 1);
1118         memcpy(next->data, lcp, cp - lcp);
1119         next->data[cp - lcp] = '\0';
1120         if(!head) head = next;
1121         if(last) last->next = next;
1122         last = next;
1123       }
1124       lcp = cp + 1;
1125     }
1126     munmap((void *)buff, len);
1127   }
1128   close(ij->fd);
1129   return head;
1130 }
1131 static void
1132 pg_interim_journal_remove(pg_interim_journal_t *ij) {
1133   unlink(ij->filename);
1134   free(ij->filename);
1135   if(ij->remote_str) free(ij->remote_str);
1136   if(ij->remote_cn) free(ij->remote_cn);
1137   if(ij->fqdn) free(ij->fqdn);
1138   free(ij);
1139 }
1140 static int
1141 stratcon_ingest_asynch_execute(eventer_t e, int mask, void *closure,
1142                                struct timeval *now) {
1143   int i, total, success, sp_total, sp_success;
1144   pg_interim_journal_t *ij;
1145   ds_line_detail *head = NULL, *current, *last_sp;
1146   const char *dsn;
1147   conn_q *cq;
1148   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1149   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1150
1151   ij = closure;
1152   if(ij->fqdn == NULL) {
1153     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1154     if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1155   }
1156   else {
1157     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, NULL);
1158   }
1159   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1160                              ij->fqdn, dsn);
1161   mtevL(ds_deb, "stratcon_ingest_asynch_execute[%s,%s,%s]\n",
1162         ij->remote_str, ij->remote_cn, ij->fqdn);
1163  full_monty:
1164   /* Make sure we have a connection */
1165   i = 1;
1166   while(stratcon_database_connect(cq)) {
1167     mtevL(noit_error, "Error connecting to database: %s\n",
1168           ij->fqdn ? ij->fqdn : "(null)");
1169     sleep(i);
1170     i *= 2;
1171     i = MIN(i, 16);
1172   }
1173
1174   if(head == NULL) head = build_insert_batch(ij);
1175   mtevL(ds_deb, "Starting batch from %s/%s to %s\n",
1176         ij->remote_str ? ij->remote_str : "(null)",
1177         ij->remote_cn ? ij->remote_cn : "(null)",
1178         ij->fqdn ? ij->fqdn : "(null)");
1179   current = head;
1180   last_sp = NULL;
1181   total = success = sp_total = sp_success = 0;
1182   if(stratcon_ingest_do(cq, "BEGIN")) BUSTED(cq);
1183   while(current) {
1184     execute_outcome_t rv;
1185     if(current->data) {
1186       if(!last_sp) {
1187         SAVEPOINT("batch");
1188         sp_success = success;
1189         sp_total = total;
1190       }
1191  
1192       if(current->problematic) {
1193         RELEASE_SAVEPOINT("batch");
1194         current = current->next;
1195         total++;
1196         continue;
1197       }
1198       rv = stratcon_ingest_execute(cq, cq->remote_str, cq->remote_cn,
1199                                    current);
1200       switch(rv) {
1201         case DS_EXEC_SUCCESS:
1202           total++;
1203           success++;
1204           current = current->next;
1205           break;
1206         case DS_EXEC_ROW_FAILED:
1207           /* rollback to savepoint, mark this record as bad and start again */
1208           if(current->data[0] != 'n')
1209             mtevL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1210           current->problematic = 1;
1211           current = last_sp;
1212           success = sp_success;
1213           total = sp_total;
1214           ROLLBACK_TO_SAVEPOINT("batch");
1215           break;
1216         case DS_EXEC_TXN_FAILED:
1217           mtevL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1218           BUSTED(cq);
1219       }
1220     }
1221   }
1222   if(last_sp) RELEASE_SAVEPOINT("batch");
1223   if(stratcon_ingest_do(cq, "COMMIT")) {
1224     mtevL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1225     BUSTED(cq);
1226   }
1227   /* Cleanup the mess */
1228   while(head) {
1229     ds_line_detail *tofree;
1230     tofree = head;
1231     head = head->next;
1232     if(tofree->data) free(tofree->data);
1233     free_params((ds_single_detail *)tofree);
1234     free(tofree);
1235   }
1236   mtevL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1237         ij->remote_str ? ij->remote_str : "(null)",
1238         ij->remote_cn ? ij->remote_cn : "(null)",
1239         ij->fqdn ? ij->fqdn : "(null)", success, total);
1240   pg_interim_journal_remove(ij);
1241   release_conn_q(cq);
1242   return 0;
1243 }
1244 static int
1245 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1246                           int *sid_out, int *storagenode_id_out,
1247                           const char **remote_cn_out,
1248                           const char **fqdn_out, const char **dsn_out) {
1249   /* only called from the main thread -- no safety issues */
1250   void *vuuidinfo, *vinfo;
1251   uuid_info *uuidinfo;
1252   storagenode_info *info = NULL;
1253   char *fqdn = NULL;
1254   char *dsn = NULL;
1255   char *new_remote_cn = NULL;
1256   int storagenode_id = 0, sid = 0;
1257   if(!mtev_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1258                          &vuuidinfo)) {
1259     int row_count = 0;
1260     char *tmpint;
1261     ds_single_detail *d;
1262     conn_q *cq;
1263
1264     /* We can't do a database lookup without the remote_cn */
1265     if(!remote_cn) {
1266       if(stratcon_datastore_get_enabled()) {
1267         /* We have an authoritatively maintained cache, we don't do lookups */
1268         return -1;
1269       }
1270       else
1271         remote_cn = "[[null]]";
1272     }
1273
1274     d = calloc(1, sizeof(*d));
1275     cq = get_conn_q_for_metanode();
1276     if(stratcon_database_connect(cq) == 0) {
1277       /* Blocking call to service the cache miss */
1278       GET_QUERY(check_map);
1279       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1280       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1281       PG_EXEC(check_map);
1282       row_count = PQntuples(d->res);
1283       if(row_count != 1) {
1284         PQclear(d->res);
1285         goto bad_row;
1286       }
1287       PG_GET_STR_COL(tmpint, 0, "sid");
1288       if(!tmpint) {
1289         row_count = 0;
1290         PQclear(d->res);
1291         goto bad_row;
1292       }
1293       sid = atoi(tmpint);
1294       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1295       if(tmpint) storagenode_id = atoi(tmpint);
1296       PG_GET_STR_COL(fqdn, 0, "fqdn");
1297       PG_GET_STR_COL(dsn, 0, "dsn");
1298       PG_GET_STR_COL(new_remote_cn, 0, "remote_cn");
1299       fqdn = fqdn ? strdup(fqdn) : NULL;
1300       dsn = dsn ? strdup(dsn) : NULL;
1301       new_remote_cn = new_remote_cn ? strdup(new_remote_cn) : NULL;
1302       PQclear(d->res);
1303     }
1304    bad_row:
1305     free_params((ds_single_detail *)d);
1306     free(d);
1307     release_conn_q(cq);
1308     if(row_count != 1) {
1309       return -1;
1310     }
1311     /* Place in cache */
1312     uuidinfo = calloc(1, sizeof(*uuidinfo));
1313     uuidinfo->sid = sid;
1314     uuidinfo->uuid_str = strdup(uuid_str);
1315     uuidinfo->storagenode_id = storagenode_id;
1316     uuidinfo->remote_cn = new_remote_cn ? strdup(new_remote_cn) : strdup(remote_cn);
1317     mtev_hash_store(&uuid_to_info_cache,
1318                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1319     /* Also, we may have just witnessed a new storage node, store it */
1320     if(storagenode_id) {
1321       int needs_free = 0;
1322       info = calloc(1, sizeof(*info));
1323       info->storagenode_id = storagenode_id;
1324       info->dsn = dsn ? strdup(dsn) : NULL;
1325       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1326       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1327       if(!mtev_hash_retrieve(&storagenode_to_info_cache,
1328                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1329         /* hack to save memory -- we *never* remove from these caches,
1330            so we can use the same fqdn value in the above cache for the key
1331            in the cache below -- (no strdup) */
1332         mtev_hash_store(&storagenode_to_info_cache,
1333                         (void *)&info->storagenode_id, sizeof(int), info);
1334       }
1335       else needs_free = 1;
1336       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1337       if(needs_free) {
1338         if(info->dsn) free(info->dsn);
1339         if(info->fqdn) free(info->fqdn);
1340         free(info);
1341       }
1342     }
1343   }
1344   else
1345     uuidinfo = vuuidinfo;
1346
1347   if(uuidinfo && uuidinfo->storagenode_id) {
1348     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1349       /* we don't have dsn and we actually want it */
1350       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1351       if(mtev_hash_retrieve(&storagenode_to_info_cache,
1352                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1353                             &vinfo))
1354         info = vinfo;
1355       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1356     }
1357   }
1358
1359   if(fqdn_out) *fqdn_out = info ? info->fqdn : NULL;
1360   if(dsn_out) *dsn_out = info ? info->dsn : NULL;
1361   mtevAssert(uuidinfo);
1362   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1363   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1364   if(sid_out) *sid_out = uuidinfo->sid;
1365   if(fqdn) free(fqdn);
1366   if(dsn) free(dsn);
1367   if(new_remote_cn) free(new_remote_cn);
1368   return 0;
1369 }
1370 static int
1371 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1372   char uuid_str[UUID_STR_LEN+1];
1373   int sid = 0;
1374   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1375   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1376   return sid;
1377 }
1378
1379 static int
1380 stratcon_ingest_saveconfig() {
1381   int rv = -1;
1382   char *buff;
1383   ds_single_detail _d = { 0 }, *d = &_d;
1384   conn_q *cq;
1385   char ipv4_str[32];
1386   struct in_addr r, l;
1387
1388   r.s_addr = htonl((4 << 24) | (2 << 16) | (2 << 8) | 1);
1389   memset(&l, 0, sizeof(l));
1390   mtev_getip_ipv4(r, &l);
1391   /* Ignore the error.. what are we going to do anyway */
1392   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
1393     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));
1394
1395   cq = get_conn_q_for_metanode();
1396
1397   if(stratcon_database_connect(cq) == 0) {
1398     char time_as_str[20];
1399     size_t len;
1400     buff = mtev_conf_xml_in_mem(&len);
1401     if(!buff) goto bad_row;
1402
1403     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1404     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str));
1405     DECLARE_PARAM_STR("", 0);
1406     DECLARE_PARAM_STR("stratcond", 9);
1407     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1408     DECLARE_PARAM_STR(buff, len);
1409     free(buff);
1410
1411     GET_QUERY(config_insert);
1412     PG_EXEC(config_insert);
1413     PQclear(d->res);
1414     rv = 0;
1415
1416     bad_row:
1417       free_params(d);
1418   }
1419   release_conn_q(cq);
1420   return rv;
1421 }
1422
1423 static int
1424 stratcon_ingest_launch_file_ingestion(const char *path,
1425                                       const char *remote_str,
1426                                       const char *remote_cn,
1427                                       const char *id_str,
1428                                       const mtev_boolean sweeping) {
1429   pg_interim_journal_t *ij;
1430   char pgfile[PATH_MAX];
1431   eventer_t ingest;
1432
1433   (void)sweeping;
1434
1435   if(strcmp(path + strlen(path) - 3, ".pg")) {
1436     snprintf(pgfile, sizeof(pgfile), "%s.pg", path);
1437     if(link(path, pgfile) < 0 && errno != EEXIST) {
1438       mtevL(noit_error, "cannot link journal %s: %s\n", path, strerror(errno));
1439       return -1;
1440     }
1441   }
1442   else
1443     strlcpy(pgfile, path, sizeof(pgfile));
1444   ij = calloc(1, sizeof(*ij));
1445   ij->fd = open(pgfile, O_RDONLY);
1446   if(ij->fd < 0) {
1447     mtevL(noit_error, "cannot open journal '%s': %s\n",
1448           pgfile, strerror(errno));
1449     free(ij);
1450     return -1;
1451   }
1452   close(ij->fd);
1453   ij->fd = -1;
1454   ij->filename = strdup(pgfile);
1455   ij->remote_str = strdup(remote_str);
1456   ij->remote_cn = strdup(remote_cn);
1457   ij->storagenode_id = atoi(id_str);
1458   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1459                                        ij->fqdn);
1460   mtevL(noit_debug, "ingesting payload: %s\n", ij->filename);
1461   ingest = eventer_alloc();
1462   ingest->mask = EVENTER_ASYNCH;
1463   ingest->callback = stratcon_ingest_asynch_execute;
1464   ingest->closure = ij;
1465   eventer_add_asynch(ij->cpool->jobq, ingest);
1466   return 0;
1467 }
1468
1469 int
1470 stratcon_ingest_all_storagenode_info() {
1471   int i, cnt = 0;
1472   ds_single_detail _d = { 0 }, *d = &_d;
1473   conn_q *cq;
1474   cq = get_conn_q_for_metanode();
1475
1476   while(stratcon_database_connect(cq)) {
1477     mtevL(noit_error, "Error connecting to database\n");
1478     sleep(1);
1479   }
1480
1481   GET_QUERY(all_storage);
1482   PG_EXEC(all_storage);
1483   cnt = PQntuples(d->res);
1484   for(i=0; i<cnt; i++) {
1485     void *vinfo;
1486     char *tmpint, *fqdn, *dsn;
1487     int storagenode_id;
1488     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1489     storagenode_id = atoi(tmpint);
1490     PG_GET_STR_COL(fqdn, i, "fqdn");
1491     PG_GET_STR_COL(dsn, i, "dsn");
1492     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1493     storagenode_id = tmpint ? atoi(tmpint) : 0;
1494
1495     if(!mtev_hash_retrieve(&storagenode_to_info_cache,
1496                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1497       storagenode_info *info;
1498       info = calloc(1, sizeof(*info));
1499       info->storagenode_id = storagenode_id;
1500       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1501       info->dsn = dsn ? strdup(dsn) : NULL;
1502       mtev_hash_store(&storagenode_to_info_cache,
1503                       (void *)&info->storagenode_id, sizeof(int), info);
1504       mtevL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1505             info->storagenode_id,
1506             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1507     }
1508     mtev_watchdog_child_heartbeat();
1509   }
1510   PQclear(d->res);
1511  bad_row:
1512   free_params(d);
1513
1514   release_conn_q(cq);
1515   mtevL(noit_error, "Loaded %d storage nodes\n", cnt);
1516   return cnt;
1517 }
1518 int
1519 stratcon_ingest_all_check_info() {
1520   int i, cnt, loaded = 0;
1521   ds_single_detail _d = { 0 }, *d = &_d;
1522   conn_q *cq;
1523   cq = get_conn_q_for_metanode();
1524
1525   while(stratcon_database_connect(cq)) {
1526     mtevL(noit_error, "Error connecting to database\n");
1527     sleep(1);
1528   }
1529
1530   GET_QUERY(check_mapall);
1531   PG_EXEC(check_mapall);
1532   cnt = PQntuples(d->res);
1533   for(i=0; i<cnt; i++) {
1534     void *vinfo;
1535     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1536     int sid, storagenode_id;
1537     uuid_info *uuidinfo;
1538     PG_GET_STR_COL(uuid_str, i, "id");
1539     if(!uuid_str) continue;
1540     PG_GET_STR_COL(tmpint, i, "sid");
1541     if(!tmpint) continue;
1542     sid = atoi(tmpint);
1543     PG_GET_STR_COL(fqdn, i, "fqdn");
1544     PG_GET_STR_COL(dsn, i, "dsn");
1545     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1546     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1547     storagenode_id = tmpint ? atoi(tmpint) : 0;
1548
1549     uuidinfo = calloc(1, sizeof(*uuidinfo));
1550     uuidinfo->uuid_str = strdup(uuid_str);
1551     uuidinfo->remote_cn = strdup(remote_cn);
1552     uuidinfo->storagenode_id = storagenode_id;
1553     uuidinfo->sid = sid;
1554     mtev_hash_store(&uuid_to_info_cache,
1555                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1556     mtevL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1557           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1558     loaded++;
1559     if(!mtev_hash_retrieve(&storagenode_to_info_cache,
1560                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1561       storagenode_info *info;
1562       info = calloc(1, sizeof(*info));
1563       info->storagenode_id = storagenode_id;
1564       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1565       info->dsn = dsn ? strdup(dsn) : NULL;
1566       mtev_hash_store(&storagenode_to_info_cache,
1567                       (void *)&info->storagenode_id, sizeof(int), info);
1568       mtevL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1569             info->storagenode_id,
1570             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1571     }
1572     mtev_watchdog_child_heartbeat();
1573   }
1574   PQclear(d->res);
1575  bad_row:
1576   free_params(d);
1577
1578   release_conn_q(cq);
1579   mtevL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1580   return loaded;
1581 }
1582
1583 static char *
1584 stratcon_get_noit_config(const char *cn) {
1585   ds_single_detail *d;
1586   int row_count = 0;
1587   const char *xml = NULL;
1588   char *xmlcopy = NULL;
1589   conn_q *cq = NULL;
1590
1591   d = calloc(1, sizeof(*d));
1592   GET_QUERY(config_get);
1593   cq = get_conn_q_for_metanode();
1594   if(!cq) goto bad_row;
1595
1596   DECLARE_PARAM_STR(cn, cn ? strlen(cn) : 0);
1597   PG_EXEC(config_get);
1598   row_count = PQntuples(d->res);
1599   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1600
1601   if(xml) xmlcopy = strdup(xml);
1602
1603  bad_row:
1604   free_params((ds_single_detail *)d);
1605   d->nparams = 0;
1606   if(cq) release_conn_q(cq);
1607   free(d);
1608
1609   return xmlcopy;
1610 }
1611
1612 static ingestor_api_t postgres_ingestor_api = {
1613   .launch_file_ingestion = stratcon_ingest_launch_file_ingestion,
1614   .iep_check_preload = stratcon_ingest_iep_check_preload,
1615   .storage_node_lookup = storage_node_quick_lookup,
1616   .submit_realtime_lookup = stratcon_ingestor_submit_lookup,
1617   .get_noit_config = stratcon_get_noit_config,
1618   .save_config = stratcon_ingest_saveconfig
1619 };
1620
1621 static int postgres_ingestor_config(mtev_dso_generic_t *self, mtev_hash_table *o) {
1622   return 0;
1623 }
1624 static int postgres_ingestor_onload(mtev_image_t *self) {
1625   return 0;
1626 }
/*
 * Journal-sweep filter: accept names exactly 19 characters long that end in
 * ".pg" (the first 16 characters are not inspected).  Feeds the watchdog on
 * every call, presumably because a sweep may examine many files.
 */
static int is_postgres_ingestor_file(const char *file) {
  mtev_watchdog_child_heartbeat();
  if(strlen(file) != 19)
    return 0;
  return strcmp(file + 16, ".pg") == 0;
}
1631 static int postgres_ingestor_init(mtev_dso_generic_t *self) {
1632   stratcon_datastore_core_init();
1633   mtev_hash_init(&ds_conns);
1634   mtev_hash_init(&uuid_to_info_cache);
1635   mtev_hash_init(&storagenode_to_info_cache);
1636   pthread_mutex_init(&ds_conns_lock, NULL);
1637   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1638   ds_err = mtev_log_stream_find("error/datastore");
1639   ds_deb = mtev_log_stream_find("debug/datastore");
1640   ds_pool_deb = mtev_log_stream_find("debug/datastore_pool");
1641   ingest_err = mtev_log_stream_find("error/ingest");
1642   if(!ds_err) ds_err = noit_error;
1643   if(!ingest_err) ingest_err = noit_error;
1644   if(!mtev_conf_get_string(NULL, "/stratcon/database/journal/path",
1645                            &basejpath)) {
1646     mtevL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1647     exit(-1);
1648   }
1649   stratcon_ingest_all_check_info();
1650   stratcon_ingest_all_storagenode_info();
1651   stratcon_ingest_sweep_journals(basejpath, is_postgres_ingestor_file,
1652                                  stratcon_ingest_launch_file_ingestion);
1653   return stratcon_datastore_set_ingestor(&postgres_ingestor_api);
1654 }
1655
1656 mtev_dso_generic_t postgres_ingestor = {
1657   {
1658     .magic = MTEV_GENERIC_MAGIC,
1659     .version = MTEV_GENERIC_ABI_VERSION,
1660     .name = "postgres_ingestor",
1661     .description = "postgres drive for data ingestion",
1662     .xml_description = postgres_ingestor_xml_description,
1663     .onload = postgres_ingestor_onload,
1664   }, 
1665   postgres_ingestor_config,
1666   postgres_ingestor_init
1667 };
Note: See TracBrowser for help on using the browser.