root/src/modules/postgres_ingestor.c

Revision 304ec80b8cf842fc0abe5f9029790908b6455957, 51.7 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 2 months ago)

Convert to libmtev.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2011, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * Copyright (c) 2015, Circonus, Inc. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include <mtev_defines.h>
35
36 #include <unistd.h>
37 #include <fcntl.h>
38 #include <netinet/in.h>
39 #include <sys/un.h>
40 #include <dirent.h>
41 #include <arpa/inet.h>
42 #include <sys/mman.h>
43 #include <libpq-fe.h>
44 #include <zlib.h>
45 #include <assert.h>
46 #include <errno.h>
47
48 #include <eventer/eventer.h>
49 #include <mtev_log.h>
50 #include <mtev_b64.h>
51 #include <mtev_str.h>
52 #include <mtev_mkdir.h>
53 #include <mtev_getip.h>
54 #include <mtev_watchdog.h>
55 #include <mtev_conf.h>
56 #include <mtev_rest.h>
57
58 #include "noit_mtev_bridge.h"
59 #include "noit_module.h"
60 #include "stratcon_datastore.h"
61 #include "stratcon_ingest.h"
62 #include "stratcon_realtime_http.h"
63 #include "stratcon_iep.h"
64 #include "noit_check.h"
65 #include "noit_check_log_helpers.h"
66 #include "bundle.pb-c.h"
67
68 #include "postgres_ingestor.xmlh"
69
/*
 * DECL_STMT(codename, confname) declares two file-scope statics:
 *   codename       - the statement text, NULL until it is loaded
 *   codename_conf  - the configuration path the text is loaded from:
 *                    "/stratcon/database/statements/<confname>"
 */
#define DECL_STMT(codename,confname) \
static char *codename = NULL; \
static const char *codename##_conf = "/stratcon/database/statements/" #confname

/* Statements run right after a connection is established */
DECL_STMT(storage_post_connect, storagepostconnect);
DECL_STMT(metanode_post_connect, metanodepostconnect);
/* Storage node lookups */
DECL_STMT(find_storage, findstoragenode);
DECL_STMT(all_storage, allstoragenodes);
/* Check lookups and mappings */
DECL_STMT(check_map, mapchecktostoragenode);
DECL_STMT(check_mapall, mapallchecks);
DECL_STMT(check_loadall, allchecks);
DECL_STMT(check_find, findcheck);
/* Inserts driven by journal line type */
DECL_STMT(check_insert, check);
DECL_STMT(status_insert, status);
DECL_STMT(metric_insert_numeric, metric_numeric);
DECL_STMT(metric_insert_text, metric_text);
DECL_STMT(config_insert, config);
DECL_STMT(config_get, findconfig);
88
89 static mtev_log_stream_t ds_err = NULL;
90 static mtev_log_stream_t ds_deb = NULL;
91 static mtev_log_stream_t ds_pool_deb = NULL;
92 static mtev_log_stream_t ingest_err = NULL;
93
/*
 * GET_QUERY(a): make sure statement `a` is loaded.  When `a` is still NULL
 * it is read from the configuration path `a_conf`; if that read reports
 * failure, control jumps to the enclosing function's `bad_row` label.
 */
