root/src/modules/postgres_ingestor.c

Revision 4a653ea2ad1a5c2d4fa77037753a169291289a11, 53.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

make the postgres_ingestor work with the B messages

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2011, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "noit_module.h"
35 #include "eventer/eventer.h"
36 #include "utils/noit_log.h"
37 #include "utils/noit_b64.h"
38 #include "utils/noit_str.h"
39 #include "utils/noit_mkdir.h"
40 #include "utils/noit_getip.h"
41 #include "stratcon_datastore.h"
42 #include "stratcon_realtime_http.h"
43 #include "stratcon_iep.h"
44 #include "noit_conf.h"
45 #include "noit_check.h"
46 #include "noit_check_log_helpers.h"
47 #include "noit_rest.h"
48 #include <unistd.h>
49 #include <fcntl.h>
50 #include <netinet/in.h>
51 #include <sys/un.h>
52 #include <dirent.h>
53 #include <arpa/inet.h>
54 #include <sys/mman.h>
55 #include <libpq-fe.h>
56 #include <zlib.h>
57 #include <assert.h>
58 #include <errno.h>
59 #include "postgres_ingestor.xmlh"
60 #include "bundle.pb-c.h"
61
62 #define DECL_STMT(codename,confname) \
63 static char *codename = NULL; \
64 static const char *codename##_conf = "/stratcon/database/statements/" #confname
65
66 DECL_STMT(storage_post_connect, storagepostconnect);
67 DECL_STMT(metanode_post_connect, metanodepostconnect);
68 DECL_STMT(find_storage, findstoragenode);
69 DECL_STMT(all_storage, allstoragenodes);
70 DECL_STMT(check_map, mapchecktostoragenode);
71 DECL_STMT(check_mapall, mapallchecks);
72 DECL_STMT(check_loadall, allchecks);
73 DECL_STMT(check_find, findcheck);
74 DECL_STMT(check_insert, check);
75 DECL_STMT(status_insert, status);
76 DECL_STMT(metric_insert_numeric, metric_numeric);
77 DECL_STMT(metric_insert_text, metric_text);
78 DECL_STMT(config_insert, config);
79 DECL_STMT(config_get, findconfig);
80
81 static noit_log_stream_t ds_err = NULL;
82 static noit_log_stream_t ds_deb = NULL;
83 static noit_log_stream_t ds_pool_deb = NULL;
84 static noit_log_stream_t ingest_err = NULL;
85
/*
 * Make sure the statement text for `a` is loaded.  When `a` is still NULL
 * it is fetched from the configuration path held in `a_conf`; if
 * noit_conf_get_string() reports failure, control jumps to the enclosing
 * function's `bad_row` label.
 */