#define GET_QUERY(a) do { \
  if(a == NULL && !mtev_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
99
100 struct conn_q;
101
102 typedef struct {
103   char            *queue_name; /* the key fqdn+remote_sn */
104   eventer_jobq_t  *jobq;
105   struct conn_q   *head;
106   pthread_mutex_t  lock;
107   pthread_cond_t   cv;
108   int              ttl;
109   int              in_pool;
110   int              outstanding;
111   int              max_allocated;
112   int              max_in_pool;
113 } conn_pool;
114 typedef struct conn_q {
115   time_t           last_use;
116   char            *dsn;        /* Pg connect string */
117   char            *remote_str; /* the IP of the noit*/
118   char            *remote_cn;  /* the Cert CN of the noit */
119   char            *fqdn;       /* the fqdn of the storage node */
120   conn_pool       *pool;
121   struct conn_q   *next;
122   /* Postgres specific stuff */
123   PGconn          *dbh;
124 } conn_q;
125
126
/* Upper bound on the number of bound parameters a statement may carry */
#define MAX_PARAMS 8

/*
 * Fields shared by every "detail" structure: the last result and its
 * status, the timestamp of the row, and the parameter arrays (values,
 * lengths, formats, and an "allocated" flag per slot) that are handed
 * to PQexecParams.  nparams counts the slots in use.
 */
#define POSTGRES_PARTS \
  PGresult *res; \
  int rv; \
  time_t whence; \
  int nparams; \
  int metric_type; \
  char *paramValues[MAX_PARAMS]; \
  int paramLengths[MAX_PARAMS]; \
  int paramFormats[MAX_PARAMS]; \
  int paramAllocd[MAX_PARAMS];
138
139 typedef struct ds_single_detail {
140   POSTGRES_PARTS
141 } ds_single_detail;
142 typedef struct {
143   /* Postgres specific stuff */
144   POSTGRES_PARTS
145   struct realtime_tracker *rt;
146   conn_q *cq; /* connection on which to perform this job */
147   eventer_t completion_event; /* This event should be registered if non NULL */
148 } ds_rt_detail;
149 typedef struct ds_line_detail {
150   /* Postgres specific stuff */
151   POSTGRES_PARTS
152   char *data;
153   int problematic;
154   struct ds_line_detail *next;
155 } ds_line_detail;
156
157 typedef struct {
158   char *remote_str;
159   char *remote_cn;
160   char *fqdn;
161   int storagenode_id;
162   int fd;
163   char *filename;
164   conn_pool *cpool;
165 } pg_interim_journal_t;
166
167 static int stratcon_database_connect(conn_q *cq);
168 static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
169 static int storage_node_quick_lookup(const char *uuid_str,
170                                      const char *remote_cn,
171                                      int *sid_out, int *storagenode_id_out,
172                                      const char **remote_cn_out,
173                                      const char **fqdn_out,
174                                      const char **dsn_out);
175
176 static void
177 free_params(ds_single_detail *d) {
178   int i;
179   for(i=0; i<d->nparams; i++)
180     if(d->paramAllocd[i] && d->paramValues[i])
181       free(d->paramValues[i]);
182 }
183
184 static char *basejpath = NULL;
185 static pthread_mutex_t ds_conns_lock;
186 static mtev_hash_table ds_conns;
187
188 /* the fqdn cache needs to be thread safe */
189 typedef struct {
190   char *uuid_str;
191   char *remote_cn;
192   int storagenode_id;
193   int sid;
194 } uuid_info;
195 typedef struct {
196   int storagenode_id;
197   char *fqdn;
198   char *dsn;
199 } storagenode_info;
200 mtev_hash_table uuid_to_info_cache;
201 pthread_mutex_t storagenode_to_info_cache_lock;
202 mtev_hash_table storagenode_to_info_cache;
203
204 /* Thread-safe connection pools */
205
206 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
207 static void
208 release_conn_q_forceable(conn_q *cq, int forcefree) {
209   int putback = 0;
210   cq->last_use = time(NULL);
211   pthread_mutex_lock(&cq->pool->lock);
212   cq->pool->outstanding--;
213   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
214     putback = 1;
215     cq->next = cq->pool->head;
216     cq->pool->head = cq;
217     cq->pool->in_pool++;
218   }
219   pthread_mutex_unlock(&cq->pool->lock);
220   mtevL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)(vpsized_int)pthread_self(),
221         putback ? "to pool" : "and destroy", cq->pool->queue_name);
222   pthread_cond_signal(&cq->pool->cv);
223   if(putback) return;
224
225   /* Not put back, release it */
226   if(cq->dbh) PQfinish(cq->dbh);
227   if(cq->remote_str) free(cq->remote_str);
228   if(cq->remote_cn) free(cq->remote_cn);
229   if(cq->fqdn) free(cq->fqdn);
230   if(cq->dsn) free(cq->dsn);
231   free(cq);
232 }
233 static void
234 ttl_purge_conn_pool(conn_pool *pool) {
235   int old_cnt, new_cnt;
236   time_t now = time(NULL);
237   conn_q *cq, *prev = NULL, *iter;
238   /* because we always replace on the head and update the last_use time when
239      doing so, we know they are ordered LRU on the end.  So, once we hit an
240      old one, we know all the others are old too.
241    */
242   if(!pool->head) return; /* hack short circuit for no locks */
243   pthread_mutex_lock(&pool->lock);
244   old_cnt = pool->in_pool;
245   cq = pool->head;
246   while(cq) {
247     if(cq->last_use + cq->pool->ttl < now) {
248       if(prev) prev->next = NULL;
249       else pool->head = NULL;
250       break;
251     }
252     prev = cq;
253     cq = cq->next;
254   }
255   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
256   /* Fix accounting */
257   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
258   new_cnt = pool->in_pool;
259   pthread_mutex_unlock(&pool->lock);
260
261   /* Force release these without holding the lock */
262   while(cq) {
263     prev = cq;
264     cq = cq->next;
265     release_conn_q_forceable(prev, 1);
266   }
267   if(old_cnt != new_cnt)
268     mtevL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
269           pool->queue_name);
270 }
271 static void
272 release_conn_q(conn_q *cq) {
273   ttl_purge_conn_pool(cq->pool);
274   release_conn_q_forceable(cq, 0);
275 }
276 static conn_pool *
277 get_conn_pool_for_remote(const char *remote_str,
278                          const char *remote_cn, const char *fqdn) {
279   void *vcpool;
280   conn_pool *cpool = NULL;
281   char queue_name[256] = "datastore_";
282   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
283            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
284            fqdn ? fqdn : "default",
285            remote_cn ? remote_cn : "default");
286   pthread_mutex_lock(&ds_conns_lock);
287   if(mtev_hash_retrieve(&ds_conns, (const char *)queue_name,
288                         strlen(queue_name), &vcpool))
289     cpool = vcpool;
290   pthread_mutex_unlock(&ds_conns_lock);
291   if(!cpool) {
292     vcpool = cpool = calloc(1, sizeof(*cpool));
293     cpool->queue_name = strdup(queue_name);
294     pthread_mutex_init(&cpool->lock, NULL);
295     pthread_cond_init(&cpool->cv, NULL);
296     cpool->in_pool = 0;
297     cpool->outstanding = 0;
298     cpool->max_in_pool = 1;
299     cpool->max_allocated = 1;
300     pthread_mutex_lock(&ds_conns_lock);
301     if(!mtev_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
302                         cpool)) {
303       mtev_hash_retrieve(&ds_conns, (const char *)queue_name,
304                          strlen(queue_name), &vcpool);
305     }
306     pthread_mutex_unlock(&ds_conns_lock);
307     if(vcpool != cpool) {
308       /* someone beat us to it */
309       free(cpool->queue_name);
310       pthread_mutex_destroy(&cpool->lock);
311       pthread_cond_destroy(&cpool->cv);
312       free(cpool);
313     }
314     else {
315       int i;
316       /* Our job to setup the pool */
317       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
318       eventer_jobq_init(cpool->jobq, queue_name);
319       /* Add one thread */
320       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
321         eventer_jobq_increase_concurrency(cpool->jobq);
322     }
323     cpool = vcpool;
324   }
325   return cpool;
326 }
327 static conn_q *
328 get_conn_q_for_remote(const char *remote_str,
329                       const char *remote_cn, const char *fqdn,
330                       const char *dsn) {
331   conn_pool *cpool;
332   conn_q *cq;
333   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
334   mtevL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)(vpsized_int)pthread_self(),
335         cpool->queue_name);
336   pthread_mutex_lock(&cpool->lock);
337  again:
338   if(cpool->head) {
339     assert(cpool->in_pool > 0);
340     cq = cpool->head;
341     cpool->head = cq->next;
342     cpool->in_pool--;
343     cpool->outstanding++;
344     cq->next = NULL;
345     pthread_mutex_unlock(&cpool->lock);
346     return cq;
347   }
348   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
349     mtevL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
350           (void *)(vpsized_int)pthread_self(), cpool->queue_name);
351     pthread_cond_wait(&cpool->cv, &cpool->lock);
352     mtevL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
353           (void *)(vpsized_int)pthread_self(), cpool->queue_name);
354     goto again;
355   }
356   else {
357     cpool->outstanding++;
358     pthread_mutex_unlock(&cpool->lock);
359   }
360  
361   cq = calloc(1, sizeof(*cq));
362   cq->pool = cpool;
363   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
364   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
365   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
366   cq->dsn = dsn ? strdup(dsn) : NULL;
367   return cq;
368 }
369 static conn_q *
370 get_conn_q_for_metanode() {
371   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
372 }
373
/* Result of running a statement */
typedef enum {
  DS_EXEC_SUCCESS = 0,    /* the statement ran */
  DS_EXEC_ROW_FAILED = 1, /* this row could not be parsed or applied */
  DS_EXEC_TXN_FAILED = 2,
} execute_outcome_t;
379
/*
 * DECLARE_PARAM_STR(str, len): append a text parameter to `d`.
 *
 * The first `len` bytes of `str` are duplicated into the next free slot
 * and the slot is flagged as allocated.  The literal value "[[null]]" is
 * turned into a NULL parameter instead.  `len` is evaluated exactly once.
 * Requires a variable `d` with the POSTGRES_PARTS fields in scope; no
 * check is made against MAX_PARAMS.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  int dps_len_ = (int)(len); \
  d->paramValues[d->nparams] = mtev__strndup((str), dps_len_); \
  d->paramLengths[d->nparams] = dps_len_; \
  d->paramFormats[d->nparams] = 0; \
  d->paramAllocd[d->nparams] = 1; \
  if(d->paramValues[d->nparams] && \
     !strcmp(d->paramValues[d->nparams], "[[null]]")) { \
    free(d->paramValues[d->nparams]); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  d->nparams++; \
} while(0)
/*
 * DECLARE_PARAM_INT(i): append the decimal text of integer `i` as a
 * parameter, via DECLARE_PARAM_STR.
 */