#define GET_QUERY(a) do { \
  if(a == NULL && !noit_conf_get_string(NULL, a ## _conf, &(a))) { \
    goto bad_row; \
  } \
} while(0)
91
92 struct conn_q;
93
94 typedef struct {
95   char            *queue_name; /* the key fqdn+remote_sn */
96   eventer_jobq_t  *jobq;
97   struct conn_q   *head;
98   pthread_mutex_t  lock;
99   pthread_cond_t   cv;
100   int              ttl;
101   int              in_pool;
102   int              outstanding;
103   int              max_allocated;
104   int              max_in_pool;
105 } conn_pool;
106 typedef struct conn_q {
107   time_t           last_use;
108   char            *dsn;        /* Pg connect string */
109   char            *remote_str; /* the IP of the noit*/
110   char            *remote_cn;  /* the Cert CN of the noit */
111   char            *fqdn;       /* the fqdn of the storage node */
112   conn_pool       *pool;
113   struct conn_q   *next;
114   /* Postgres specific stuff */
115   PGconn          *dbh;
116 } conn_q;
117
118
119 #define MAX_PARAMS 8
120 #define POSTGRES_PARTS \
121   PGresult *res; \
122   int rv; \
123   time_t whence; \
124   int nparams; \
125   int metric_type; \
126   char *paramValues[MAX_PARAMS]; \
127   int paramLengths[MAX_PARAMS]; \
128   int paramFormats[MAX_PARAMS]; \
129   int paramAllocd[MAX_PARAMS];
130
131 typedef struct ds_single_detail {
132   POSTGRES_PARTS
133 } ds_single_detail;
134 typedef struct {
135   /* Postgres specific stuff */
136   POSTGRES_PARTS
137   struct realtime_tracker *rt;
138   conn_q *cq; /* connection on which to perform this job */
139   eventer_t completion_event; /* This event should be registered if non NULL */
140 } ds_rt_detail;
141 typedef struct ds_line_detail {
142   /* Postgres specific stuff */
143   POSTGRES_PARTS
144   char *data;
145   int problematic;
146   struct ds_line_detail *next;
147 } ds_line_detail;
148
149 typedef struct {
150   char *remote_str;
151   char *remote_cn;
152   char *fqdn;
153   int storagenode_id;
154   int fd;
155   char *filename;
156   conn_pool *cpool;
157 } pg_interim_journal_t;
158
159 static int stratcon_database_connect(conn_q *cq);
160 static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
161 static int storage_node_quick_lookup(const char *uuid_str,
162                                      const char *remote_cn,
163                                      int *sid_out, int *storagenode_id_out,
164                                      const char **remote_cn_out,
165                                      const char **fqdn_out,
166                                      const char **dsn_out);
167
168 static void
169 free_params(ds_single_detail *d) {
170   int i;
171   for(i=0; i<d->nparams; i++)
172     if(d->paramAllocd[i] && d->paramValues[i])
173       free(d->paramValues[i]);
174 }
175
176 static char *basejpath = NULL;
177 static pthread_mutex_t ds_conns_lock;
178 static noit_hash_table ds_conns;
179
180 /* the fqdn cache needs to be thread safe */
181 typedef struct {
182   char *uuid_str;
183   char *remote_cn;
184   int storagenode_id;
185   int sid;
186 } uuid_info;
187 typedef struct {
188   int storagenode_id;
189   char *fqdn;
190   char *dsn;
191 } storagenode_info;
192 noit_hash_table uuid_to_info_cache;
193 pthread_mutex_t storagenode_to_info_cache_lock;
194 noit_hash_table storagenode_to_info_cache;
195
196 /* Thread-safe connection pools */
197
198 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
199 static void
200 release_conn_q_forceable(conn_q *cq, int forcefree) {
201   int putback = 0;
202   cq->last_use = time(NULL);
203   pthread_mutex_lock(&cq->pool->lock);
204   cq->pool->outstanding--;
205   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
206     putback = 1;
207     cq->next = cq->pool->head;
208     cq->pool->head = cq;
209     cq->pool->in_pool++;
210   }
211   pthread_mutex_unlock(&cq->pool->lock);
212   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(),
213         putback ? "to pool" : "and destroy", cq->pool->queue_name);
214   pthread_cond_signal(&cq->pool->cv);
215   if(putback) return;
216
217   /* Not put back, release it */
218   if(cq->dbh) PQfinish(cq->dbh);
219   if(cq->remote_str) free(cq->remote_str);
220   if(cq->remote_cn) free(cq->remote_cn);
221   if(cq->fqdn) free(cq->fqdn);
222   if(cq->dsn) free(cq->dsn);
223   free(cq);
224 }
225 static void
226 ttl_purge_conn_pool(conn_pool *pool) {
227   int old_cnt, new_cnt;
228   time_t now = time(NULL);
229   conn_q *cq, *prev = NULL, *iter;
230   /* because we always replace on the head and update the last_use time when
231      doing so, we know they are ordered LRU on the end.  So, once we hit an
232      old one, we know all the others are old too.
233    */
234   if(!pool->head) return; /* hack short circuit for no locks */
235   pthread_mutex_lock(&pool->lock);
236   old_cnt = pool->in_pool;
237   cq = pool->head;
238   while(cq) {
239     if(cq->last_use + cq->pool->ttl < now) {
240       if(prev) prev->next = NULL;
241       else pool->head = NULL;
242       break;
243     }
244     prev = cq;
245     cq = cq->next;
246   }
247   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
248   /* Fix accounting */
249   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
250   new_cnt = pool->in_pool;
251   pthread_mutex_unlock(&pool->lock);
252
253   /* Force release these without holding the lock */
254   while(cq) {
255     cq = cq->next;
256     release_conn_q_forceable(cq, 1);
257   }
258   if(old_cnt != new_cnt)
259     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
260           pool->queue_name);
261 }
262 static void
263 release_conn_q(conn_q *cq) {
264   ttl_purge_conn_pool(cq->pool);
265   release_conn_q_forceable(cq, 0);
266 }
267 static conn_pool *
268 get_conn_pool_for_remote(const char *remote_str,
269                          const char *remote_cn, const char *fqdn) {
270   void *vcpool;
271   conn_pool *cpool = NULL;
272   char queue_name[256] = "datastore_";
273   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
274            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
275            fqdn ? fqdn : "default",
276            remote_cn ? remote_cn : "default");
277   pthread_mutex_lock(&ds_conns_lock);
278   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
279                         strlen(queue_name), &vcpool))
280     cpool = vcpool;
281   pthread_mutex_unlock(&ds_conns_lock);
282   if(!cpool) {
283     vcpool = cpool = calloc(1, sizeof(*cpool));
284     cpool->queue_name = strdup(queue_name);
285     pthread_mutex_init(&cpool->lock, NULL);
286     pthread_cond_init(&cpool->cv, NULL);
287     cpool->in_pool = 0;
288     cpool->outstanding = 0;
289     cpool->max_in_pool = 1;
290     cpool->max_allocated = 1;
291     pthread_mutex_lock(&ds_conns_lock);
292     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
293                         cpool)) {
294       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
295                          strlen(queue_name), &vcpool);
296     }
297     pthread_mutex_unlock(&ds_conns_lock);
298     if(vcpool != cpool) {
299       /* someone beat us to it */
300       free(cpool->queue_name);
301       pthread_mutex_destroy(&cpool->lock);
302       pthread_cond_destroy(&cpool->cv);
303       free(cpool);
304     }
305     else {
306       int i;
307       /* Our job to setup the pool */
308       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
309       eventer_jobq_init(cpool->jobq, queue_name);
310       cpool->jobq->backq = eventer_default_backq();
311       /* Add one thread */
312       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
313         eventer_jobq_increase_concurrency(cpool->jobq);
314     }
315     cpool = vcpool;
316   }
317   return cpool;
318 }
319 static conn_q *
320 get_conn_q_for_remote(const char *remote_str,
321                       const char *remote_cn, const char *fqdn,
322                       const char *dsn) {
323   conn_pool *cpool;
324   conn_q *cq;
325   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
326   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(),
327         cpool->queue_name);
328   pthread_mutex_lock(&cpool->lock);
329  again:
330   if(cpool->head) {
331     assert(cpool->in_pool > 0);
332     cq = cpool->head;
333     cpool->head = cq->next;
334     cpool->in_pool--;
335     cpool->outstanding++;
336     cq->next = NULL;
337     pthread_mutex_unlock(&cpool->lock);
338     return cq;
339   }
340   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
341     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
342           (void *)pthread_self(), cpool->queue_name);
343     pthread_cond_wait(&cpool->cv, &cpool->lock);
344     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
345           (void *)pthread_self(), cpool->queue_name);
346     goto again;
347   }
348   else {
349     cpool->outstanding++;
350     pthread_mutex_unlock(&cpool->lock);
351   }
352  
353   cq = calloc(1, sizeof(*cq));
354   cq->pool = cpool;
355   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
356   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
357   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
358   cq->dsn = dsn ? strdup(dsn) : NULL;
359   return cq;
360 }
361 static conn_q *
362 get_conn_q_for_metanode() {
363   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
364 }
365
/* Result of running one ingest or lookup statement. */
typedef enum {
  DS_EXEC_SUCCESS    = 0, /* statement ran */
  DS_EXEC_ROW_FAILED = 1, /* this row could not be parsed or executed */
  DS_EXEC_TXN_FAILED = 2, /* NOTE(review): not used in this part of the file */
} execute_outcome_t;
371
/*
 * Append one text parameter to the statement state `d` in scope.
 *
 * A private copy of the first `len` bytes of `str` is stored and marked as
 * heap-owned.  The literal value "[[null]]" is the in-band marker for SQL
 * NULL: the copy is dropped and a NULL pointer of length 0 is bound.
 *
 * `str` and `len` are each evaluated exactly once, so expressions such as
 * strlen(x) are safe to pass.  The parameter arrays hold MAX_PARAMS
 * entries; the assert stops an overflow before it writes past them.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  const char *param_src_ = (str); \
  int param_len_ = (int)(len); \
  assert(d->nparams < MAX_PARAMS); \
  d->paramValues[d->nparams] = noit__strndup(param_src_, param_len_); \
  d->paramLengths[d->nparams] = param_len_; \
  d->paramFormats[d->nparams] = 0; \
  d->paramAllocd[d->nparams] = 1; \
  if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \
    free(d->paramValues[d->nparams]); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  d->nparams++; \
} while(0)

/* Append an int parameter, bound in its decimal text form. */
#define DECLARE_PARAM_INT(i) do { \
  int buffer__len; \
  char buffer__[32]; \
  snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
  buffer__len = strlen(buffer__); \
  DECLARE_PARAM_STR(buffer__, buffer__len); \
} while(0)
392
/*
 * Fetch column `name` of row `row` from d->res into `dest`.
 * `dest` is set to NULL when the column does not exist in the result or
 * when the value is SQL NULL; otherwise it points at the value held by the
 * result.
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int colnum = PQfnumber(d->res, name); \
  if(colnum < 0 || PQgetisnull(d->res, row, colnum)) \
    dest = NULL; \
  else \
    dest = PQgetvalue(d->res, row, colnum); \
} while(0)
400
/*
 * Run statement `cmd` on cq->dbh with the parameters collected in `d`.
 * The result is left in d->res and its status in d->rv.  For any status
 * other than PGRES_COMMAND_OK or PGRES_TUPLES_OK, the first line of the
 * error message is logged, the result is cleared and control jumps to the
 * enclosing function's `bad_row` label.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(!(d->rv == PGRES_COMMAND_OK || d->rv == PGRES_TUPLES_OK)) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    int pgerr_len = pgerr_end ? (int)(pgerr_end - pgerr) \
                              : (int)strlen(pgerr); \
    noitL(ds_err, "[%s] stratcon_datasource.c:%d bad (%d): %.*s\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
          pgerr_len, pgerr); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
418
/*
 * Like PG_EXEC, but `cmd` is first expanded with strftime() against the
 * UTC time `whence`, so the statement text can embed time conversions.
 *
 * `whence` is evaluated once.  If the time cannot be broken down, or the
 * expanded statement does not fit in the 4096-byte buffer (strftime()
 * returns 0 and leaves the buffer contents unspecified), nothing is sent
 * to the database and control jumps to `bad_row`.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t __w = whence; \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&__w, &tbuf); \
  if(!tm || strftime(cmdbuf, sizeof(cmdbuf), cmd, tm) == 0) { \
    noitL(ds_err, "stratcon_datasource.c:%d cannot expand statement time: %llu\n", \
          __LINE__, (long long unsigned)__w); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "stratcon_datasource.c:%d bad (%d): %.*s time: %llu\n", \
          __LINE__, d->rv, (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)__w); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
441
442 static void *
443 stratcon_ingest_check_loadall(void *vsn) {
444   storagenode_info *sn = vsn;
445   ds_single_detail *d;
446   int i, row_count = 0, good = 0;
447   char buff[1024];
448   conn_q *cq = NULL;
449
450   d = calloc(1, sizeof(*d));
451   GET_QUERY(check_loadall);
452   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
453   i = 0;
454   while(stratcon_database_connect(cq)) {
455     if(i++ > 4) {
456       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
457       release_conn_q(cq);
458       return (void *)(vpsized_int)good;
459     }
460     sleep(1);
461   }
462   PG_EXEC(check_loadall);
463   row_count = PQntuples(d->res);
464  
465   for(i=0; i<row_count; i++) {
466     int rv;
467     int8_t family;
468     struct sockaddr *sin;
469     struct sockaddr_in sin4 = { .sin_family = AF_INET };
470     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
471     char *remote, *id, *target, *module, *name;
472     PG_GET_STR_COL(remote, i, "remote_address");
473     PG_GET_STR_COL(id, i, "id");
474     PG_GET_STR_COL(target, i, "target");
475     PG_GET_STR_COL(module, i, "module");
476     PG_GET_STR_COL(name, i, "name");
477     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
478
479     family = AF_INET;
480     sin = (struct sockaddr *)&sin4;
481     rv = inet_pton(family, remote, &sin4.sin_addr);
482     if(rv != 1) {
483       family = AF_INET6;
484       sin = (struct sockaddr *)&sin6;
485       rv = inet_pton(family, remote, &sin6.sin6_addr);
486       if(rv != 1) {
487         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
488         sin = NULL;
489       }
490     }
491
492     /* stratcon_iep_line_processor takes an allocated operand and frees it */
493     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
494     good++;
495   }
496   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
497         good, row_count, sn->fqdn);
498  bad_row:
499   free_params((ds_single_detail *)d);
500   free(d);
501   if(cq) release_conn_q(cq);
502   return (void *)(vpsized_int)good;
503 }
504 static int
505 stratcon_ingest_asynch_drive_iep(eventer_t e, int mask, void *closure,
506                                  struct timeval *now) {
507   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
508   pthread_t *jobs = NULL;
509   int nodes, i = 0, tcnt = 0;
510   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
511   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
512
513   pthread_mutex_lock(&storagenode_to_info_cache_lock);
514   nodes = storagenode_to_info_cache.size;
515   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
516   sns = calloc(MAX(1,nodes), sizeof(*sns));
517   if(nodes == 0) sns[nodes++] = &self;
518   else {
519     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
520     const char *k;
521     void *v;
522     int klen;
523     while(noit_hash_next(&storagenode_to_info_cache,
524                          &iter, &k, &klen, &v)) {
525       sns[i++] = (storagenode_info *)v;
526     }
527   }
528   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
529
530   for(i=0; i<nodes; i++) {
531     if(pthread_create(&jobs[i], NULL,
532                       stratcon_ingest_check_loadall, sns[i]) != 0) {
533       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
534     }
535   }
536   for(i=0; i<nodes; i++) {
537     void *good;
538     pthread_join(jobs[i], &good);
539     tcnt += (int)(vpsized_int)good;
540   }
541   free(jobs);
542   free(sns);
543   noitL(noit_error, "Loaded all %d check states.\n", tcnt);
544   return 0;
545 }
546 static void
547 stratcon_ingest_iep_check_preload() {
548   eventer_t e;
549   conn_pool *cpool;
550
551   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
552   e = eventer_alloc();
553   e->mask = EVENTER_ASYNCH;
554   e->callback = stratcon_ingest_asynch_drive_iep;
555   e->closure = NULL;
556   eventer_add_asynch(cpool->jobq, e);
557 }
558 execute_outcome_t
559 stratcon_ingest_find(ds_rt_detail *d) {
560   conn_q *cq;
561   char *val;
562   int row_count;
563   struct realtime_tracker *node;
564
565   for(node = d->rt; node; node = node->next) {
566     char uuid_str[UUID_STR_LEN+1];
567     const char *fqdn, *dsn, *remote_cn;
568     char remote_ip[32];
569     int storagenode_id;
570
571     uuid_unparse_lower(node->checkid, uuid_str);
572     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
573                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
574       continue;
575
576     noitL(noit_debug, "stratcon_ingest_find <- (%d, %s) @ %s\n",
577           node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)");
578
579     /* We might be able to find the IP from our config if someone has
580      * specified the expected cn in the noit definition.
581      */
582     if(stratcon_find_noit_ip_by_cn(remote_cn,
583                                    remote_ip, sizeof(remote_ip)) == 0) {
584       node->noit = strdup(remote_ip);
585       noitL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit);
586       continue;
587     }
588
589     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
590     stratcon_database_connect(cq);
591
592     GET_QUERY(check_find);
593     DECLARE_PARAM_INT(node->sid);
594     PG_EXEC(check_find);
595     row_count = PQntuples(d->res);
596     if(row_count != 1) {
597       noitL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid);
598       PQclear(d->res);
599       goto bad_row;
600     }
601
602     /* Get the remote_address (which noit owns this) */
603     PG_GET_STR_COL(val, 0, "remote_address");
604     if(!val) {
605       noitL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn);
606       PQclear(d->res);
607       goto bad_row;
608     }
609     node->noit = strdup(val);
610     noitL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit);
611    bad_row:
612     free_params((ds_single_detail *)d);
613     d->nparams = 0;
614     release_conn_q(cq);
615   }
616   return DS_EXEC_SUCCESS;
617 }
618 execute_outcome_t
619 stratcon_ingest_execute(conn_q *cq, const char *r, const char *remote_cn,
620                         ds_line_detail *d) {
621   int type, len, sid;
622   char *final_buff;
623   uLong final_len, actual_final_len;
624   char *token;
625   char raddr_blank[1] = "";
626   const char *raddr;
627
628   type = d->data[0];
629   raddr = r ? r : raddr_blank;
630
631   /* Parse the log line, but only if we haven't already */
632   if(!d->nparams) {
633     char *scp, *ecp;
634
635     scp = d->data;
636 #define PROCESS_NEXT_FIELD(t,l) do { \
637   if(!*scp) goto bad_row; \
638   ecp = strchr(scp, '\t'); \
639   if(!ecp) goto bad_row; \
640   token = scp; \
641   len = (ecp-scp); \
642   scp = ecp + 1; \
643 } while(0)
644 #define PROCESS_LAST_FIELD(t,l) do { \
645   if(!*scp) ecp = scp; \
646   else { \
647     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
648     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
649   } \
650   t = scp; \
651   l = (ecp-scp); \
652 } while(0)
653
654     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
655     switch(type) {
656       /* See noit_check_log.c for log description */
657       case 'n':
658         DECLARE_PARAM_STR(raddr, strlen(raddr));
659         DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
660         DECLARE_PARAM_STR("noitd",5); /* node_type */
661         PROCESS_NEXT_FIELD(token,len);
662         d->whence = (time_t)strtoul(token, NULL, 10);
663         DECLARE_PARAM_STR(token,len); /* timestamp */
664
665         /* This is the expected uncompressed len */
666         PROCESS_NEXT_FIELD(token,len);
667         final_len = atoi(token);
668         final_buff = malloc(final_len);
669         if(!final_buff) goto bad_row;
670  
671         /* The last token is b64 endoded and compressed.
672          * we need to decode it, declare it and then free it.
673          */
674         PROCESS_LAST_FIELD(token, len);
675         /* We can in-place decode this */
676         len = noit_b64_decode((char *)token, len,
677                               (unsigned char *)token, len);
678         if(len <= 0) {
679           noitL(noit_error, "noitd config base64 decoding error.\n");
680           free(final_buff);
681           goto bad_row;
682         }
683         actual_final_len = final_len;
684         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
685                               (unsigned char *)token, len)) {
686           noitL(noit_error, "noitd config decompression failure.\n");
687           free(final_buff);
688           goto bad_row;
689         }
690         if(final_len != actual_final_len) {
691           noitL(noit_error, "noitd config decompression error.\n");
692           free(final_buff);
693           goto bad_row;
694         }
695         DECLARE_PARAM_STR(final_buff, final_len);
696         free(final_buff);
697         break;
698       case 'D':
699         break;
700       case 'C':
701         DECLARE_PARAM_STR(raddr, strlen(raddr));
702         PROCESS_NEXT_FIELD(token,len);
703         DECLARE_PARAM_STR(token,len); /* timestamp */
704         d->whence = (time_t)strtoul(token, NULL, 10);
705         PROCESS_NEXT_FIELD(token, len);
706         /* uuid is last 36 bytes */
707         if(len > 36) { token += (len-36); len = 36; }
708         sid = uuid_to_sid(token, remote_cn);
709         if(sid == 0) goto bad_row;
710         DECLARE_PARAM_INT(sid); /* sid */
711         DECLARE_PARAM_STR(token,len); /* uuid */
712         PROCESS_NEXT_FIELD(token, len);
713         DECLARE_PARAM_STR(token,len); /* target */
714         PROCESS_NEXT_FIELD(token, len);
715         DECLARE_PARAM_STR(token,len); /* module */
716         PROCESS_LAST_FIELD(token, len);
717         DECLARE_PARAM_STR(token,len); /* name */
718         break;
719       case 'M':
720         PROCESS_NEXT_FIELD(token,len);
721         DECLARE_PARAM_STR(token,len); /* timestamp */
722         d->whence = (time_t)strtoul(token, NULL, 10);
723         PROCESS_NEXT_FIELD(token, len);
724         /* uuid is last 36 bytes */
725         if(len > 36) { token += (len-36); len = 36; }
726         sid = uuid_to_sid(token, remote_cn);
727         if(sid == 0) goto bad_row;
728         DECLARE_PARAM_INT(sid); /* sid */
729         PROCESS_NEXT_FIELD(token, len);
730         DECLARE_PARAM_STR(token,len); /* name */
731         PROCESS_NEXT_FIELD(token,len);
732         d->metric_type = *token;
733         PROCESS_LAST_FIELD(token,len);
734         DECLARE_PARAM_STR(token,len); /* value */
735         break;
736       case 'S':
737         PROCESS_NEXT_FIELD(token,len);
738         DECLARE_PARAM_STR(token,len); /* timestamp */
739         d->whence = (time_t)strtoul(token, NULL, 10);
740         PROCESS_NEXT_FIELD(token, len);
741         /* uuid is last 36 bytes */
742         if(len > 36) { token += (len-36); len = 36; }
743         sid = uuid_to_sid(token, remote_cn);
744         if(sid == 0) goto bad_row;
745         DECLARE_PARAM_INT(sid); /* sid */
746         PROCESS_NEXT_FIELD(token, len);
747         DECLARE_PARAM_STR(token,len); /* state */
748         PROCESS_NEXT_FIELD(token, len);
749         DECLARE_PARAM_STR(token,len); /* availability */
750         PROCESS_NEXT_FIELD(token, len);
751         DECLARE_PARAM_STR(token,len); /* duration */
752         PROCESS_LAST_FIELD(token,len);
753         DECLARE_PARAM_STR(token,len); /* status */
754         break;
755       default:
756         goto bad_row;
757     }
758
759   }
760
761   /* Now execute the query */
762   switch(type) {
763     case 'n':
764       GET_QUERY(config_insert);
765       PG_EXEC(config_insert);
766       PQclear(d->res);
767       break;
768     case 'C':
769       GET_QUERY(check_insert);
770       PG_TM_EXEC(check_insert, d->whence);
771       PQclear(d->res);
772       break;
773     case 'S':
774       GET_QUERY(status_insert);
775       PG_TM_EXEC(status_insert, d->whence);
776       PQclear(d->res);
777       break;
778     case 'D':
779       break;
780     case 'M':
781       switch(d->metric_type) {
782         case METRIC_INT32:
783         case METRIC_UINT32:
784         case METRIC_INT64:
785         case METRIC_UINT64:
786         case METRIC_DOUBLE:
787           GET_QUERY(metric_insert_numeric);
788           PG_TM_EXEC(metric_insert_numeric, d->whence);
789           PQclear(d->res);
790           break;
791         case METRIC_STRING:
792           GET_QUERY(metric_insert_text);
793           PG_TM_EXEC(metric_insert_text, d->whence);
794           PQclear(d->res);
795           break;
796         default:
797           goto bad_row;
798       }
799       break;
800     default:
801       /* should never get here */
802       goto bad_row;
803   }
804   return DS_EXEC_SUCCESS;
805  bad_row:
806   return DS_EXEC_ROW_FAILED;
807 }
808 static int
809 stratcon_database_post_connect(conn_q *cq) {
810   int rv = 0;
811   ds_single_detail _d = { 0 }, *d = &_d;
812   if(cq->fqdn) {
813     char *remote_str, *remote_cn;
814     /* This is the silly way we get null's in through our declare_param_str */
815     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
816     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
817     /* This is a storage node, it gets the storage node post_connect */
818     GET_QUERY(storage_post_connect);
819     rv = -1; /* now we're serious */
820     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
821     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
822     PG_EXEC(storage_post_connect);
823     PQclear(d->res);
824     rv = 0;
825   }
826   else {
827     /* Metanode post_connect */
828     GET_QUERY(metanode_post_connect);
829     rv = -1; /* now we're serious */
830     PG_EXEC(metanode_post_connect);
831     PQclear(d->res);
832     rv = 0;
833   }
834  bad_row:
835   free_params(d);
836   if(rv == -1) {
837     /* Post-connect intentions are serious and fatal */
838     PQfinish(cq->dbh);
839     cq->dbh = NULL;
840   }
841   return rv;
842 }
843 static int
844 stratcon_database_connect(conn_q *cq) {
845   char *dsn, dsn_meta[512];
846   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
847   const char *k, *v;
848   int klen;
849   noit_hash_table *t;
850
851   dsn_meta[0] = '\0';
852   if(!cq->dsn) {
853     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
854     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
855       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
856       strlcat(dsn_meta, k, sizeof(dsn_meta));
857       strlcat(dsn_meta, "=", sizeof(dsn_meta));
858       strlcat(dsn_meta, v, sizeof(dsn_meta));
859     }
860     noit_hash_destroy(t, free, free);
861     free(t);
862     dsn = dsn_meta;
863   }
864   else {
865     char options[32];
866     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta));
867     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user",
868                                options, sizeof(options))) {
869       strlcat(dsn_meta, " ", sizeof(dsn_meta));
870       strlcat(dsn_meta, "user", sizeof(dsn_meta));
871       strlcat(dsn_meta, "=", sizeof(dsn_meta));
872       strlcat(dsn_meta, options, sizeof(dsn_meta));
873     }
874     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password",
875                                options, sizeof(options))) {
876       strlcat(dsn_meta, " ", sizeof(dsn_meta));
877       strlcat(dsn_meta, "password", sizeof(dsn_meta));
878       strlcat(dsn_meta, "=", sizeof(dsn_meta));
879       strlcat(dsn_meta, options, sizeof(dsn_meta));
880     }
881     dsn = dsn_meta;
882   }
883
884   if(cq->dbh) {
885     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
886     PQreset(cq->dbh);
887     if(PQstatus(cq->dbh) != CONNECTION_OK) {
888       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
889             dsn, PQerrorMessage(cq->dbh));
890       return -1;
891     }
892     if(stratcon_database_post_connect(cq)) return -1;
893     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
894     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
895           dsn, PQerrorMessage(cq->dbh));
896     return -1;
897   }
898
899   cq->dbh = PQconnectdb(dsn);
900   if(!cq->dbh) return -1;
901   if(PQstatus(cq->dbh) != CONNECTION_OK) {
902     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
903           dsn, PQerrorMessage(cq->dbh));
904     return -1;
905   }
906   if(stratcon_database_post_connect(cq)) return -1;
907   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
908   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
909         dsn, PQerrorMessage(cq->dbh));
910   return -1;
911 }
912 static int
913 stratcon_ingest_savepoint_op(conn_q *cq, const char *p,
914                              const char *name) {
915   int rv = -1;
916   PGresult *res;
917   char cmd[128];
918   strlcpy(cmd, p, sizeof(cmd));
919   strlcat(cmd, name, sizeof(cmd));
920   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
921   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
922   PQclear(res);
923   return rv;
924 }
925 static int
926 stratcon_ingest_do(conn_q *cq, const char *cmd) {
927   PGresult *res;
928   int rv = -1;
929   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
930   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
931   PQclear(res);
932   return rv;
933 }
/*
 * Transaction helpers used by stratcon_ingest_asynch_execute().
 * They rely on the enclosing function providing the variables `cq`,
 * `current` and `last_sp` and a `full_monty` label.
 */