#define DECLARE_PARAM_INT(i) do { \
  int buffer__len; \
  char buffer__[32]; \
  snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
  buffer__len = strlen(buffer__); \
  DECLARE_PARAM_STR(buffer__, buffer__len); \
} while(0)
400
/*
 * PG_GET_STR_COL(dest, row, name): point `dest` at the text of column
 * `name` in row `row` of d->res.  `dest` is set to NULL when the column
 * does not exist in the result or the field is SQL NULL.  The pointer
 * refers to storage owned by d->res.
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int colnum = PQfnumber(d->res, name); \
  if (colnum >= 0 && !PQgetisnull(d->res, row, colnum)) \
    dest = PQgetvalue(d->res, row, colnum); \
  else \
    dest = NULL; \
} while(0)
408
/*
 * PG_EXEC(cmd): run `cmd` on cq->dbh with the parameters collected in `d`.
 *
 * The result is left in d->res and its status in d->rv.  On any status
 * other than PGRES_COMMAND_OK / PGRES_TUPLES_OK, the first line of the
 * error message is logged to ds_err, the result is cleared and control
 * jumps to the caller's `bad_row` label.  On success the caller owns
 * d->res and must PQclear() it.
 * Requires `cq`, `d` and a `bad_row` label in scope.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, (cmd), d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    mtevL(ds_err, "[%s] postgres_ingestor.c:%d bad (%d): %.*s\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
          (int)(pgerr_end - pgerr), pgerr); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
426
/*
 * PG_TM_EXEC(cmd, whence): like PG_EXEC, but `cmd` is first expanded with
 * strftime() using the UTC broken-down form of `whence`.
 *
 * `whence` is evaluated exactly once.  If the time cannot be broken down,
 * or the expanded statement is empty or does not fit the 4096 byte
 * buffer, nothing is executed and control jumps to `bad_row`; this avoids
 * sending an indeterminate buffer to the server.
 * Requires `cq`, `d` and a `bad_row` label in scope.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t tm_whence_ = (whence); \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&tm_whence_, &tbuf); \
  if(!tm || strftime(cmdbuf, sizeof(cmdbuf), (cmd), tm) == 0) { \
    mtevL(ds_err, "postgres_ingestor.c:%d cannot expand statement, time: %llu\n", \
          __LINE__, (long long unsigned)tm_whence_); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    mtevL(ds_err, "postgres_ingestor.c:%d bad (%d): %.*s time: %llu\n", \
          __LINE__, d->rv, (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)tm_whence_); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
449
450 static void *
451 stratcon_ingest_check_loadall(void *vsn) {
452   storagenode_info *sn = vsn;
453   ds_single_detail *d;
454   int i, row_count = 0, good = 0;
455   char buff[1024];
456   conn_q *cq = NULL;
457
458   d = calloc(1, sizeof(*d));
459   GET_QUERY(check_loadall);
460   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
461   i = 0;
462   while(stratcon_database_connect(cq)) {
463     if(i++ > 4) {
464       mtevL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
465       release_conn_q(cq);
466       free(d);
467       return (void *)(vpsized_int)good;
468     }
469     sleep(1);
470   }
471   PG_EXEC(check_loadall);
472   row_count = PQntuples(d->res);
473  
474   for(i=0; i<row_count; i++) {
475     int rv;
476     int8_t family;
477     struct sockaddr *sin;
478     struct sockaddr_in sin4 = { .sin_family = AF_INET };
479     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
480     char *remote, *id, *target, *module, *name;
481     PG_GET_STR_COL(remote, i, "remote_address");
482     PG_GET_STR_COL(id, i, "id");
483     PG_GET_STR_COL(target, i, "target");
484     PG_GET_STR_COL(module, i, "module");
485     PG_GET_STR_COL(name, i, "name");
486     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
487
488     family = AF_INET;
489     sin = (struct sockaddr *)&sin4;
490     rv = inet_pton(family, remote, &sin4.sin_addr);
491     if(rv != 1) {
492       family = AF_INET6;
493       sin = (struct sockaddr *)&sin6;
494       rv = inet_pton(family, remote, &sin6.sin6_addr);
495       if(rv != 1) {
496         mtevL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
497         sin = NULL;
498       }
499     }
500
501     /* stratcon_iep_line_processor takes an allocated operand and frees it */
502     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
503     good++;
504   }
505   mtevL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
506         good, row_count, sn->fqdn);
507  bad_row:
508   free_params((ds_single_detail *)d);
509   free(d);
510   if(cq) release_conn_q(cq);
511   return (void *)(vpsized_int)good;
512 }
513 static int
514 stratcon_ingest_asynch_drive_iep(eventer_t e, int mask, void *closure,
515                                  struct timeval *now) {
516   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
517   pthread_t *jobs = NULL;
518   int nodes, i = 0, tcnt = 0;
519   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
520   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
521
522   pthread_mutex_lock(&storagenode_to_info_cache_lock);
523   nodes = mtev_hash_size(&storagenode_to_info_cache);
524   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
525   sns = calloc(MAX(1,nodes), sizeof(*sns));
526   if(nodes == 0) sns[nodes++] = &self;
527   else {
528     mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
529     const char *k;
530     void *v;
531     int klen;
532     while(mtev_hash_next(&storagenode_to_info_cache,
533                          &iter, &k, &klen, &v)) {
534       sns[i++] = (storagenode_info *)v;
535     }
536   }
537   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
538
539   for(i=0; i<nodes; i++) {
540     if(pthread_create(&jobs[i], NULL,
541                       stratcon_ingest_check_loadall, sns[i]) != 0) {
542       mtevL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
543     }
544   }
545   for(i=0; i<nodes; i++) {
546     void *good;
547     pthread_join(jobs[i], &good);
548     tcnt += (int)(vpsized_int)good;
549   }
550   free(jobs);
551   free(sns);
552   mtevL(noit_error, "Loaded all %d check states.\n", tcnt);
553   return 0;
554 }
555 static void
556 stratcon_ingest_iep_check_preload() {
557   eventer_t e;
558   conn_pool *cpool;
559
560   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
561   e = eventer_alloc();
562   e->mask = EVENTER_ASYNCH;
563   e->callback = stratcon_ingest_asynch_drive_iep;
564   e->closure = NULL;
565   eventer_add_asynch(cpool->jobq, e);
566 }
567 execute_outcome_t
568 stratcon_ingest_find(ds_rt_detail *d) {
569   conn_q *cq;
570   char *val;
571   int row_count;
572   struct realtime_tracker *node;
573
574   for(node = d->rt; node; node = node->next) {
575     char uuid_str[UUID_STR_LEN+1];
576     const char *fqdn, *dsn, *remote_cn;
577     char remote_ip[32];
578     int storagenode_id;
579
580     uuid_unparse_lower(node->checkid, uuid_str);
581     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
582                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
583       continue;
584
585     mtevL(noit_debug, "stratcon_ingest_find <- (%d, %s) @ %s\n",
586           node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)");
587
588     /* We might be able to find the IP from our config if someone has
589      * specified the expected cn in the noit definition.
590      */
591     if(stratcon_find_noit_ip_by_cn(remote_cn,
592                                    remote_ip, sizeof(remote_ip)) == 0) {
593       node->noit = strdup(remote_ip);
594       mtevL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit);
595       continue;
596     }
597
598     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
599     if(stratcon_database_connect(cq) != 0) goto bad_row;
600
601     GET_QUERY(check_find);
602     DECLARE_PARAM_INT(node->sid);
603     PG_EXEC(check_find);
604     row_count = PQntuples(d->res);
605     if(row_count != 1) {
606       mtevL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid);
607       PQclear(d->res);
608       goto bad_row;
609     }
610
611     /* Get the remote_address (which noit owns this) */
612     PG_GET_STR_COL(val, 0, "remote_address");
613     if(!val) {
614       mtevL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn);
615       PQclear(d->res);
616       goto bad_row;
617     }
618     node->noit = strdup(val);
619     mtevL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit);
620    bad_row:
621     free_params((ds_single_detail *)d);
622     d->nparams = 0;
623     release_conn_q(cq);
624   }
625   return DS_EXEC_SUCCESS;
626 }
627 execute_outcome_t
628 stratcon_ingest_execute(conn_q *cq, const char *r, const char *remote_cn,
629                         ds_line_detail *d) {
630   int type, len, sid;
631   char *final_buff;
632   uLong final_len, actual_final_len;
633   char *token;
634   char raddr_blank[1] = "";
635   const char *raddr;
636
637   type = d->data[0];
638   raddr = r ? r : raddr_blank;
639
640   /* Parse the log line, but only if we haven't already */
641   if(!d->nparams) {
642     char *scp, *ecp;
643
644     scp = d->data;
645 #define PROCESS_NEXT_FIELD(t,l) do { \
646   if(!*scp) goto bad_row; \
647   ecp = strchr(scp, '\t'); \
648   if(!ecp) goto bad_row; \
649   token = scp; \
650   len = (ecp-scp); \
651   scp = ecp + 1; \
652 } while(0)
653 #define PROCESS_LAST_FIELD(t,l) do { \
654   if(!*scp) ecp = scp; \
655   else { \
656     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
657     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
658   } \
659   t = scp; \
660   l = (ecp-scp); \
661 } while(0)
662
663     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
664     switch(type) {
665       /* See noit_check_log.c for log description */
666       case 'n':
667         DECLARE_PARAM_STR(raddr, strlen(raddr));
668         DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
669         DECLARE_PARAM_STR("noitd",5); /* node_type */
670         PROCESS_NEXT_FIELD(token,len);
671         d->whence = (time_t)strtoul(token, NULL, 10);
672         DECLARE_PARAM_STR(token,len); /* timestamp */
673
674         /* This is the expected uncompressed len */
675         PROCESS_NEXT_FIELD(token,len);
676         final_len = atoi(token);
677         final_buff = malloc(final_len);
678         if(!final_buff) goto bad_row;
679  
680         /* The last token is b64 endoded and compressed.
681          * we need to decode it, declare it and then free it.
682          */
683         PROCESS_LAST_FIELD(token, len);
684         /* We can in-place decode this */
685         len = mtev_b64_decode((char *)token, len,
686                               (unsigned char *)token, len);
687         if(len <= 0) {
688           mtevL(noit_error, "noitd config base64 decoding error.\n");
689           free(final_buff);
690           goto bad_row;
691         }
692         actual_final_len = final_len;
693         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
694                               (unsigned char *)token, len)) {
695           mtevL(noit_error, "noitd config decompression failure.\n");
696           free(final_buff);
697           goto bad_row;
698         }
699         if(final_len != actual_final_len) {
700           mtevL(noit_error, "noitd config decompression error.\n");
701           free(final_buff);
702           goto bad_row;
703         }
704         DECLARE_PARAM_STR(final_buff, final_len);
705         free(final_buff);
706         break;
707       case 'D':
708         break;
709       case 'C':
710         DECLARE_PARAM_STR(raddr, strlen(raddr));
711         PROCESS_NEXT_FIELD(token,len);
712         DECLARE_PARAM_STR(token,len); /* timestamp */
713         d->whence = (time_t)strtoul(token, NULL, 10);
714         PROCESS_NEXT_FIELD(token, len);
715         /* uuid is last 36 bytes */
716         if(len > 36) { token += (len-36); len = 36; }
717         sid = uuid_to_sid(token, remote_cn);
718         if(sid == 0) goto bad_row;
719         DECLARE_PARAM_INT(sid); /* sid */
720         DECLARE_PARAM_STR(token,len); /* uuid */
721         PROCESS_NEXT_FIELD(token, len);
722         DECLARE_PARAM_STR(token,len); /* target */
723         PROCESS_NEXT_FIELD(token, len);
724         DECLARE_PARAM_STR(token,len); /* module */
725         PROCESS_LAST_FIELD(token, len);
726         DECLARE_PARAM_STR(token,len); /* name */
727         break;
728       case 'M':
729         PROCESS_NEXT_FIELD(token,len);
730         DECLARE_PARAM_STR(token,len); /* timestamp */
731         d->whence = (time_t)strtoul(token, NULL, 10);
732         PROCESS_NEXT_FIELD(token, len);
733         /* uuid is last 36 bytes */
734         if(len > 36) { token += (len-36); len = 36; }
735         sid = uuid_to_sid(token, remote_cn);
736         if(sid == 0) goto bad_row;
737         DECLARE_PARAM_INT(sid); /* sid */
738         PROCESS_NEXT_FIELD(token, len);
739         DECLARE_PARAM_STR(token,len); /* name */
740         PROCESS_NEXT_FIELD(token,len);
741         d->metric_type = *token;
742         PROCESS_LAST_FIELD(token,len);
743         DECLARE_PARAM_STR(token,len); /* value */
744         break;
745       case 'S':
746         PROCESS_NEXT_FIELD(token,len);
747         DECLARE_PARAM_STR(token,len); /* timestamp */
748         d->whence = (time_t)strtoul(token, NULL, 10);
749         PROCESS_NEXT_FIELD(token, len);
750         /* uuid is last 36 bytes */
751         if(len > 36) { token += (len-36); len = 36; }
752         sid = uuid_to_sid(token, remote_cn);
753         if(sid == 0) goto bad_row;
754         DECLARE_PARAM_INT(sid); /* sid */
755         PROCESS_NEXT_FIELD(token, len);
756         DECLARE_PARAM_STR(token,len); /* state */
757         PROCESS_NEXT_FIELD(token, len);
758         DECLARE_PARAM_STR(token,len); /* availability */
759         PROCESS_NEXT_FIELD(token, len);
760         DECLARE_PARAM_STR(token,len); /* duration */
761         PROCESS_LAST_FIELD(token,len);
762         DECLARE_PARAM_STR(token,len); /* status */
763         break;
764       default:
765         goto bad_row;
766     }
767
768   }
769
770   /* Now execute the query */
771   switch(type) {
772     case 'n':
773       GET_QUERY(config_insert);
774       PG_EXEC(config_insert);
775       PQclear(d->res);
776       break;
777     case 'C':
778       GET_QUERY(check_insert);
779       PG_TM_EXEC(check_insert, d->whence);
780       PQclear(d->res);
781       break;
782     case 'S':
783       GET_QUERY(status_insert);
784       PG_TM_EXEC(status_insert, d->whence);
785       PQclear(d->res);
786       break;
787     case 'D':
788       break;
789     case 'M':
790       switch(d->metric_type) {
791         case METRIC_INT32:
792         case METRIC_UINT32:
793         case METRIC_INT64:
794         case METRIC_UINT64:
795         case METRIC_DOUBLE:
796           GET_QUERY(metric_insert_numeric);
797           PG_TM_EXEC(metric_insert_numeric, d->whence);
798           PQclear(d->res);
799           break;
800         case METRIC_STRING:
801           GET_QUERY(metric_insert_text);
802           PG_TM_EXEC(metric_insert_text, d->whence);
803           PQclear(d->res);
804           break;
805         default:
806           goto bad_row;
807       }
808       break;
809     default:
810       /* should never get here */
811       goto bad_row;
812   }
813   return DS_EXEC_SUCCESS;
814  bad_row:
815   return DS_EXEC_ROW_FAILED;
816 }
817 static int
818 stratcon_database_post_connect(conn_q *cq) {
819   int rv = 0;
820   ds_single_detail _d = { 0 }, *d = &_d;
821   if(cq->fqdn) {
822     char *remote_str, *remote_cn;
823     /* This is the silly way we get null's in through our declare_param_str */
824     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
825     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
826     /* This is a storage node, it gets the storage node post_connect */
827     GET_QUERY(storage_post_connect);
828     rv = -1; /* now we're serious */
829     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
830     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
831     PG_EXEC(storage_post_connect);
832     PQclear(d->res);
833     rv = 0;
834   }
835   else {
836     /* Metanode post_connect */
837     GET_QUERY(metanode_post_connect);
838     rv = -1; /* now we're serious */
839     PG_EXEC(metanode_post_connect);
840     PQclear(d->res);
841     rv = 0;
842   }
843  bad_row:
844   free_params(d);
845   if(rv == -1) {
846     /* Post-connect intentions are serious and fatal */
847     PQfinish(cq->dbh);
848     cq->dbh = NULL;
849   }
850   return rv;
851 }
852 static int
853 stratcon_database_connect(conn_q *cq) {
854   char *dsn, dsn_meta[512];
855   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
856   const char *k, *v;
857   int klen;
858   mtev_hash_table *t;
859
860   dsn_meta[0] = '\0';
861   if(!cq->dsn) {
862     t = mtev_conf_get_hash(NULL, "/stratcon/database/dbconfig");
863     while(mtev_hash_next_str(t, &iter, &k, &klen, &v)) {
864       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
865       strlcat(dsn_meta, k, sizeof(dsn_meta));
866       strlcat(dsn_meta, "=", sizeof(dsn_meta));
867       strlcat(dsn_meta, v, sizeof(dsn_meta));
868     }
869     mtev_hash_destroy(t, free, free);
870     free(t);
871     dsn = dsn_meta;
872   }
873   else {
874     char options[32];
875     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta));
876     if(mtev_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user",
877                                options, sizeof(options))) {
878       strlcat(dsn_meta, " ", sizeof(dsn_meta));
879       strlcat(dsn_meta, "user", sizeof(dsn_meta));
880       strlcat(dsn_meta, "=", sizeof(dsn_meta));
881       strlcat(dsn_meta, options, sizeof(dsn_meta));
882     }
883     if(mtev_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password",
884                                options, sizeof(options))) {
885       strlcat(dsn_meta, " ", sizeof(dsn_meta));
886       strlcat(dsn_meta, "password", sizeof(dsn_meta));
887       strlcat(dsn_meta, "=", sizeof(dsn_meta));
888       strlcat(dsn_meta, options, sizeof(dsn_meta));
889     }
890     dsn = dsn_meta;
891   }
892
893   if(cq->dbh) {
894     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
895     PQreset(cq->dbh);
896     if(PQstatus(cq->dbh) != CONNECTION_OK) {
897       mtevL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
898             dsn, PQerrorMessage(cq->dbh));
899       return -1;
900     }
901     if(stratcon_database_post_connect(cq)) return -1;
902     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
903     mtevL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
904           dsn, PQerrorMessage(cq->dbh));
905     return -1;
906   }
907
908   cq->dbh = PQconnectdb(dsn);
909   if(!cq->dbh) return -1;
910   if(PQstatus(cq->dbh) != CONNECTION_OK) {
911     mtevL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
912           dsn, PQerrorMessage(cq->dbh));
913     return -1;
914   }
915   if(stratcon_database_post_connect(cq)) return -1;
916   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
917   mtevL(noit_error, "Error connection to database: '%s'\nError: %s\n",
918         dsn, PQerrorMessage(cq->dbh));
919   return -1;
920 }
921 static int
922 stratcon_ingest_savepoint_op(conn_q *cq, const char *p,
923                              const char *name) {
924   int rv = -1;
925   PGresult *res;
926   char cmd[128];
927   strlcpy(cmd, p, sizeof(cmd));
928   strlcat(cmd, name, sizeof(cmd));
929   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
930   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
931   PQclear(res);
932   return rv;
933 }
934 static int
935 stratcon_ingest_do(conn_q *cq, const char *cmd) {
936   PGresult *res;
937   int rv = -1;
938   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
939   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
940   PQclear(res);
941   return rv;
942 }
/* Give up on the current connection: close it, clear the handle and jump to
 * the `full_monty` label, which the expanding function must define. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)
/* Issue "SAVEPOINT <name>"; on failure BUSTED, on success remember the
 * current row in last_sp.  Uses `cq`, `current` and `last_sp` from the
 * enclosing scope. */