/* Tear down the connection and restart the batch from full_monty. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)

/* Open the named savepoint and remember the row it was opened at. */
#define SAVEPOINT(name) do { \
  if(stratcon_ingest_savepoint_op(cq, "SAVEPOINT ", name)) { \
    BUSTED(cq); \
  } \
  last_sp = current; \
} while(0)

/* Undo everything since the named savepoint; no savepoint is then open. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  if(stratcon_ingest_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) { \
    BUSTED(cq); \
  } \
  last_sp = NULL; \
} while(0)

/* Keep everything since the named savepoint; no savepoint is then open. */
#define RELEASE_SAVEPOINT(name) do { \
  if(stratcon_ingest_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) { \
    BUSTED(cq); \
  } \
  last_sp = NULL; \
} while(0)
953
954 int
955 stratcon_ingest_asynch_lookup(eventer_t e, int mask, void *closure,
956                               struct timeval *now) {
957   ds_rt_detail *dsjd = closure;
958   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
959   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
960
961   assert(dsjd->rt);
962   stratcon_ingest_find(dsjd);
963   if(dsjd->completion_event)
964     eventer_add(dsjd->completion_event);
965
966   free_params((ds_single_detail *)dsjd);
967   free(dsjd);
968   return 0;
969 }
970 static void
971 stratcon_ingestor_submit_lookup(struct realtime_tracker *rt,
972                                 eventer_t completion) {
973   eventer_t e;
974   conn_pool *cpool;
975   ds_rt_detail *rtdetail;
976
977   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
978   rtdetail = calloc(1, sizeof(*rtdetail));
979   rtdetail->rt = rt;
980   rtdetail->completion_event = completion;
981   e = eventer_alloc();
982   e->mask = EVENTER_ASYNCH;
983   e->callback = stratcon_ingest_asynch_lookup;
984   e->closure = rtdetail;
985   eventer_add_asynch(cpool->jobq, e);
986 }
987 static const char *
988 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
989   void *vinfo;
990   char *dsn = NULL, *fqdn = NULL;
991   int found = 0;
992   storagenode_info *info = NULL;
993   pthread_mutex_lock(&storagenode_to_info_cache_lock);
994   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
995                         &vinfo)) {
996     found = 1;
997     info = vinfo;
998   }
999   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1000   if(found) {
1001     if(fqdn_out) *fqdn_out = info->fqdn;
1002     return info->dsn;
1003   }
1004
1005   if(!found && can_use_db) {
1006     ds_single_detail *d;
1007     conn_q *cq;
1008     int row_count;
1009     /* Look it up and store it */
1010     d = calloc(1, sizeof(*d));
1011     cq = get_conn_q_for_metanode();
1012     GET_QUERY(find_storage);
1013     DECLARE_PARAM_INT(id);
1014     PG_EXEC(find_storage);
1015     row_count = PQntuples(d->res);
1016     if(row_count) {
1017       PG_GET_STR_COL(dsn, 0, "dsn");
1018       PG_GET_STR_COL(fqdn, 0, "fqdn");
1019       fqdn = fqdn ? strdup(fqdn) : NULL;
1020       dsn = dsn ? strdup(dsn) : NULL;
1021     }
1022     PQclear(d->res);
1023    bad_row:
1024     free_params(d);
1025     free(d);
1026     release_conn_q(cq);
1027   }
1028   if(fqdn) {
1029     info = calloc(1, sizeof(*info));
1030     info->fqdn = fqdn;
1031     if(fqdn_out) *fqdn_out = info->fqdn;
1032     info->dsn = dsn;
1033     info->storagenode_id = id;
1034     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1035     noit_hash_store(&storagenode_to_info_cache,
1036                     (void *)&info->storagenode_id, sizeof(int), info);
1037     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1038   }
1039   return info ? info->dsn : NULL;
1040 }
1041 static void
1042 expand_b_record(ds_line_detail **head, ds_line_detail **last,
1043                 const char *line, int len) {
1044   char **outrows;
1045   int i, cnt;
1046   ds_line_detail *next;
1047
1048   cnt = noit_check_log_b_to_sm(line, len, &outrows);
1049   for(i=0;i<cnt;i++) {
1050     if(outrows[i] == NULL) continue;
1051     next = calloc(sizeof(*next), 1);
1052     next->data = outrows[i];
1053     if(!*head) *head = next;
1054     if(*last) (*last)->next = next;
1055     *last = next;
1056   }
1057   if(outrows) free(outrows);
1058 }
1059 static ds_line_detail *
1060 build_insert_batch(pg_interim_journal_t *ij) {
1061   int rv;
1062   off_t len;
1063   const char *buff, *cp, *lcp;
1064   struct stat st;
1065   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1066
1067   if(ij->fd < 0) {
1068     ij->fd = open(ij->filename, O_RDONLY);
1069     if(ij->fd < 0) {
1070       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
1071             ij->filename, strerror(errno));
1072       assert(ij->fd >= 0);
1073     }
1074   }
1075   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1076   if(rv == -1) {
1077       noitL(noit_error, "Cannot stat interim journal '%s': %s\n",
1078             ij->filename, strerror(errno));
1079     assert(rv != -1);
1080   }
1081   len = st.st_size;
1082   if(len > 0) {
1083     buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1084     if(buff == (void *)-1) {
1085       noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1086             ij->filename, strerror(errno));
1087       assert(buff != (void *)-1);
1088     }
1089     lcp = buff;
1090     while(lcp < (buff + len) &&
1091           NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1092       if(lcp[0] == 'B' && lcp[1] != '\0' && lcp[2] == '\t') {
1093       /* Bundle records are special and need to be expanded into
1094        * traditional records here
1095        */
1096         noit_compression_type_t ctype = NOIT_COMPRESS_NONE;
1097         switch(lcp[1]) {
1098           case '1': /* version 1 */
1099             ctype = NOIT_COMPRESS_ZLIB; /*no break fall through */
1100           case '2': /* version 2 */
1101               expand_b_record(&head, &last, lcp, cp - lcp);
1102             break;
1103           default:
1104             noitL(noit_error, "unknown bundle version %c\n", lcp[1]);
1105         }
1106       }
1107       else {
1108         next = calloc(1, sizeof(*next));
1109         next->data = malloc(cp - lcp + 1);
1110         memcpy(next->data, lcp, cp - lcp);
1111         next->data[cp - lcp] = '\0';
1112         if(!head) head = next;
1113         if(last) last->next = next;
1114         last = next;
1115       }
1116       lcp = cp + 1;
1117     }
1118     munmap((void *)buff, len);
1119   }
1120   close(ij->fd);
1121   return head;
1122 }
1123 static void
1124 pg_interim_journal_remove(pg_interim_journal_t *ij) {
1125   unlink(ij->filename);
1126   if(ij->filename) free(ij->filename);
1127   if(ij->remote_str) free(ij->remote_str);
1128   if(ij->remote_cn) free(ij->remote_cn);
1129   if(ij->fqdn) free(ij->fqdn);
1130   free(ij);
1131 }
1132 static int
1133 stratcon_ingest_asynch_execute(eventer_t e, int mask, void *closure,
1134                                struct timeval *now) {
1135   int i, total, success, sp_total, sp_success;
1136   pg_interim_journal_t *ij;
1137   ds_line_detail *head = NULL, *current, *last_sp;
1138   const char *dsn;
1139   conn_q *cq;
1140   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1141   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1142
1143   ij = closure;
1144   if(ij->fqdn == NULL) {
1145     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1146     if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1147   }
1148   else {
1149     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, NULL);
1150   }
1151   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1152                              ij->fqdn, dsn);
1153   noitL(ds_deb, "stratcon_ingest_asynch_execute[%s,%s,%s]\n",
1154         ij->remote_str, ij->remote_cn, ij->fqdn);
1155  full_monty:
1156   /* Make sure we have a connection */
1157   i = 1;
1158   while(stratcon_database_connect(cq)) {
1159     noitL(noit_error, "Error connecting to database: %s\n",
1160           ij->fqdn ? ij->fqdn : "(null)");
1161     sleep(i);
1162     i *= 2;
1163     i = MIN(i, 16);
1164   }
1165
1166   if(head == NULL) head = build_insert_batch(ij);
1167   noitL(ds_deb, "Starting batch from %s/%s to %s\n",
1168         ij->remote_str ? ij->remote_str : "(null)",
1169         ij->remote_cn ? ij->remote_cn : "(null)",
1170         ij->fqdn ? ij->fqdn : "(null)");
1171   current = head;
1172   last_sp = NULL;
1173   total = success = sp_total = sp_success = 0;
1174   if(stratcon_ingest_do(cq, "BEGIN")) BUSTED(cq);
1175   while(current) {
1176     execute_outcome_t rv;
1177     if(current->data) {
1178       if(!last_sp) {
1179         SAVEPOINT("batch");
1180         sp_success = success;
1181         sp_total = total;
1182       }
1183  
1184       if(current->problematic) {
1185         RELEASE_SAVEPOINT("batch");
1186         current = current->next;
1187         total++;
1188         continue;
1189       }
1190       rv = stratcon_ingest_execute(cq, cq->remote_str, cq->remote_cn,
1191                                    current);
1192       switch(rv) {
1193         case DS_EXEC_SUCCESS:
1194           total++;
1195           success++;
1196           current = current->next;
1197           break;
1198         case DS_EXEC_ROW_FAILED:
1199           /* rollback to savepoint, mark this record as bad and start again */
1200           if(current->data[0] != 'n')
1201             noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1202           current->problematic = 1;
1203           current = last_sp;
1204           success = sp_success;
1205           total = sp_total;
1206           ROLLBACK_TO_SAVEPOINT("batch");
1207           break;
1208         case DS_EXEC_TXN_FAILED:
1209           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1210           BUSTED(cq);
1211       }
1212     }
1213   }
1214   if(last_sp) RELEASE_SAVEPOINT("batch");
1215   if(stratcon_ingest_do(cq, "COMMIT")) {
1216     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1217     BUSTED(cq);
1218   }
1219   /* Cleanup the mess */
1220   while(head) {
1221     ds_line_detail *tofree;
1222     tofree = head;
1223     head = head->next;
1224     if(tofree->data) free(tofree->data);
1225     free_params((ds_single_detail *)tofree);
1226     free(tofree);
1227   }
1228   noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1229         ij->remote_str ? ij->remote_str : "(null)",
1230         ij->remote_cn ? ij->remote_cn : "(null)",
1231         ij->fqdn ? ij->fqdn : "(null)", success, total);
1232   pg_interim_journal_remove(ij);
1233   release_conn_q(cq);
1234   return 0;
1235 }
1236 static int
1237 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1238                           int *sid_out, int *storagenode_id_out,
1239                           const char **remote_cn_out,
1240                           const char **fqdn_out, const char **dsn_out) {
1241   /* only called from the main thread -- no safety issues */
1242   void *vuuidinfo, *vinfo;
1243   uuid_info *uuidinfo;
1244   storagenode_info *info = NULL;
1245   char *fqdn = NULL;
1246   char *dsn = NULL;
1247   char *new_remote_cn = NULL;
1248   int storagenode_id = 0, sid = 0;
1249   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1250                          &vuuidinfo)) {
1251     int row_count = 0;
1252     char *tmpint;
1253     ds_single_detail *d;
1254     conn_q *cq;
1255
1256     /* We can't do a database lookup without the remote_cn */
1257     if(!remote_cn) {
1258       if(stratcon_datastore_get_enabled()) {
1259         /* We have an authoritatively maintained cache, we don't do lookups */
1260         return -1;
1261       }
1262       else
1263         remote_cn = "[[null]]";
1264     }
1265
1266     d = calloc(1, sizeof(*d));
1267     cq = get_conn_q_for_metanode();
1268     if(stratcon_database_connect(cq) == 0) {
1269       /* Blocking call to service the cache miss */
1270       GET_QUERY(check_map);
1271       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1272       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1273       PG_EXEC(check_map);
1274       row_count = PQntuples(d->res);
1275       if(row_count != 1) {
1276         PQclear(d->res);
1277         goto bad_row;
1278       }
1279       PG_GET_STR_COL(tmpint, 0, "sid");
1280       if(!tmpint) {
1281         row_count = 0;
1282         PQclear(d->res);
1283         goto bad_row;
1284       }
1285       sid = atoi(tmpint);
1286       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1287       if(tmpint) storagenode_id = atoi(tmpint);
1288       PG_GET_STR_COL(fqdn, 0, "fqdn");
1289       PG_GET_STR_COL(dsn, 0, "dsn");
1290       PG_GET_STR_COL(new_remote_cn, 0, "remote_cn");
1291       fqdn = fqdn ? strdup(fqdn) : NULL;
1292       dsn = dsn ? strdup(dsn) : NULL;
1293       new_remote_cn = new_remote_cn ? strdup(new_remote_cn) : NULL;
1294       PQclear(d->res);
1295     }
1296    bad_row:
1297     free_params((ds_single_detail *)d);
1298     free(d);
1299     release_conn_q(cq);
1300     if(row_count != 1) {
1301       return -1;
1302     }
1303     /* Place in cache */
1304     uuidinfo = calloc(1, sizeof(*uuidinfo));
1305     uuidinfo->sid = sid;
1306     uuidinfo->uuid_str = strdup(uuid_str);
1307     uuidinfo->storagenode_id = storagenode_id;
1308     uuidinfo->remote_cn = new_remote_cn ? strdup(new_remote_cn) : strdup(remote_cn);
1309     noit_hash_store(&uuid_to_info_cache,
1310                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1311     /* Also, we may have just witnessed a new storage node, store it */
1312     if(storagenode_id) {
1313       int needs_free = 0;
1314       info = calloc(1, sizeof(*info));
1315       info->storagenode_id = storagenode_id;
1316       info->dsn = dsn ? strdup(dsn) : NULL;
1317       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1318       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1319       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1320                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1321         /* hack to save memory -- we *never* remove from these caches,
1322            so we can use the same fqdn value in the above cache for the key
1323            in the cache below -- (no strdup) */
1324         noit_hash_store(&storagenode_to_info_cache,
1325                         (void *)&info->storagenode_id, sizeof(int), info);
1326       }
1327       else needs_free = 1;
1328       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1329       if(needs_free) {
1330         if(info->dsn) free(info->dsn);
1331         if(info->fqdn) free(info->fqdn);
1332         free(info);
1333       }
1334     }
1335   }
1336   else
1337     uuidinfo = vuuidinfo;
1338
1339   if(uuidinfo && uuidinfo->storagenode_id) {
1340     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1341       /* we don't have dsn and we actually want it */
1342       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1343       if(noit_hash_retrieve(&storagenode_to_info_cache,
1344                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1345                             &vinfo))
1346         info = vinfo;
1347       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1348     }
1349   }
1350
1351   if(fqdn_out) *fqdn_out = info ? info->fqdn : NULL;
1352   if(dsn_out) *dsn_out = info ? info->dsn : NULL;
1353   assert(uuidinfo);
1354   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1355   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1356   if(sid_out) *sid_out = uuidinfo->sid;
1357   if(fqdn) free(fqdn);
1358   if(dsn) free(dsn);
1359   if(new_remote_cn) free(new_remote_cn);
1360   return 0;
1361 }
1362 static int
1363 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1364   char uuid_str[UUID_STR_LEN+1];
1365   int sid = 0;
1366   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1367   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1368   return sid;
1369 }
1370
1371 static int
1372 stratcon_ingest_saveconfig() {
1373   int rv = -1;
1374   char *buff;
1375   ds_single_detail _d = { 0 }, *d = &_d;
1376   conn_q *cq;
1377   char ipv4_str[32];
1378   struct in_addr r, l;
1379
1380   r.s_addr = htonl((4 << 24) || (2 << 16) || (2 << 8) || 1);
1381   memset(&l, 0, sizeof(l));
1382   noit_getip_ipv4(r, &l);
1383   /* Ignore the error.. what are we going to do anyway */
1384   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
1385     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));
1386
1387   cq = get_conn_q_for_metanode();
1388
1389   if(stratcon_database_connect(cq) == 0) {
1390     char time_as_str[20];
1391     size_t len;
1392     buff = noit_conf_xml_in_mem(&len);
1393     if(!buff) goto bad_row;
1394
1395     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1396     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str));
1397     DECLARE_PARAM_STR("", 0);
1398     DECLARE_PARAM_STR("stratcond", 9);
1399     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1400     DECLARE_PARAM_STR(buff, len);
1401     free(buff);
1402
1403     GET_QUERY(config_insert);
1404     PG_EXEC(config_insert);
1405     PQclear(d->res);
1406     rv = 0;
1407
1408     bad_row:
1409       free_params(d);
1410   }
1411   release_conn_q(cq);
1412   return rv;
1413 }
1414
1415 static void
1416 stratcon_ingest_launch_file_ingestion(const char *path,
1417                                       const char *remote_str,
1418                                       const char *remote_cn,
1419                                       const char *id_str) {
1420   pg_interim_journal_t *ij;
1421   eventer_t ingest;
1422
1423   ij = calloc(1, sizeof(*ij));
1424   ij->fd = open(path, O_RDONLY);
1425   if(ij->fd < 0) {
1426     noitL(noit_error, "cannot open journal '%s': %s\n",
1427           path, strerror(errno));
1428     free(ij);
1429     return;
1430   }
1431   close(ij->fd);
1432   ij->fd = -1;
1433   ij->filename = strdup(path);
1434   ij->remote_str = strdup(remote_str);
1435   ij->remote_cn = strdup(remote_cn);
1436   ij->storagenode_id = atoi(id_str);
1437   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1438                                        ij->fqdn);
1439   noitL(noit_error, "ingesting old payload: %s\n", ij->filename);
1440   ingest = eventer_alloc();
1441   ingest->mask = EVENTER_ASYNCH;
1442   ingest->callback = stratcon_ingest_asynch_execute;
1443   ingest->closure = ij;
1444   eventer_add_asynch(ij->cpool->jobq, ingest);
1445 }
1446 static void
1447 stratcon_ingest_sweep_journals_int(char *first, char *second, char *third) {
1448   char path[PATH_MAX];
1449   DIR *root;
1450   struct dirent *de, *entry;
1451   int i = 0, cnt = 0;
1452   char **entries;
1453   int size = 0;
1454
1455   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
1456            first ? "/" : "", first ? first : "",
1457            second ? "/" : "", second ? second : "",
1458            third ? "/" : "", third ? third : "");
1459 #ifdef _PC_NAME_MAX
1460   size = pathconf(path, _PC_NAME_MAX);
1461 #endif
1462   size = MIN(size, PATH_MAX + 128);
1463   de = alloca(size);
1464   root = opendir(path);
1465   if(!root) return;
1466   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) cnt++;
1467   closedir(root);
1468   root = opendir(path);
1469   if(!root) return;
1470   entries = malloc(sizeof(*entries) * cnt);
1471   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) {
1472     if(i < cnt) {
1473       entries[i++] = strdup(entry->d_name);
1474     }
1475   }
1476   closedir(root);
1477   cnt = i; /* could have changed, directories are fickle */
1478   qsort(entries, i, sizeof(*entries),
1479         (int (*)(const void *, const void *))strcasecmp);
1480   for(i=0; i<cnt; i++) {
1481     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
1482     noitL(ds_deb, "Processing L%d entry '%s'\n",
1483           third ? 4 : second ? 3 : first ? 2 : 1, entries[i]);
1484     if(!first)
1485       stratcon_ingest_sweep_journals_int(entries[i], NULL, NULL);
1486     else if(!second)
1487       stratcon_ingest_sweep_journals_int(first, entries[i], NULL);
1488     else if(!third)
1489       stratcon_ingest_sweep_journals_int(first, second, entries[i]);
1490     else if(strlen(entries[i]) == 16) {
1491       char fullpath[PATH_MAX];
1492       snprintf(fullpath, sizeof(fullpath), "%s/%s/%s/%s/%s", basejpath,
1493                first,second,third,entries[i]);
1494       stratcon_ingest_launch_file_ingestion(fullpath,first,second,third);
1495     }
1496   }
1497   for(i=0; i<cnt; i++)
1498     free(entries[i]);
1499   free(entries);
1500 }
/*
 * Sweep the whole journal tree, starting at its top level (no directory
 * components chosen yet).
 */