#define SAVEPOINT(name) do { \
  if(stratcon_ingest_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
  last_sp = current; \
} while(0)
/* Issue "ROLLBACK TO SAVEPOINT <name>"; on failure BUSTED, on success clear
 * last_sp.  Uses `cq` and `last_sp` from the enclosing scope. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  if(stratcon_ingest_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
/* Issue "RELEASE SAVEPOINT <name>"; on failure BUSTED, on success clear
 * last_sp.  Uses `cq` and `last_sp` from the enclosing scope. */
#define RELEASE_SAVEPOINT(name) do { \
  if(stratcon_ingest_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
962
963 int
964 stratcon_ingest_asynch_lookup(eventer_t e, int mask, void *closure,
965                               struct timeval *now) {
966   ds_rt_detail *dsjd = closure;
967   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
968   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
969
970   assert(dsjd->rt);
971   stratcon_ingest_find(dsjd);
972   if(dsjd->completion_event)
973     eventer_add(dsjd->completion_event);
974
975   free_params((ds_single_detail *)dsjd);
976   free(dsjd);
977   return 0;
978 }
979 static void
980 stratcon_ingestor_submit_lookup(struct realtime_tracker *rt,
981                                 eventer_t completion) {
982   eventer_t e;
983   conn_pool *cpool;
984   ds_rt_detail *rtdetail;
985
986   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
987   rtdetail = calloc(1, sizeof(*rtdetail));
988   rtdetail->rt = rt;
989   rtdetail->completion_event = completion;
990   e = eventer_alloc();
991   e->mask = EVENTER_ASYNCH;
992   e->callback = stratcon_ingest_asynch_lookup;
993   e->closure = rtdetail;
994   eventer_add_asynch(cpool->jobq, e);
995 }
996 static const char *
997 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
998   void *vinfo;
999   char *dsn = NULL, *fqdn = NULL;
1000   int found = 0;
1001   storagenode_info *info = NULL;
1002   pthread_mutex_lock(&storagenode_to_info_cache_lock);
1003   if(mtev_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
1004                         &vinfo)) {
1005     found = 1;
1006     info = vinfo;
1007   }
1008   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1009   if(found) {
1010     if(fqdn_out) *fqdn_out = info->fqdn;
1011     return info->dsn;
1012   }
1013
1014   if(!found && can_use_db) {
1015     ds_single_detail *d;
1016     conn_q *cq;
1017     int row_count;
1018     /* Look it up and store it */
1019     d = calloc(1, sizeof(*d));
1020     cq = get_conn_q_for_metanode();
1021     GET_QUERY(find_storage);
1022     DECLARE_PARAM_INT(id);
1023     PG_EXEC(find_storage);
1024     row_count = PQntuples(d->res);
1025     if(row_count) {
1026       PG_GET_STR_COL(dsn, 0, "dsn");
1027       PG_GET_STR_COL(fqdn, 0, "fqdn");
1028       fqdn = fqdn ? strdup(fqdn) : NULL;
1029       dsn = dsn ? strdup(dsn) : NULL;
1030     }
1031     PQclear(d->res);
1032    bad_row:
1033     free_params(d);
1034     free(d);
1035     release_conn_q(cq);
1036   }
1037   if(fqdn) {
1038     info = calloc(1, sizeof(*info));
1039     info->fqdn = fqdn;
1040     if(fqdn_out) *fqdn_out = info->fqdn;
1041     info->dsn = dsn;
1042     info->storagenode_id = id;
1043     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1044     mtev_hash_store(&storagenode_to_info_cache,
1045                     (void *)&info->storagenode_id, sizeof(int), info);
1046     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1047   }
1048   return info ? info->dsn : NULL;
1049 }
1050 static void
1051 expand_b_record(ds_line_detail **head, ds_line_detail **last,
1052                 const char *line, int len) {
1053   char **outrows;
1054   int i, cnt;
1055   ds_line_detail *next;
1056
1057   cnt = noit_check_log_b_to_sm(line, len, &outrows, 0);
1058   for(i=0;i<cnt;i++) {
1059     if(outrows[i] == NULL) continue;
1060     next = calloc(sizeof(*next), 1);
1061     next->data = outrows[i];
1062     if(!*head) *head = next;
1063     if(*last) (*last)->next = next;
1064     *last = next;
1065   }
1066   if(outrows) free(outrows);
1067 }
1068 static ds_line_detail *
1069 build_insert_batch(pg_interim_journal_t *ij) {
1070   int rv;
1071   off_t len;
1072   const char *buff, *cp, *lcp;
1073   struct stat st;
1074   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1075
1076   if(ij->fd < 0) {
1077     ij->fd = open(ij->filename, O_RDONLY);
1078     if(ij->fd < 0) {
1079       mtevL(noit_error, "Cannot open interim journal '%s': %s\n",
1080             ij->filename, strerror(errno));
1081       assert(ij->fd >= 0);
1082     }
1083   }
1084   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1085   if(rv == -1) {
1086       mtevL(noit_error, "Cannot stat interim journal '%s': %s\n",
1087             ij->filename, strerror(errno));
1088     assert(rv != -1);
1089   }
1090   len = st.st_size;
1091   if(len > 0) {
1092     buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1093     if(buff == (void *)-1) {
1094       mtevL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1095             ij->filename, strerror(errno));
1096       assert(buff != (void *)-1);
1097     }
1098     lcp = buff;
1099     while(lcp < (buff + len) &&
1100           NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1101       if(lcp[0] == 'B' && lcp[1] != '\0' && lcp[2] == '\t') {
1102       /* Bundle records are special and need to be expanded into
1103        * traditional records here
1104        */
1105         noit_compression_type_t ctype = NOIT_COMPRESS_NONE;
1106         switch(lcp[1]) {
1107           case '1': /* version 1 */
1108             ctype = NOIT_COMPRESS_ZLIB; /*no break fall through */
1109           case '2': /* version 2 */
1110               expand_b_record(&head, &last, lcp, cp - lcp);
1111             break;
1112           default:
1113             mtevL(noit_error, "unknown bundle version %c\n", lcp[1]);
1114         }
1115       }
1116       else {
1117         next = calloc(1, sizeof(*next));
1118         next->data = malloc(cp - lcp + 1);
1119         memcpy(next->data, lcp, cp - lcp);
1120         next->data[cp - lcp] = '\0';
1121         if(!head) head = next;
1122         if(last) last->next = next;
1123         last = next;
1124       }
1125       lcp = cp + 1;
1126     }
1127     munmap((void *)buff, len);
1128   }
1129   close(ij->fd);
1130   return head;
1131 }
1132 static void
1133 pg_interim_journal_remove(pg_interim_journal_t *ij) {
1134   unlink(ij->filename);
1135   free(ij->filename);
1136   if(ij->remote_str) free(ij->remote_str);
1137   if(ij->remote_cn) free(ij->remote_cn);
1138   if(ij->fqdn) free(ij->fqdn);
1139   free(ij);
1140 }
1141 static int
1142 stratcon_ingest_asynch_execute(eventer_t e, int mask, void *closure,
1143                                struct timeval *now) {
1144   int i, total, success, sp_total, sp_success;
1145   pg_interim_journal_t *ij;
1146   ds_line_detail *head = NULL, *current, *last_sp;
1147   const char *dsn;
1148   conn_q *cq;
1149   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1150   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1151
1152   ij = closure;
1153   if(ij->fqdn == NULL) {
1154     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1155     if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1156   }
1157   else {
1158     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, NULL);
1159   }
1160   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1161                              ij->fqdn, dsn);
1162   mtevL(ds_deb, "stratcon_ingest_asynch_execute[%s,%s,%s]\n",
1163         ij->remote_str, ij->remote_cn, ij->fqdn);
1164  full_monty:
1165   /* Make sure we have a connection */
1166   i = 1;
1167   while(stratcon_database_connect(cq)) {
1168     mtevL(noit_error, "Error connecting to database: %s\n",
1169           ij->fqdn ? ij->fqdn : "(null)");
1170     sleep(i);
1171     i *= 2;
1172     i = MIN(i, 16);
1173   }
1174
1175   if(head == NULL) head = build_insert_batch(ij);
1176   mtevL(ds_deb, "Starting batch from %s/%s to %s\n",
1177         ij->remote_str ? ij->remote_str : "(null)",
1178         ij->remote_cn ? ij->remote_cn : "(null)",
1179         ij->fqdn ? ij->fqdn : "(null)");
1180   current = head;
1181   last_sp = NULL;
1182   total = success = sp_total = sp_success = 0;
1183   if(stratcon_ingest_do(cq, "BEGIN")) BUSTED(cq);
1184   while(current) {
1185     execute_outcome_t rv;
1186     if(current->data) {
1187       if(!last_sp) {
1188         SAVEPOINT("batch");
1189         sp_success = success;
1190         sp_total = total;
1191       }
1192  
1193       if(current->problematic) {
1194         RELEASE_SAVEPOINT("batch");
1195         current = current->next;
1196         total++;
1197         continue;
1198       }
1199       rv = stratcon_ingest_execute(cq, cq->remote_str, cq->remote_cn,
1200                                    current);
1201       switch(rv) {
1202         case DS_EXEC_SUCCESS:
1203           total++;
1204           success++;
1205           current = current->next;
1206           break;
1207         case DS_EXEC_ROW_FAILED:
1208           /* rollback to savepoint, mark this record as bad and start again */
1209           if(current->data[0] != 'n')
1210             mtevL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1211           current->problematic = 1;
1212           current = last_sp;
1213           success = sp_success;
1214           total = sp_total;
1215           ROLLBACK_TO_SAVEPOINT("batch");
1216           break;
1217         case DS_EXEC_TXN_FAILED:
1218           mtevL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1219           BUSTED(cq);
1220       }
1221     }
1222   }
1223   if(last_sp) RELEASE_SAVEPOINT("batch");
1224   if(stratcon_ingest_do(cq, "COMMIT")) {
1225     mtevL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1226     BUSTED(cq);
1227   }
1228   /* Cleanup the mess */
1229   while(head) {
1230     ds_line_detail *tofree;
1231     tofree = head;
1232     head = head->next;
1233     if(tofree->data) free(tofree->data);
1234     free_params((ds_single_detail *)tofree);
1235     free(tofree);
1236   }
1237   mtevL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1238         ij->remote_str ? ij->remote_str : "(null)",
1239         ij->remote_cn ? ij->remote_cn : "(null)",
1240         ij->fqdn ? ij->fqdn : "(null)", success, total);
1241   pg_interim_journal_remove(ij);
1242   release_conn_q(cq);
1243   return 0;
1244 }
1245 static int
1246 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1247                           int *sid_out, int *storagenode_id_out,
1248                           const char **remote_cn_out,
1249                           const char **fqdn_out, const char **dsn_out) {
1250   /* only called from the main thread -- no safety issues */
1251   void *vuuidinfo, *vinfo;
1252   uuid_info *uuidinfo;
1253   storagenode_info *info = NULL;
1254   char *fqdn = NULL;
1255   char *dsn = NULL;
1256   char *new_remote_cn = NULL;
1257   int storagenode_id = 0, sid = 0;
1258   if(!mtev_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1259                          &vuuidinfo)) {
1260     int row_count = 0;
1261     char *tmpint;
1262     ds_single_detail *d;
1263     conn_q *cq;
1264
1265     /* We can't do a database lookup without the remote_cn */
1266     if(!remote_cn) {
1267       if(stratcon_datastore_get_enabled()) {
1268         /* We have an authoritatively maintained cache, we don't do lookups */
1269         return -1;
1270       }
1271       else
1272         remote_cn = "[[null]]";
1273     }
1274
1275     d = calloc(1, sizeof(*d));
1276     cq = get_conn_q_for_metanode();
1277     if(stratcon_database_connect(cq) == 0) {
1278       /* Blocking call to service the cache miss */
1279       GET_QUERY(check_map);
1280       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1281       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1282       PG_EXEC(check_map);
1283       row_count = PQntuples(d->res);
1284       if(row_count != 1) {
1285         PQclear(d->res);
1286         goto bad_row;
1287       }
1288       PG_GET_STR_COL(tmpint, 0, "sid");
1289       if(!tmpint) {
1290         row_count = 0;
1291         PQclear(d->res);
1292         goto bad_row;
1293       }
1294       sid = atoi(tmpint);
1295       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1296       if(tmpint) storagenode_id = atoi(tmpint);
1297       PG_GET_STR_COL(fqdn, 0, "fqdn");
1298       PG_GET_STR_COL(dsn, 0, "dsn");
1299       PG_GET_STR_COL(new_remote_cn, 0, "remote_cn");
1300       fqdn = fqdn ? strdup(fqdn) : NULL;
1301       dsn = dsn ? strdup(dsn) : NULL;
1302       new_remote_cn = new_remote_cn ? strdup(new_remote_cn) : NULL;
1303       PQclear(d->res);
1304     }
1305    bad_row:
1306     free_params((ds_single_detail *)d);
1307     free(d);
1308     release_conn_q(cq);
1309     if(row_count != 1) {
1310       return -1;
1311     }
1312     /* Place in cache */
1313     uuidinfo = calloc(1, sizeof(*uuidinfo));
1314     uuidinfo->sid = sid;
1315     uuidinfo->uuid_str = strdup(uuid_str);
1316     uuidinfo->storagenode_id = storagenode_id;
1317     uuidinfo->remote_cn = new_remote_cn ? strdup(new_remote_cn) : strdup(remote_cn);
1318     mtev_hash_store(&uuid_to_info_cache,
1319                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1320     /* Also, we may have just witnessed a new storage node, store it */
1321     if(storagenode_id) {
1322       int needs_free = 0;
1323       info = calloc(1, sizeof(*info));
1324       info->storagenode_id = storagenode_id;
1325       info->dsn = dsn ? strdup(dsn) : NULL;
1326       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1327       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1328       if(!mtev_hash_retrieve(&storagenode_to_info_cache,
1329                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1330         /* hack to save memory -- we *never* remove from these caches,
1331            so we can use the same fqdn value in the above cache for the key
1332            in the cache below -- (no strdup) */
1333         mtev_hash_store(&storagenode_to_info_cache,
1334                         (void *)&info->storagenode_id, sizeof(int), info);
1335       }
1336       else needs_free = 1;
1337       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1338       if(needs_free) {
1339         if(info->dsn) free(info->dsn);
1340         if(info->fqdn) free(info->fqdn);
1341         free(info);
1342       }
1343     }
1344   }
1345   else
1346     uuidinfo = vuuidinfo;
1347
1348   if(uuidinfo && uuidinfo->storagenode_id) {
1349     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1350       /* we don't have dsn and we actually want it */
1351       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1352       if(mtev_hash_retrieve(&storagenode_to_info_cache,
1353                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1354                             &vinfo))
1355         info = vinfo;
1356       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1357     }
1358   }
1359
1360   if(fqdn_out) *fqdn_out = info ? info->fqdn : NULL;
1361   if(dsn_out) *dsn_out = info ? info->dsn : NULL;
1362   assert(uuidinfo);
1363   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1364   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1365   if(sid_out) *sid_out = uuidinfo->sid;
1366   if(fqdn) free(fqdn);
1367   if(dsn) free(dsn);
1368   if(new_remote_cn) free(new_remote_cn);
1369   return 0;
1370 }
1371 static int
1372 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1373   char uuid_str[UUID_STR_LEN+1];
1374   int sid = 0;
1375   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1376   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1377   return sid;
1378 }
1379
1380 static int
1381 stratcon_ingest_saveconfig() {
1382   int rv = -1;
1383   char *buff;
1384   ds_single_detail _d = { 0 }, *d = &_d;
1385   conn_q *cq;
1386   char ipv4_str[32];
1387   struct in_addr r, l;
1388
1389   r.s_addr = htonl((4 << 24) | (2 << 16) | (2 << 8) | 1);
1390   memset(&l, 0, sizeof(l));
1391   mtev_getip_ipv4(r, &l);
1392   /* Ignore the error.. what are we going to do anyway */
1393   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
1394     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));
1395
1396   cq = get_conn_q_for_metanode();
1397
1398   if(stratcon_database_connect(cq) == 0) {
1399     char time_as_str[20];
1400     size_t len;
1401     buff = mtev_conf_xml_in_mem(&len);
1402     if(!buff) goto bad_row;
1403
1404     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1405     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str));
1406     DECLARE_PARAM_STR("", 0);
1407     DECLARE_PARAM_STR("stratcond", 9);
1408     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1409     DECLARE_PARAM_STR(buff, len);
1410     free(buff);
1411
1412     GET_QUERY(config_insert);
1413     PG_EXEC(config_insert);
1414     PQclear(d->res);
1415     rv = 0;
1416
1417     bad_row:
1418       free_params(d);
1419   }
1420   release_conn_q(cq);
1421   return rv;
1422 }
1423
1424 static int
1425 stratcon_ingest_launch_file_ingestion(const char *path,
1426                                       const char *remote_str,
1427                                       const char *remote_cn,
1428                                       const char *id_str) {
1429   pg_interim_journal_t *ij;
1430   char pgfile[PATH_MAX];
1431   eventer_t ingest;
1432
1433   if(strcmp(path + strlen(path) - 3, ".pg")) {
1434     snprintf(pgfile, sizeof(pgfile), "%s.pg", path);
1435     if(link(path, pgfile) < 0 && errno != EEXIST) {
1436       mtevL(noit_error, "cannot link journal %s: %s\n", path, strerror(errno));
1437       return -1;
1438     }
1439   }
1440   else
1441     strlcpy(pgfile, path, sizeof(pgfile));
1442   ij = calloc(1, sizeof(*ij));
1443   ij->fd = open(pgfile, O_RDONLY);
1444   if(ij->fd < 0) {
1445     mtevL(noit_error, "cannot open journal '%s': %s\n",
1446           pgfile, strerror(errno));
1447     free(ij);
1448     return -1;
1449   }
1450   close(ij->fd);
1451   ij->fd = -1;
1452   ij->filename = strdup(pgfile);
1453   ij->remote_str = strdup(remote_str);
1454   ij->remote_cn = strdup(remote_cn);
1455   ij->storagenode_id = atoi(id_str);
1456   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1457                                        ij->fqdn);
1458   mtevL(noit_debug, "ingesting payload: %s\n", ij->filename);
1459   ingest = eventer_alloc();
1460   ingest->mask = EVENTER_ASYNCH;
1461   ingest->callback = stratcon_ingest_asynch_execute;
1462   ingest->closure = ij;
1463   eventer_add_asynch(ij->cpool->jobq, ingest);
1464   return 0;
1465 }
1466
1467 int
1468 stratcon_ingest_all_storagenode_info() {
1469   int i, cnt = 0;
1470   ds_single_detail _d = { 0 }, *d = &_d;
1471   conn_q *cq;
1472   cq = get_conn_q_for_metanode();
1473
1474   while(stratcon_database_connect(cq)) {
1475     mtevL(noit_error, "Error connecting to database\n");
1476     sleep(1);
1477   }
1478
1479   GET_QUERY(all_storage);
1480   PG_EXEC(all_storage);
1481   cnt = PQntuples(d->res);
1482   for(i=0; i<cnt; i++) {
1483     void *vinfo;
1484     char *tmpint, *fqdn, *dsn;
1485     int storagenode_id;
1486     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1487     storagenode_id = atoi(tmpint);
1488     PG_GET_STR_COL(fqdn, i, "fqdn");
1489     PG_GET_STR_COL(dsn, i, "dsn");
1490     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1491     storagenode_id = tmpint ? atoi(tmpint) : 0;
1492
1493     if(!mtev_hash_retrieve(&storagenode_to_info_cache,
1494                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1495       storagenode_info *info;
1496       info = calloc(1, sizeof(*info));
1497       info->storagenode_id = storagenode_id;
1498       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1499       info->dsn = dsn ? strdup(dsn) : NULL;
1500       mtev_hash_store(&storagenode_to_info_cache,
1501                       (void *)&info->storagenode_id, sizeof(int), info);
1502       mtevL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1503             info->storagenode_id,
1504             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1505     }
1506     mtev_watchdog_child_heartbeat();
1507   }
1508   PQclear(d->res);
1509  bad_row:
1510   free_params(d);
1511
1512   release_conn_q(cq);
1513   mtevL(noit_error, "Loaded %d storage nodes\n", cnt);
1514   return cnt;
1515 }
1516 int
1517 stratcon_ingest_all_check_info() {
1518   int i, cnt, loaded = 0;
1519   ds_single_detail _d = { 0 }, *d = &_d;
1520   conn_q *cq;
1521   cq = get_conn_q_for_metanode();
1522
1523   while(stratcon_database_connect(cq)) {
1524     mtevL(noit_error, "Error connecting to database\n");
1525     sleep(1);
1526   }
1527
1528   GET_QUERY(check_mapall);
1529   PG_EXEC(check_mapall);
1530   cnt = PQntuples(d->res);
1531   for(i=0; i<cnt; i++) {
1532     void *vinfo;
1533     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1534     int sid, storagenode_id;
1535     uuid_info *uuidinfo;
1536     PG_GET_STR_COL(uuid_str, i, "id");
1537     if(!uuid_str) continue;
1538     PG_GET_STR_COL(tmpint, i, "sid");
1539     if(!tmpint) continue;
1540     sid = atoi(tmpint);
1541     PG_GET_STR_COL(fqdn, i, "fqdn");
1542     PG_GET_STR_COL(dsn, i, "dsn");
1543     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1544     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1545     storagenode_id = tmpint ? atoi(tmpint) : 0;
1546
1547     uuidinfo = calloc(1, sizeof(*uuidinfo));
1548     uuidinfo->uuid_str = strdup(uuid_str);
1549     uuidinfo->remote_cn = strdup(remote_cn);
1550     uuidinfo->storagenode_id = storagenode_id;
1551     uuidinfo->sid = sid;
1552     mtev_hash_store(&uuid_to_info_cache,
1553                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1554     mtevL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1555           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1556     loaded++;
1557     if(!mtev_hash_retrieve(&storagenode_to_info_cache,
1558                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1559       storagenode_info *info;
1560       info = calloc(1, sizeof(*info));
1561       info->storagenode_id = storagenode_id;
1562       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1563       info->dsn = dsn ? strdup(dsn) : NULL;
1564       mtev_hash_store(&storagenode_to_info_cache,
1565                       (void *)&info->storagenode_id, sizeof(int), info);
1566       mtevL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1567             info->storagenode_id,
1568             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1569     }
1570     mtev_watchdog_child_heartbeat();
1571   }
1572   PQclear(d->res);
1573  bad_row:
1574   free_params(d);
1575
1576   release_conn_q(cq);
1577   mtevL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1578   return loaded;
1579 }
1580
1581 static char *
1582 stratcon_get_noit_config(const char *cn) {
1583   ds_single_detail *d;
1584   int row_count = 0;
1585   const char *xml = NULL;
1586   char *xmlcopy = NULL;
1587   conn_q *cq = NULL;
1588
1589   d = calloc(1, sizeof(*d));
1590   GET_QUERY(config_get);
1591   cq = get_conn_q_for_metanode();
1592   if(!cq) goto bad_row;
1593
1594   DECLARE_PARAM_STR(cn, cn ? strlen(cn) : 0);
1595   PG_EXEC(config_get);
1596   row_count = PQntuples(d->res);
1597   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1598
1599   if(xml) xmlcopy = strdup(xml);
1600
1601  bad_row:
1602   free_params((ds_single_detail *)d);
1603   d->nparams = 0;
1604   if(cq) release_conn_q(cq);
1605   free(d);
1606
1607   return xmlcopy;
1608 }
1609
/* Table of ingestor entry points implemented by this module; it is handed to
 * stratcon_datastore_set_ingestor() from postgres_ingestor_init(). */
static ingestor_api_t postgres_ingestor_api = {
  .launch_file_ingestion = stratcon_ingest_launch_file_ingestion,
  .iep_check_preload = stratcon_ingest_iep_check_preload,
  .storage_node_lookup = storage_node_quick_lookup,
  .submit_realtime_lookup = stratcon_ingestor_submit_lookup,
  .get_noit_config = stratcon_get_noit_config,
  .save_config = stratcon_ingest_saveconfig
};
1618
1619 static int postgres_ingestor_config(mtev_dso_generic_t *self, mtev_hash_table *o) {
1620   return 0;
1621 }
1622 static int postgres_ingestor_onload(mtev_image_t *self) {
1623   return 0;
1624 }
/*
 * Journal sweep filter: returns 1 when `file` is exactly 19 characters long
 * and ends in ".pg", 0 otherwise.  Sends a watchdog heartbeat on every call.
 */
static int is_postgres_ingestor_file(const char *file) {
  mtev_watchdog_child_heartbeat();
  if(strlen(file) != 19) return 0;
  /* With the length fixed at 19, the suffix starts at offset 16. */
  return strcmp(file + 16, ".pg") == 0;
}
1629 static int postgres_ingestor_init(mtev_dso_generic_t *self) {
1630   stratcon_datastore_core_init();
1631   pthread_mutex_init(&ds_conns_lock, NULL);
1632   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1633   ds_err = mtev_log_stream_find("error/datastore");
1634   ds_deb = mtev_log_stream_find("debug/datastore");
1635   ds_pool_deb = mtev_log_stream_find("debug/datastore_pool");
1636   ingest_err = mtev_log_stream_find("error/ingest");
1637   if(!ds_err) ds_err = noit_error;
1638   if(!ingest_err) ingest_err = noit_error;
1639   if(!mtev_conf_get_string(NULL, "/stratcon/database/journal/path",
1640                            &basejpath)) {
1641     mtevL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1642     exit(-1);
1643   }
1644   stratcon_ingest_all_check_info();
1645   stratcon_ingest_all_storagenode_info();
1646   stratcon_ingest_sweep_journals(basejpath, is_postgres_ingestor_file,
1647                                  stratcon_ingest_launch_file_ingestion);
1648   return stratcon_datastore_set_ingestor(&postgres_ingestor_api);
1649 }
1650
/* Generic DSO descriptor for this module: the image header (identity,
 * description and onload hook) followed by the config and init hooks. */
mtev_dso_generic_t postgres_ingestor = {
  {
    .magic = MTEV_GENERIC_MAGIC,
    .version = MTEV_GENERIC_ABI_VERSION,
    .name = "postgres_ingestor",
    .description = "postgres drive for data ingestion",
    .xml_description = postgres_ingestor_xml_description,
    .onload = postgres_ingestor_onload,
  }, 
  postgres_ingestor_config,
  postgres_ingestor_init
};
Note: See TracBrowser for help on using the browser.