static void
stratcon_ingest_sweep_journals() {
  char *no_level = NULL;

  stratcon_ingest_sweep_journals_int(no_level, no_level, no_level);
}
1505
1506 int
1507 stratcon_ingest_all_storagenode_info() {
1508   int i, cnt = 0;
1509   ds_single_detail _d = { 0 }, *d = &_d;
1510   conn_q *cq;
1511   cq = get_conn_q_for_metanode();
1512
1513   while(stratcon_database_connect(cq)) {
1514     noitL(noit_error, "Error connecting to database\n");
1515     sleep(1);
1516   }
1517
1518   GET_QUERY(all_storage);
1519   PG_EXEC(all_storage);
1520   cnt = PQntuples(d->res);
1521   for(i=0; i<cnt; i++) {
1522     void *vinfo;
1523     char *tmpint, *fqdn, *dsn;
1524     int storagenode_id;
1525     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1526     storagenode_id = atoi(tmpint);
1527     PG_GET_STR_COL(fqdn, i, "fqdn");
1528     PG_GET_STR_COL(dsn, i, "dsn");
1529     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1530     storagenode_id = tmpint ? atoi(tmpint) : 0;
1531
1532     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1533                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1534       storagenode_info *info;
1535       info = calloc(1, sizeof(*info));
1536       info->storagenode_id = storagenode_id;
1537       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1538       info->dsn = dsn ? strdup(dsn) : NULL;
1539       noit_hash_store(&storagenode_to_info_cache,
1540                       (void *)&info->storagenode_id, sizeof(int), info);
1541       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1542             info->storagenode_id,
1543             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1544     }
1545   }
1546   PQclear(d->res);
1547  bad_row:
1548   free_params(d);
1549
1550   release_conn_q(cq);
1551   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1552   return cnt;
1553 }
1554 int
1555 stratcon_ingest_all_check_info() {
1556   int i, cnt, loaded = 0;
1557   ds_single_detail _d = { 0 }, *d = &_d;
1558   conn_q *cq;
1559   cq = get_conn_q_for_metanode();
1560
1561   while(stratcon_database_connect(cq)) {
1562     noitL(noit_error, "Error connecting to database\n");
1563     sleep(1);
1564   }
1565
1566   GET_QUERY(check_mapall);
1567   PG_EXEC(check_mapall);
1568   cnt = PQntuples(d->res);
1569   for(i=0; i<cnt; i++) {
1570     void *vinfo;
1571     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1572     int sid, storagenode_id;
1573     uuid_info *uuidinfo;
1574     PG_GET_STR_COL(uuid_str, i, "id");
1575     if(!uuid_str) continue;
1576     PG_GET_STR_COL(tmpint, i, "sid");
1577     if(!tmpint) continue;
1578     sid = atoi(tmpint);
1579     PG_GET_STR_COL(fqdn, i, "fqdn");
1580     PG_GET_STR_COL(dsn, i, "dsn");
1581     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1582     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1583     storagenode_id = tmpint ? atoi(tmpint) : 0;
1584
1585     uuidinfo = calloc(1, sizeof(*uuidinfo));
1586     uuidinfo->uuid_str = strdup(uuid_str);
1587     uuidinfo->remote_cn = strdup(remote_cn);
1588     uuidinfo->storagenode_id = storagenode_id;
1589     uuidinfo->sid = sid;
1590     noit_hash_store(&uuid_to_info_cache,
1591                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1592     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1593           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1594     loaded++;
1595     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1596                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1597       storagenode_info *info;
1598       info = calloc(1, sizeof(*info));
1599       info->storagenode_id = storagenode_id;
1600       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1601       info->dsn = dsn ? strdup(dsn) : NULL;
1602       noit_hash_store(&storagenode_to_info_cache,
1603                       (void *)&info->storagenode_id, sizeof(int), info);
1604       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1605             info->storagenode_id,
1606             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1607     }
1608   }
1609   PQclear(d->res);
1610  bad_row:
1611   free_params(d);
1612
1613   release_conn_q(cq);
1614   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1615   return loaded;
1616 }
1617
1618 static int
1619 rest_get_noit_config(noit_http_rest_closure_t *restc,
1620                      int npats, char **pats) {
1621   noit_http_session_ctx *ctx = restc->http_ctx;
1622   ds_single_detail *d;
1623   int row_count = 0;
1624   const char *xml = NULL;
1625   conn_q *cq = NULL;
1626
1627   if(npats != 0) {
1628     noit_http_response_server_error(ctx, "text/xml");
1629     noit_http_response_end(ctx);
1630     return 0;
1631   }
1632   d = calloc(1, sizeof(*d));
1633   GET_QUERY(config_get);
1634   cq = get_conn_q_for_metanode();
1635   if(!cq) {
1636     noit_http_response_server_error(ctx, "text/xml");
1637     goto bad_row;
1638   }
1639
1640   DECLARE_PARAM_STR(restc->remote_cn,
1641                     restc->remote_cn ? strlen(restc->remote_cn) : 0);
1642   PG_EXEC(config_get);
1643   row_count = PQntuples(d->res);
1644   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1645
1646   if(xml == NULL) {
1647     char buff[1024];
1648     snprintf(buff, sizeof(buff), "<error><remote_cn>%s</remote_cn>"
1649                                  "<row_count>%d</row_count></error>\n",
1650              restc->remote_cn, row_count);
1651     noit_http_response_append(ctx, buff, strlen(buff));
1652     noit_http_response_not_found(ctx, "text/xml");
1653   }
1654   else {
1655     noit_http_response_append(ctx, xml, strlen(xml));
1656     noit_http_response_ok(ctx, "text/xml");
1657   }
1658  bad_row:
1659   free_params((ds_single_detail *)d);
1660   d->nparams = 0;
1661   if(cq) release_conn_q(cq);
1662
1663   noit_http_response_end(ctx);
1664   return 0;
1665 }
1666
1667 static ingestor_api_t postgres_ingestor_api = {
1668   .launch_file_ingestion = stratcon_ingest_launch_file_ingestion,
1669   .iep_check_preload = stratcon_ingest_iep_check_preload,
1670   .storage_node_lookup = storage_node_quick_lookup,
1671   .submit_realtime_lookup = stratcon_ingestor_submit_lookup,
1672   .get_noit_config = NULL,
1673   .save_config = stratcon_ingest_saveconfig
1674 };
1675
1676 static int postgres_ingestor_config(noit_module_generic_t *self, noit_hash_table *o) {
1677   return 0;
1678 }
1679 static int postgres_ingestor_onload(noit_image_t *self) {
1680   return 0;
1681 }
1682 static int postgres_ingestor_init(noit_module_generic_t *self) {
1683   pthread_mutex_init(&ds_conns_lock, NULL);
1684   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1685   ds_err = noit_log_stream_find("error/datastore");
1686   ds_deb = noit_log_stream_find("debug/datastore");
1687   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
1688   ingest_err = noit_log_stream_find("error/ingest");
1689   if(!ds_err) ds_err = noit_error;
1690   if(!ingest_err) ingest_err = noit_error;
1691   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1692                            &basejpath)) {
1693     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1694     exit(-1);
1695   }
1696   stratcon_ingest_all_check_info();
1697   stratcon_ingest_all_storagenode_info();
1698   stratcon_ingest_sweep_journals();
1699   return stratcon_datastore_set_ingestor(&postgres_ingestor_api);
1700 }
1701
1702 noit_module_generic_t postgres_ingestor = {
1703   {
1704     NOIT_GENERIC_MAGIC,
1705     NOIT_GENERIC_ABI_VERSION,
1706     "postgres_ingestor",
1707     "postgres drive for data ingestion",
1708     postgres_ingestor_xml_description,
1709     postgres_ingestor_onload,
1710   }, 
1711   postgres_ingestor_config,
1712   postgres_ingestor_init
1713 };
Note: See TracBrowser for help on using the browser.