root/src/modules/postgres_ingestor.c

Revision 5f816fc68a68eaeb73588d831044fdaadbafe5e9, 52.1 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

fixes #358

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "noit_module.h"
35 #include "eventer/eventer.h"
36 #include "utils/noit_log.h"
37 #include "utils/noit_b64.h"
38 #include "utils/noit_str.h"
39 #include "utils/noit_mkdir.h"
40 #include "utils/noit_getip.h"
41 #include "stratcon_datastore.h"
42 #include "stratcon_realtime_http.h"
43 #include "stratcon_iep.h"
44 #include "noit_conf.h"
45 #include "noit_check.h"
46 #include "noit_rest.h"
47 #include <unistd.h>
48 #include <fcntl.h>
49 #include <netinet/in.h>
50 #include <sys/un.h>
51 #include <dirent.h>
52 #include <arpa/inet.h>
53 #include <sys/mman.h>
54 #include <libpq-fe.h>
55 #include <zlib.h>
56 #include <assert.h>
57 #include <errno.h>
58
/*
 * DECL_STMT(codename, confname) declares two file-scope objects for one
 * SQL statement:
 *   codename       - the statement text, NULL until it has been fetched
 *   codename_conf  - the configuration path the text is fetched from,
 *                    "/stratcon/database/statements/<confname>"
 */
#define DECL_STMT(codename,confname) \
static char *codename; \
static const char *codename##_conf = \
  "/stratcon/database/statements/" #confname

/* Connection setup statements */
DECL_STMT(storage_post_connect, storagepostconnect);
DECL_STMT(metanode_post_connect, metanodepostconnect);
/* Storage node lookups */
DECL_STMT(find_storage, findstoragenode);
DECL_STMT(all_storage, allstoragenodes);
/* Check lookups and mappings */
DECL_STMT(check_map, mapchecktostoragenode);
DECL_STMT(check_mapall, mapallchecks);
DECL_STMT(check_loadall, allchecks);
DECL_STMT(check_find, findcheck);
/* Inserts */
DECL_STMT(check_insert, check);
DECL_STMT(status_insert, status);
DECL_STMT(metric_insert_numeric, metric_numeric);
DECL_STMT(metric_insert_text, metric_text);
DECL_STMT(config_insert, config);
DECL_STMT(config_get, findconfig);
77
78 static noit_log_stream_t ds_err = NULL;
79 static noit_log_stream_t ds_deb = NULL;
80 static noit_log_stream_t ds_pool_deb = NULL;
81 static noit_log_stream_t ingest_err = NULL;
82
/*
 * GET_QUERY(a): make sure statement `a` (declared with DECL_STMT) has been
 * fetched from the configuration.  If it is still NULL it is read from the
 * path held in a_conf; when that read fails control jumps to the enclosing
 * function's bad_row label.
 */
#define GET_QUERY(a) do { \
  if(a == NULL && \
     !noit_conf_get_string(NULL, a ## _conf, &(a))) { \
    goto bad_row; \
  } \
} while(0)
88
89 struct conn_q;
90
91 typedef struct {
92   char            *queue_name; /* the key fqdn+remote_sn */
93   eventer_jobq_t  *jobq;
94   struct conn_q   *head;
95   pthread_mutex_t  lock;
96   pthread_cond_t   cv;
97   int              ttl;
98   int              in_pool;
99   int              outstanding;
100   int              max_allocated;
101   int              max_in_pool;
102 } conn_pool;
103 typedef struct conn_q {
104   time_t           last_use;
105   char            *dsn;        /* Pg connect string */
106   char            *remote_str; /* the IP of the noit*/
107   char            *remote_cn;  /* the Cert CN of the noit */
108   char            *fqdn;       /* the fqdn of the storage node */
109   conn_pool       *pool;
110   struct conn_q   *next;
111   /* Postgres specific stuff */
112   PGconn          *dbh;
113 } conn_q;
114
115
/* Upper bound on bound parameters per statement. */
#define MAX_PARAMS 8

/*
 * Common leading members of every ds_*_detail structure.  They hold the
 * state of one parameterized statement: the last result and its status,
 * the timestamp of the row, the number of bound parameters and, per
 * parameter, its value, length, format and whether the value was
 * heap-allocated (and so must be freed).
 */
#define POSTGRES_PARTS \
  PGresult *res; \
  int rv; \
  time_t whence; \
  int nparams; \
  int metric_type; \
  char *paramValues[MAX_PARAMS]; \
  int paramLengths[MAX_PARAMS]; \
  int paramFormats[MAX_PARAMS]; \
  int paramAllocd[MAX_PARAMS];
127
128 typedef struct ds_single_detail {
129   POSTGRES_PARTS
130 } ds_single_detail;
131 typedef struct {
132   /* Postgres specific stuff */
133   POSTGRES_PARTS
134   struct realtime_tracker *rt;
135   conn_q *cq; /* connection on which to perform this job */
136   eventer_t completion_event; /* This event should be registered if non NULL */
137 } ds_rt_detail;
138 typedef struct ds_line_detail {
139   /* Postgres specific stuff */
140   POSTGRES_PARTS
141   char *data;
142   int problematic;
143   struct ds_line_detail *next;
144 } ds_line_detail;
145
146 typedef struct {
147   char *remote_str;
148   char *remote_cn;
149   char *fqdn;
150   int storagenode_id;
151   int fd;
152   char *filename;
153   conn_pool *cpool;
154 } pg_interim_journal_t;
155
156 static int stratcon_database_connect(conn_q *cq);
157 static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
158 static int storage_node_quick_lookup(const char *uuid_str,
159                                      const char *remote_cn,
160                                      int *sid_out, int *storagenode_id_out,
161                                      const char **remote_cn_out,
162                                      const char **fqdn_out,
163                                      const char **dsn_out);
164
165 static void
166 free_params(ds_single_detail *d) {
167   int i;
168   for(i=0; i<d->nparams; i++)
169     if(d->paramAllocd[i] && d->paramValues[i])
170       free(d->paramValues[i]);
171 }
172
173 static char *basejpath = NULL;
174 static pthread_mutex_t ds_conns_lock;
175 static noit_hash_table ds_conns;
176
177 /* the fqdn cache needs to be thread safe */
178 typedef struct {
179   char *uuid_str;
180   char *remote_cn;
181   int storagenode_id;
182   int sid;
183 } uuid_info;
184 typedef struct {
185   int storagenode_id;
186   char *fqdn;
187   char *dsn;
188 } storagenode_info;
189 noit_hash_table uuid_to_info_cache;
190 pthread_mutex_t storagenode_to_info_cache_lock;
191 noit_hash_table storagenode_to_info_cache;
192
193 /* Thread-safe connection pools */
194
195 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
196 static void
197 release_conn_q_forceable(conn_q *cq, int forcefree) {
198   int putback = 0;
199   cq->last_use = time(NULL);
200   pthread_mutex_lock(&cq->pool->lock);
201   cq->pool->outstanding--;
202   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
203     putback = 1;
204     cq->next = cq->pool->head;
205     cq->pool->head = cq;
206     cq->pool->in_pool++;
207   }
208   pthread_mutex_unlock(&cq->pool->lock);
209   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(),
210         putback ? "to pool" : "and destroy", cq->pool->queue_name);
211   pthread_cond_signal(&cq->pool->cv);
212   if(putback) return;
213
214   /* Not put back, release it */
215   if(cq->dbh) PQfinish(cq->dbh);
216   if(cq->remote_str) free(cq->remote_str);
217   if(cq->remote_cn) free(cq->remote_cn);
218   if(cq->fqdn) free(cq->fqdn);
219   if(cq->dsn) free(cq->dsn);
220   free(cq);
221 }
222 static void
223 ttl_purge_conn_pool(conn_pool *pool) {
224   int old_cnt, new_cnt;
225   time_t now = time(NULL);
226   conn_q *cq, *prev = NULL, *iter;
227   /* because we always replace on the head and update the last_use time when
228      doing so, we know they are ordered LRU on the end.  So, once we hit an
229      old one, we know all the others are old too.
230    */
231   if(!pool->head) return; /* hack short circuit for no locks */
232   pthread_mutex_lock(&pool->lock);
233   old_cnt = pool->in_pool;
234   cq = pool->head;
235   while(cq) {
236     if(cq->last_use + cq->pool->ttl < now) {
237       if(prev) prev->next = NULL;
238       else pool->head = NULL;
239       break;
240     }
241     prev = cq;
242     cq = cq->next;
243   }
244   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
245   /* Fix accounting */
246   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
247   new_cnt = pool->in_pool;
248   pthread_mutex_unlock(&pool->lock);
249
250   /* Force release these without holding the lock */
251   while(cq) {
252     cq = cq->next;
253     release_conn_q_forceable(cq, 1);
254   }
255   if(old_cnt != new_cnt)
256     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
257           pool->queue_name);
258 }
259 static void
260 release_conn_q(conn_q *cq) {
261   ttl_purge_conn_pool(cq->pool);
262   release_conn_q_forceable(cq, 0);
263 }
264 static conn_pool *
265 get_conn_pool_for_remote(const char *remote_str,
266                          const char *remote_cn, const char *fqdn) {
267   void *vcpool;
268   conn_pool *cpool = NULL;
269   char queue_name[256] = "datastore_";
270   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
271            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
272            fqdn ? fqdn : "default",
273            remote_cn ? remote_cn : "default");
274   pthread_mutex_lock(&ds_conns_lock);
275   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
276                         strlen(queue_name), &vcpool))
277     cpool = vcpool;
278   pthread_mutex_unlock(&ds_conns_lock);
279   if(!cpool) {
280     vcpool = cpool = calloc(1, sizeof(*cpool));
281     cpool->queue_name = strdup(queue_name);
282     pthread_mutex_init(&cpool->lock, NULL);
283     pthread_cond_init(&cpool->cv, NULL);
284     cpool->in_pool = 0;
285     cpool->outstanding = 0;
286     cpool->max_in_pool = 1;
287     cpool->max_allocated = 1;
288     pthread_mutex_lock(&ds_conns_lock);
289     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
290                         cpool)) {
291       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
292                          strlen(queue_name), &vcpool);
293     }
294     pthread_mutex_unlock(&ds_conns_lock);
295     if(vcpool != cpool) {
296       /* someone beat us to it */
297       free(cpool->queue_name);
298       pthread_mutex_destroy(&cpool->lock);
299       pthread_cond_destroy(&cpool->cv);
300       free(cpool);
301     }
302     else {
303       int i;
304       /* Our job to setup the pool */
305       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
306       eventer_jobq_init(cpool->jobq, queue_name);
307       cpool->jobq->backq = eventer_default_backq();
308       /* Add one thread */
309       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
310         eventer_jobq_increase_concurrency(cpool->jobq);
311     }
312     cpool = vcpool;
313   }
314   return cpool;
315 }
316 static conn_q *
317 get_conn_q_for_remote(const char *remote_str,
318                       const char *remote_cn, const char *fqdn,
319                       const char *dsn) {
320   conn_pool *cpool;
321   conn_q *cq;
322   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
323   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(),
324         cpool->queue_name);
325   pthread_mutex_lock(&cpool->lock);
326  again:
327   if(cpool->head) {
328     assert(cpool->in_pool > 0);
329     cq = cpool->head;
330     cpool->head = cq->next;
331     cpool->in_pool--;
332     cpool->outstanding++;
333     cq->next = NULL;
334     pthread_mutex_unlock(&cpool->lock);
335     return cq;
336   }
337   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
338     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
339           (void *)pthread_self(), cpool->queue_name);
340     pthread_cond_wait(&cpool->cv, &cpool->lock);
341     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
342           (void *)pthread_self(), cpool->queue_name);
343     goto again;
344   }
345   else {
346     cpool->outstanding++;
347     pthread_mutex_unlock(&cpool->lock);
348   }
349  
350   cq = calloc(1, sizeof(*cq));
351   cq->pool = cpool;
352   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
353   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
354   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
355   cq->dsn = dsn ? strdup(dsn) : NULL;
356   return cq;
357 }
358 static conn_q *
359 get_conn_q_for_metanode() {
360   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
361 }
362
/* Result codes of the statement execution routines in this file. */
typedef enum {
  DS_EXEC_SUCCESS = 0,
  DS_EXEC_ROW_FAILED,   /* == 1 */
  DS_EXEC_TXN_FAILED,   /* == 2 */
} execute_outcome_t;
368
/*
 * DECLARE_PARAM_STR(str, len): bind the first len bytes of str as the next
 * text parameter of the detail `d` in scope.  The value is copied onto the
 * heap and flagged as allocated.  The literal "[[null]]" is the sentinel
 * for SQL NULL: the copy is dropped and a NULL, zero-length, non-allocated
 * parameter is bound instead.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  const int param__idx = d->nparams; \
  char *param__copy = noit__strndup(str, len); \
  d->paramLengths[param__idx] = len; \
  d->paramFormats[param__idx] = 0; \
  if(strcmp(param__copy, "[[null]]") == 0) { \
    free(param__copy); \
    d->paramValues[param__idx] = NULL; \
    d->paramLengths[param__idx] = 0; \
    d->paramAllocd[param__idx] = 0; \
  } \
  else { \
    d->paramValues[param__idx] = param__copy; \
    d->paramAllocd[param__idx] = 1; \
  } \
  d->nparams++; \
} while(0)

/*
 * DECLARE_PARAM_INT(i): bind the int i, rendered in decimal, as the next
 * text parameter.
 */
#define DECLARE_PARAM_INT(i) do { \
  char buffer__[32]; \
  int buffer__len; \
  snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
  buffer__len = strlen(buffer__); \
  DECLARE_PARAM_STR(buffer__, buffer__len); \
} while(0)
389
/*
 * PG_GET_STR_COL(dest, row, name): point dest at the text of column `name`
 * in row `row` of d->res.  dest becomes NULL when the result has no such
 * column or the value is SQL NULL.  The text belongs to the result and is
 * only valid until that result is cleared.
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  const int colnum = PQfnumber(d->res, name); \
  if(colnum < 0 || PQgetisnull(d->res, row, colnum)) \
    dest = NULL; \
  else \
    dest = PQgetvalue(d->res, row, colnum); \
} while(0)
397
/*
 * PG_EXEC(cmd): run statement cmd on cq->dbh with the parameters bound in
 * d, leaving the result in d->res and its status in d->rv.  On any status
 * other than COMMAND_OK / TUPLES_OK the first line of the server error is
 * logged, the result is cleared and control jumps to bad_row.  On success
 * the caller owns d->res and must PQclear() it.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(!(d->rv == PGRES_COMMAND_OK || d->rv == PGRES_TUPLES_OK)) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(pgerr_end == NULL) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "[%s] stratcon_datasource.c:%d bad (%d): %.*s\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
          (int)(pgerr_end - pgerr), pgerr); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
415
/*
 * PG_TM_EXEC(cmd, whence): like PG_EXEC, but cmd is first expanded with
 * strftime() against whence (interpreted as UTC), so the statement text may
 * carry date conversion specifiers.
 *
 * If the time cannot be broken down, or the expanded statement does not
 * fit the 4096-byte buffer (strftime returns 0 and leaves the buffer
 * contents unspecified), nothing is sent to the server: the problem is
 * logged and control jumps to bad_row.  whence is evaluated once.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t __w = (whence); \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&__w, &tbuf); \
  if(!tm || strftime(cmdbuf, sizeof(cmdbuf), cmd, tm) == 0) { \
    noitL(ds_err, "stratcon_datasource.c:%d cannot expand statement time: %llu\n", \
          __LINE__, (long long unsigned)__w); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "stratcon_datasource.c:%d bad (%d): %.*s time: %llu\n", \
          __LINE__, d->rv, (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)__w); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
438
439 static void *
440 stratcon_ingest_check_loadall(void *vsn) {
441   storagenode_info *sn = vsn;
442   ds_single_detail *d;
443   int i, row_count = 0, good = 0;
444   char buff[1024];
445   conn_q *cq = NULL;
446
447   d = calloc(1, sizeof(*d));
448   GET_QUERY(check_loadall);
449   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
450   i = 0;
451   while(stratcon_database_connect(cq)) {
452     if(i++ > 4) {
453       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
454       release_conn_q(cq);
455       return (void *)(vpsized_int)good;
456     }
457     sleep(1);
458   }
459   PG_EXEC(check_loadall);
460   row_count = PQntuples(d->res);
461  
462   for(i=0; i<row_count; i++) {
463     int rv;
464     int8_t family;
465     struct sockaddr *sin;
466     struct sockaddr_in sin4 = { .sin_family = AF_INET };
467     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
468     char *remote, *id, *target, *module, *name;
469     PG_GET_STR_COL(remote, i, "remote_address");
470     PG_GET_STR_COL(id, i, "id");
471     PG_GET_STR_COL(target, i, "target");
472     PG_GET_STR_COL(module, i, "module");
473     PG_GET_STR_COL(name, i, "name");
474     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
475
476     family = AF_INET;
477     sin = (struct sockaddr *)&sin4;
478     rv = inet_pton(family, remote, &sin4.sin_addr);
479     if(rv != 1) {
480       family = AF_INET6;
481       sin = (struct sockaddr *)&sin6;
482       rv = inet_pton(family, remote, &sin6.sin6_addr);
483       if(rv != 1) {
484         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
485         sin = NULL;
486       }
487     }
488
489     /* stratcon_iep_line_processor takes an allocated operand and frees it */
490     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
491     good++;
492   }
493   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
494         good, row_count, sn->fqdn);
495  bad_row:
496   free_params((ds_single_detail *)d);
497   free(d);
498   if(cq) release_conn_q(cq);
499   return (void *)(vpsized_int)good;
500 }
501 static int
502 stratcon_ingest_asynch_drive_iep(eventer_t e, int mask, void *closure,
503                                  struct timeval *now) {
504   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
505   pthread_t *jobs = NULL;
506   int nodes, i = 0, tcnt = 0;
507   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
508   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
509
510   pthread_mutex_lock(&storagenode_to_info_cache_lock);
511   nodes = storagenode_to_info_cache.size;
512   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
513   sns = calloc(MAX(1,nodes), sizeof(*sns));
514   if(nodes == 0) sns[nodes++] = &self;
515   else {
516     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
517     const char *k;
518     void *v;
519     int klen;
520     while(noit_hash_next(&storagenode_to_info_cache,
521                          &iter, &k, &klen, &v)) {
522       sns[i++] = (storagenode_info *)v;
523     }
524   }
525   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
526
527   for(i=0; i<nodes; i++) {
528     if(pthread_create(&jobs[i], NULL,
529                       stratcon_ingest_check_loadall, sns[i]) != 0) {
530       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
531     }
532   }
533   for(i=0; i<nodes; i++) {
534     void *good;
535     pthread_join(jobs[i], &good);
536     tcnt += (int)(vpsized_int)good;
537   }
538   noitL(noit_error, "Loaded all %d check states.\n", tcnt);
539   return 0;
540 }
541 static void
542 stratcon_ingest_iep_check_preload() {
543   eventer_t e;
544   conn_pool *cpool;
545
546   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
547   e = eventer_alloc();
548   e->mask = EVENTER_ASYNCH;
549   e->callback = stratcon_ingest_asynch_drive_iep;
550   e->closure = NULL;
551   eventer_add_asynch(cpool->jobq, e);
552 }
553 execute_outcome_t
554 stratcon_ingest_find(ds_rt_detail *d) {
555   conn_q *cq;
556   char *val;
557   int row_count;
558   struct realtime_tracker *node;
559
560   for(node = d->rt; node; node = node->next) {
561     char uuid_str[UUID_STR_LEN+1];
562     const char *fqdn, *dsn, *remote_cn;
563     char remote_ip[32];
564     int storagenode_id;
565
566     uuid_unparse_lower(node->checkid, uuid_str);
567     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
568                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
569       continue;
570
571     noitL(noit_debug, "stratcon_ingest_find <- (%d, %s) @ %s\n",
572           node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)");
573
574     /* We might be able to find the IP from our config if someone has
575      * specified the expected cn in the noit definition.
576      */
577     if(stratcon_find_noit_ip_by_cn(remote_cn,
578                                    remote_ip, sizeof(remote_ip)) == 0) {
579       node->noit = strdup(remote_ip);
580       noitL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit);
581       continue;
582     }
583
584     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
585     stratcon_database_connect(cq);
586
587     GET_QUERY(check_find);
588     DECLARE_PARAM_INT(node->sid);
589     PG_EXEC(check_find);
590     row_count = PQntuples(d->res);
591     if(row_count != 1) {
592       noitL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid);
593       PQclear(d->res);
594       goto bad_row;
595     }
596
597     /* Get the remote_address (which noit owns this) */
598     PG_GET_STR_COL(val, 0, "remote_address");
599     if(!val) {
600       noitL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn);
601       PQclear(d->res);
602       goto bad_row;
603     }
604     node->noit = strdup(val);
605     noitL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit);
606    bad_row:
607     free_params((ds_single_detail *)d);
608     d->nparams = 0;
609     release_conn_q(cq);
610   }
611   return DS_EXEC_SUCCESS;
612 }
613 execute_outcome_t
614 stratcon_ingest_execute(conn_q *cq, const char *r, const char *remote_cn,
615                         ds_line_detail *d) {
616   int type, len, sid;
617   char *final_buff;
618   uLong final_len, actual_final_len;
619   char *token;
620   char raddr_blank[1] = "";
621   const char *raddr;
622
623   type = d->data[0];
624   raddr = r ? r : raddr_blank;
625
626   /* Parse the log line, but only if we haven't already */
627   if(!d->nparams) {
628     char *scp, *ecp;
629
630     scp = d->data;
631 #define PROCESS_NEXT_FIELD(t,l) do { \
632   if(!*scp) goto bad_row; \
633   ecp = strchr(scp, '\t'); \
634   if(!ecp) goto bad_row; \
635   token = scp; \
636   len = (ecp-scp); \
637   scp = ecp + 1; \
638 } while(0)
639 #define PROCESS_LAST_FIELD(t,l) do { \
640   if(!*scp) ecp = scp; \
641   else { \
642     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
643     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
644   } \
645   t = scp; \
646   l = (ecp-scp); \
647 } while(0)
648
649     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
650     switch(type) {
651       /* See noit_check_log.c for log description */
652       case 'n':
653         DECLARE_PARAM_STR(raddr, strlen(raddr));
654         DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
655         DECLARE_PARAM_STR("noitd",5); /* node_type */
656         PROCESS_NEXT_FIELD(token,len);
657         d->whence = (time_t)strtoul(token, NULL, 10);
658         DECLARE_PARAM_STR(token,len); /* timestamp */
659
660         /* This is the expected uncompressed len */
661         PROCESS_NEXT_FIELD(token,len);
662         final_len = atoi(token);
663         final_buff = malloc(final_len);
664         if(!final_buff) goto bad_row;
665  
666         /* The last token is b64 endoded and compressed.
667          * we need to decode it, declare it and then free it.
668          */
669         PROCESS_LAST_FIELD(token, len);
670         /* We can in-place decode this */
671         len = noit_b64_decode((char *)token, len,
672                               (unsigned char *)token, len);
673         if(len <= 0) {
674           noitL(noit_error, "noitd config base64 decoding error.\n");
675           free(final_buff);
676           goto bad_row;
677         }
678         actual_final_len = final_len;
679         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
680                               (unsigned char *)token, len)) {
681           noitL(noit_error, "noitd config decompression failure.\n");
682           free(final_buff);
683           goto bad_row;
684         }
685         if(final_len != actual_final_len) {
686           noitL(noit_error, "noitd config decompression error.\n");
687           free(final_buff);
688           goto bad_row;
689         }
690         DECLARE_PARAM_STR(final_buff, final_len);
691         free(final_buff);
692         break;
693       case 'C':
694         DECLARE_PARAM_STR(raddr, strlen(raddr));
695         PROCESS_NEXT_FIELD(token,len);
696         DECLARE_PARAM_STR(token,len); /* timestamp */
697         d->whence = (time_t)strtoul(token, NULL, 10);
698         PROCESS_NEXT_FIELD(token, len);
699         /* uuid is last 36 bytes */
700         if(len > 36) { token += (len-36); len = 36; }
701         sid = uuid_to_sid(token, remote_cn);
702         if(sid == 0) goto bad_row;
703         DECLARE_PARAM_INT(sid); /* sid */
704         DECLARE_PARAM_STR(token,len); /* uuid */
705         PROCESS_NEXT_FIELD(token, len);
706         DECLARE_PARAM_STR(token,len); /* target */
707         PROCESS_NEXT_FIELD(token, len);
708         DECLARE_PARAM_STR(token,len); /* module */
709         PROCESS_LAST_FIELD(token, len);
710         DECLARE_PARAM_STR(token,len); /* name */
711         break;
712       case 'M':
713         PROCESS_NEXT_FIELD(token,len);
714         DECLARE_PARAM_STR(token,len); /* timestamp */
715         d->whence = (time_t)strtoul(token, NULL, 10);
716         PROCESS_NEXT_FIELD(token, len);
717         /* uuid is last 36 bytes */
718         if(len > 36) { token += (len-36); len = 36; }
719         sid = uuid_to_sid(token, remote_cn);
720         if(sid == 0) goto bad_row;
721         DECLARE_PARAM_INT(sid); /* sid */
722         PROCESS_NEXT_FIELD(token, len);
723         DECLARE_PARAM_STR(token,len); /* name */
724         PROCESS_NEXT_FIELD(token,len);
725         d->metric_type = *token;
726         PROCESS_LAST_FIELD(token,len);
727         DECLARE_PARAM_STR(token,len); /* value */
728         break;
729       case 'S':
730         PROCESS_NEXT_FIELD(token,len);
731         DECLARE_PARAM_STR(token,len); /* timestamp */
732         d->whence = (time_t)strtoul(token, NULL, 10);
733         PROCESS_NEXT_FIELD(token, len);
734         /* uuid is last 36 bytes */
735         if(len > 36) { token += (len-36); len = 36; }
736         sid = uuid_to_sid(token, remote_cn);
737         if(sid == 0) goto bad_row;
738         DECLARE_PARAM_INT(sid); /* sid */
739         PROCESS_NEXT_FIELD(token, len);
740         DECLARE_PARAM_STR(token,len); /* state */
741         PROCESS_NEXT_FIELD(token, len);
742         DECLARE_PARAM_STR(token,len); /* availability */
743         PROCESS_NEXT_FIELD(token, len);
744         DECLARE_PARAM_STR(token,len); /* duration */
745         PROCESS_LAST_FIELD(token,len);
746         DECLARE_PARAM_STR(token,len); /* status */
747         break;
748       default:
749         goto bad_row;
750     }
751
752   }
753
754   /* Now execute the query */
755   switch(type) {
756     case 'n':
757       GET_QUERY(config_insert);
758       PG_EXEC(config_insert);
759       PQclear(d->res);
760       break;
761     case 'C':
762       GET_QUERY(check_insert);
763       PG_TM_EXEC(check_insert, d->whence);
764       PQclear(d->res);
765       break;
766     case 'S':
767       GET_QUERY(status_insert);
768       PG_TM_EXEC(status_insert, d->whence);
769       PQclear(d->res);
770       break;
771     case 'M':
772       switch(d->metric_type) {
773         case METRIC_INT32:
774         case METRIC_UINT32:
775         case METRIC_INT64:
776         case METRIC_UINT64:
777         case METRIC_DOUBLE:
778           GET_QUERY(metric_insert_numeric);
779           PG_TM_EXEC(metric_insert_numeric, d->whence);
780           PQclear(d->res);
781           break;
782         case METRIC_STRING:
783           GET_QUERY(metric_insert_text);
784           PG_TM_EXEC(metric_insert_text, d->whence);
785           PQclear(d->res);
786           break;
787         default:
788           goto bad_row;
789       }
790       break;
791     default:
792       /* should never get here */
793       goto bad_row;
794   }
795   return DS_EXEC_SUCCESS;
796  bad_row:
797   return DS_EXEC_ROW_FAILED;
798 }
799 static int
800 stratcon_database_post_connect(conn_q *cq) {
801   int rv = 0;
802   ds_single_detail _d = { 0 }, *d = &_d;
803   if(cq->fqdn) {
804     char *remote_str, *remote_cn;
805     /* This is the silly way we get null's in through our declare_param_str */
806     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
807     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
808     /* This is a storage node, it gets the storage node post_connect */
809     GET_QUERY(storage_post_connect);
810     rv = -1; /* now we're serious */
811     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
812     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
813     PG_EXEC(storage_post_connect);
814     PQclear(d->res);
815     rv = 0;
816   }
817   else {
818     /* Metanode post_connect */
819     GET_QUERY(metanode_post_connect);
820     rv = -1; /* now we're serious */
821     PG_EXEC(metanode_post_connect);
822     PQclear(d->res);
823     rv = 0;
824   }
825  bad_row:
826   free_params(d);
827   if(rv == -1) {
828     /* Post-connect intentions are serious and fatal */
829     PQfinish(cq->dbh);
830     cq->dbh = NULL;
831   }
832   return rv;
833 }
834 static int
835 stratcon_database_connect(conn_q *cq) {
836   char *dsn, dsn_meta[512];
837   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
838   const char *k, *v;
839   int klen;
840   noit_hash_table *t;
841
842   dsn_meta[0] = '\0';
843   if(!cq->dsn) {
844     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
845     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
846       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
847       strlcat(dsn_meta, k, sizeof(dsn_meta));
848       strlcat(dsn_meta, "=", sizeof(dsn_meta));
849       strlcat(dsn_meta, v, sizeof(dsn_meta));
850     }
851     noit_hash_destroy(t, free, free);
852     free(t);
853     dsn = dsn_meta;
854   }
855   else {
856     char options[32];
857     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta));
858     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user",
859                                options, sizeof(options))) {
860       strlcat(dsn_meta, " ", sizeof(dsn_meta));
861       strlcat(dsn_meta, "user", sizeof(dsn_meta));
862       strlcat(dsn_meta, "=", sizeof(dsn_meta));
863       strlcat(dsn_meta, options, sizeof(dsn_meta));
864     }
865     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password",
866                                options, sizeof(options))) {
867       strlcat(dsn_meta, " ", sizeof(dsn_meta));
868       strlcat(dsn_meta, "password", sizeof(dsn_meta));
869       strlcat(dsn_meta, "=", sizeof(dsn_meta));
870       strlcat(dsn_meta, options, sizeof(dsn_meta));
871     }
872     dsn = dsn_meta;
873   }
874
875   if(cq->dbh) {
876     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
877     PQreset(cq->dbh);
878     if(PQstatus(cq->dbh) != CONNECTION_OK) {
879       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
880             dsn, PQerrorMessage(cq->dbh));
881       return -1;
882     }
883     if(stratcon_database_post_connect(cq)) return -1;
884     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
885     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
886           dsn, PQerrorMessage(cq->dbh));
887     return -1;
888   }
889
890   cq->dbh = PQconnectdb(dsn);
891   if(!cq->dbh) return -1;
892   if(PQstatus(cq->dbh) != CONNECTION_OK) {
893     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
894           dsn, PQerrorMessage(cq->dbh));
895     return -1;
896   }
897   if(stratcon_database_post_connect(cq)) return -1;
898   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
899   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
900         dsn, PQerrorMessage(cq->dbh));
901   return -1;
902 }
903 static int
904 stratcon_ingest_savepoint_op(conn_q *cq, const char *p,
905                              const char *name) {
906   int rv = -1;
907   PGresult *res;
908   char cmd[128];
909   strlcpy(cmd, p, sizeof(cmd));
910   strlcat(cmd, name, sizeof(cmd));
911   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
912   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
913   PQclear(res);
914   return rv;
915 }
916 static int
917 stratcon_ingest_do(conn_q *cq, const char *cmd) {
918   PGresult *res;
919   int rv = -1;
920   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
921   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
922   PQclear(res);
923   return rv;
924 }
/*
 * Transaction helpers.  They expand in a scope that must provide the locals
 * `cq`, `current` and `last_sp` plus a `full_monty` label to jump back to.
 */

/* Close the connection, forget the handle and restart from full_monty. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)

/* Send "<verb><name>"; on failure bail out via BUSTED, else update last_sp. */
#define STRATCON_SAVEPOINT_STEP(verb, name, next_sp) do { \
  if(stratcon_ingest_savepoint_op(cq, verb, name)) BUSTED(cq); \
  last_sp = (next_sp); \
} while(0)

/* Open a savepoint and remember the record it was opened at. */
#define SAVEPOINT(name) \
  STRATCON_SAVEPOINT_STEP("SAVEPOINT ", name, current)
/* Undo everything since the savepoint; no savepoint is considered open. */
#define ROLLBACK_TO_SAVEPOINT(name) \
  STRATCON_SAVEPOINT_STEP("ROLLBACK TO SAVEPOINT ", name, NULL)
/* Keep everything since the savepoint; no savepoint is considered open. */
#define RELEASE_SAVEPOINT(name) \
  STRATCON_SAVEPOINT_STEP("RELEASE SAVEPOINT ", name, NULL)
944
945 int
946 stratcon_ingest_asynch_lookup(eventer_t e, int mask, void *closure,
947                               struct timeval *now) {
948   ds_rt_detail *dsjd = closure;
949   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
950   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
951
952   assert(dsjd->rt);
953   stratcon_ingest_find(dsjd);
954   if(dsjd->completion_event)
955     eventer_add(dsjd->completion_event);
956
957   free_params((ds_single_detail *)dsjd);
958   free(dsjd);
959   return 0;
960 }
961 static void
962 stratcon_ingestor_submit_lookup(struct realtime_tracker *rt,
963                                 eventer_t completion) {
964   eventer_t e;
965   conn_pool *cpool;
966   ds_rt_detail *rtdetail;
967
968   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
969   rtdetail = calloc(1, sizeof(*rtdetail));
970   rtdetail->rt = rt;
971   rtdetail->completion_event = completion;
972   e = eventer_alloc();
973   e->mask = EVENTER_ASYNCH;
974   e->callback = stratcon_ingest_asynch_lookup;
975   e->closure = rtdetail;
976   eventer_add_asynch(cpool->jobq, e);
977 }
978 static const char *
979 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
980   void *vinfo;
981   char *dsn = NULL, *fqdn = NULL;
982   int found = 0;
983   storagenode_info *info = NULL;
984   pthread_mutex_lock(&storagenode_to_info_cache_lock);
985   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
986                         &vinfo)) {
987     found = 1;
988     info = vinfo;
989   }
990   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
991   if(found) {
992     if(fqdn_out) *fqdn_out = info->fqdn;
993     return info->dsn;
994   }
995
996   if(!found && can_use_db) {
997     ds_single_detail *d;
998     conn_q *cq;
999     int row_count;
1000     /* Look it up and store it */
1001     d = calloc(1, sizeof(*d));
1002     cq = get_conn_q_for_metanode();
1003     GET_QUERY(find_storage);
1004     DECLARE_PARAM_INT(id);
1005     PG_EXEC(find_storage);
1006     row_count = PQntuples(d->res);
1007     if(row_count) {
1008       PG_GET_STR_COL(dsn, 0, "dsn");
1009       PG_GET_STR_COL(fqdn, 0, "fqdn");
1010       fqdn = fqdn ? strdup(fqdn) : NULL;
1011       dsn = dsn ? strdup(dsn) : NULL;
1012     }
1013     PQclear(d->res);
1014    bad_row:
1015     free_params(d);
1016     free(d);
1017     release_conn_q(cq);
1018   }
1019   if(fqdn) {
1020     info = calloc(1, sizeof(*info));
1021     info->fqdn = fqdn;
1022     if(fqdn_out) *fqdn_out = info->fqdn;
1023     info->dsn = dsn;
1024     info->storagenode_id = id;
1025     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1026     noit_hash_store(&storagenode_to_info_cache,
1027                     (void *)&info->storagenode_id, sizeof(int), info);
1028     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1029   }
1030   return info ? info->dsn : NULL;
1031 }
1032 static ds_line_detail *
1033 build_insert_batch(pg_interim_journal_t *ij) {
1034   int rv;
1035   off_t len;
1036   const char *buff, *cp, *lcp;
1037   struct stat st;
1038   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1039
1040   if(ij->fd < 0) {
1041     ij->fd = open(ij->filename, O_RDONLY);
1042     if(ij->fd < 0) {
1043       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
1044             ij->filename, strerror(errno));
1045       assert(ij->fd >= 0);
1046     }
1047   }
1048   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1049   if(rv == -1) {
1050       noitL(noit_error, "Cannot stat interim journal '%s': %s\n",
1051             ij->filename, strerror(errno));
1052     assert(rv != -1);
1053   }
1054   len = st.st_size;
1055   if(len > 0) {
1056     buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1057     if(buff == (void *)-1) {
1058       noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1059             ij->filename, strerror(errno));
1060       assert(buff != (void *)-1);
1061     }
1062     lcp = buff;
1063     while(lcp < (buff + len) &&
1064           NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1065       next = calloc(1, sizeof(*next));
1066       next->data = malloc(cp - lcp + 1);
1067       memcpy(next->data, lcp, cp - lcp);
1068       next->data[cp - lcp] = '\0';
1069       if(!head) head = next;
1070       if(last) last->next = next;
1071       last = next;
1072       lcp = cp + 1;
1073     }
1074     munmap((void *)buff, len);
1075   }
1076   close(ij->fd);
1077   return head;
1078 }
1079 static void
1080 pg_interim_journal_remove(pg_interim_journal_t *ij) {
1081   unlink(ij->filename);
1082   if(ij->filename) free(ij->filename);
1083   if(ij->remote_str) free(ij->remote_str);
1084   if(ij->remote_cn) free(ij->remote_cn);
1085   if(ij->fqdn) free(ij->fqdn);
1086   free(ij);
1087 }
1088 static int
1089 stratcon_ingest_asynch_execute(eventer_t e, int mask, void *closure,
1090                                struct timeval *now) {
1091   int i, total, success, sp_total, sp_success;
1092   pg_interim_journal_t *ij;
1093   ds_line_detail *head = NULL, *current, *last_sp;
1094   const char *dsn;
1095   conn_q *cq;
1096   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1097   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1098
1099   ij = closure;
1100   if(ij->fqdn == NULL) {
1101     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1102     if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1103   }
1104   else {
1105     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, NULL);
1106   }
1107   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1108                              ij->fqdn, dsn);
1109   noitL(ds_deb, "stratcon_ingest_asynch_execute[%s,%s,%s]\n",
1110         ij->remote_str, ij->remote_cn, ij->fqdn);
1111  full_monty:
1112   /* Make sure we have a connection */
1113   i = 1;
1114   while(stratcon_database_connect(cq)) {
1115     noitL(noit_error, "Error connecting to database: %s\n",
1116           ij->fqdn ? ij->fqdn : "(null)");
1117     sleep(i);
1118     i *= 2;
1119     i = MIN(i, 16);
1120   }
1121
1122   if(head == NULL) head = build_insert_batch(ij);
1123   noitL(ds_deb, "Starting batch from %s/%s to %s\n",
1124         ij->remote_str ? ij->remote_str : "(null)",
1125         ij->remote_cn ? ij->remote_cn : "(null)",
1126         ij->fqdn ? ij->fqdn : "(null)");
1127   current = head;
1128   last_sp = NULL;
1129   total = success = sp_total = sp_success = 0;
1130   if(stratcon_ingest_do(cq, "BEGIN")) BUSTED(cq);
1131   while(current) {
1132     execute_outcome_t rv;
1133     if(current->data) {
1134       if(!last_sp) {
1135         SAVEPOINT("batch");
1136         sp_success = success;
1137         sp_total = total;
1138       }
1139  
1140       if(current->problematic) {
1141         RELEASE_SAVEPOINT("batch");
1142         current = current->next;
1143         total++;
1144         continue;
1145       }
1146       rv = stratcon_ingest_execute(cq, cq->remote_str, cq->remote_cn,
1147                                    current);
1148       switch(rv) {
1149         case DS_EXEC_SUCCESS:
1150           total++;
1151           success++;
1152           current = current->next;
1153           break;
1154         case DS_EXEC_ROW_FAILED:
1155           /* rollback to savepoint, mark this record as bad and start again */
1156           if(current->data[0] != 'n')
1157             noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1158           current->problematic = 1;
1159           current = last_sp;
1160           success = sp_success;
1161           total = sp_total;
1162           ROLLBACK_TO_SAVEPOINT("batch");
1163           break;
1164         case DS_EXEC_TXN_FAILED:
1165           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1166           BUSTED(cq);
1167       }
1168     }
1169   }
1170   if(last_sp) RELEASE_SAVEPOINT("batch");
1171   if(stratcon_ingest_do(cq, "COMMIT")) {
1172     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1173     BUSTED(cq);
1174   }
1175   /* Cleanup the mess */
1176   while(head) {
1177     ds_line_detail *tofree;
1178     tofree = head;
1179     head = head->next;
1180     if(tofree->data) free(tofree->data);
1181     free_params((ds_single_detail *)tofree);
1182     free(tofree);
1183   }
1184   noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1185         ij->remote_str ? ij->remote_str : "(null)",
1186         ij->remote_cn ? ij->remote_cn : "(null)",
1187         ij->fqdn ? ij->fqdn : "(null)", success, total);
1188   pg_interim_journal_remove(ij);
1189   release_conn_q(cq);
1190   return 0;
1191 }
1192 static int
1193 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1194                           int *sid_out, int *storagenode_id_out,
1195                           const char **remote_cn_out,
1196                           const char **fqdn_out, const char **dsn_out) {
1197   /* only called from the main thread -- no safety issues */
1198   void *vuuidinfo, *vinfo;
1199   uuid_info *uuidinfo;
1200   storagenode_info *info = NULL;
1201   char *fqdn = NULL;
1202   char *dsn = NULL;
1203   char *new_remote_cn = NULL;
1204   int storagenode_id = 0, sid = 0;
1205   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1206                          &vuuidinfo)) {
1207     int row_count = 0;
1208     char *tmpint;
1209     ds_single_detail *d;
1210     conn_q *cq;
1211
1212     /* We can't do a database lookup without the remote_cn */
1213     if(!remote_cn) {
1214       if(stratcon_datastore_get_enabled()) {
1215         /* We have an authoritatively maintained cache, we don't do lookups */
1216         return -1;
1217       }
1218       else
1219         remote_cn = "[[null]]";
1220     }
1221
1222     d = calloc(1, sizeof(*d));
1223     cq = get_conn_q_for_metanode();
1224     if(stratcon_database_connect(cq) == 0) {
1225       /* Blocking call to service the cache miss */
1226       GET_QUERY(check_map);
1227       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1228       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1229       PG_EXEC(check_map);
1230       row_count = PQntuples(d->res);
1231       if(row_count != 1) {
1232         PQclear(d->res);
1233         goto bad_row;
1234       }
1235       PG_GET_STR_COL(tmpint, 0, "sid");
1236       if(!tmpint) {
1237         row_count = 0;
1238         PQclear(d->res);
1239         goto bad_row;
1240       }
1241       sid = atoi(tmpint);
1242       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1243       if(tmpint) storagenode_id = atoi(tmpint);
1244       PG_GET_STR_COL(fqdn, 0, "fqdn");
1245       PG_GET_STR_COL(dsn, 0, "dsn");
1246       PG_GET_STR_COL(new_remote_cn, 0, "remote_cn");
1247       fqdn = fqdn ? strdup(fqdn) : NULL;
1248       dsn = dsn ? strdup(dsn) : NULL;
1249       new_remote_cn = new_remote_cn ? strdup(new_remote_cn) : NULL;
1250       PQclear(d->res);
1251     }
1252    bad_row:
1253     free_params((ds_single_detail *)d);
1254     free(d);
1255     release_conn_q(cq);
1256     if(row_count != 1) {
1257       return -1;
1258     }
1259     /* Place in cache */
1260     uuidinfo = calloc(1, sizeof(*uuidinfo));
1261     uuidinfo->sid = sid;
1262     uuidinfo->uuid_str = strdup(uuid_str);
1263     uuidinfo->storagenode_id = storagenode_id;
1264     uuidinfo->remote_cn = new_remote_cn ? strdup(new_remote_cn) : strdup(remote_cn);
1265     noit_hash_store(&uuid_to_info_cache,
1266                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1267     /* Also, we may have just witnessed a new storage node, store it */
1268     if(storagenode_id) {
1269       int needs_free = 0;
1270       info = calloc(1, sizeof(*info));
1271       info->storagenode_id = storagenode_id;
1272       info->dsn = dsn ? strdup(dsn) : NULL;
1273       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1274       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1275       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1276                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1277         /* hack to save memory -- we *never* remove from these caches,
1278            so we can use the same fqdn value in the above cache for the key
1279            in the cache below -- (no strdup) */
1280         noit_hash_store(&storagenode_to_info_cache,
1281                         (void *)&info->storagenode_id, sizeof(int), info);
1282       }
1283       else needs_free = 1;
1284       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1285       if(needs_free) {
1286         if(info->dsn) free(info->dsn);
1287         if(info->fqdn) free(info->fqdn);
1288         free(info);
1289       }
1290     }
1291   }
1292   else
1293     uuidinfo = vuuidinfo;
1294
1295   if(uuidinfo && uuidinfo->storagenode_id) {
1296     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1297       /* we don't have dsn and we actually want it */
1298       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1299       if(noit_hash_retrieve(&storagenode_to_info_cache,
1300                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1301                             &vinfo))
1302         info = vinfo;
1303       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1304     }
1305   }
1306
1307   if(fqdn_out) *fqdn_out = info ? info->fqdn : NULL;
1308   if(dsn_out) *dsn_out = info ? info->dsn : NULL;
1309   assert(uuidinfo);
1310   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1311   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1312   if(sid_out) *sid_out = uuidinfo->sid;
1313   if(fqdn) free(fqdn);
1314   if(dsn) free(dsn);
1315   if(new_remote_cn) free(new_remote_cn);
1316   return 0;
1317 }
1318 static int
1319 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1320   char uuid_str[UUID_STR_LEN+1];
1321   int sid = 0;
1322   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1323   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1324   return sid;
1325 }
1326
1327 static int
1328 stratcon_ingest_saveconfig() {
1329   int rv = -1;
1330   char *buff;
1331   ds_single_detail _d = { 0 }, *d = &_d;
1332   conn_q *cq;
1333   char ipv4_str[32];
1334   struct in_addr r, l;
1335
1336   r.s_addr = htonl((4 << 24) || (2 << 16) || (2 << 8) || 1);
1337   memset(&l, 0, sizeof(l));
1338   noit_getip_ipv4(r, &l);
1339   /* Ignore the error.. what are we going to do anyway */
1340   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
1341     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));
1342
1343   cq = get_conn_q_for_metanode();
1344
1345   if(stratcon_database_connect(cq) == 0) {
1346     char time_as_str[20];
1347     size_t len;
1348     buff = noit_conf_xml_in_mem(&len);
1349     if(!buff) goto bad_row;
1350
1351     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1352     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str));
1353     DECLARE_PARAM_STR("", 0);
1354     DECLARE_PARAM_STR("stratcond", 9);
1355     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1356     DECLARE_PARAM_STR(buff, len);
1357     free(buff);
1358
1359     GET_QUERY(config_insert);
1360     PG_EXEC(config_insert);
1361     PQclear(d->res);
1362     rv = 0;
1363
1364     bad_row:
1365       free_params(d);
1366   }
1367   release_conn_q(cq);
1368   return rv;
1369 }
1370
1371 static void
1372 stratcon_ingest_launch_file_ingestion(char *path,
1373                                       char *remote_str, char *remote_cn,
1374                                       char *id_str) {
1375   pg_interim_journal_t *ij;
1376   eventer_t ingest;
1377
1378   ij = calloc(1, sizeof(*ij));
1379   ij->fd = open(path, O_RDONLY);
1380   if(ij->fd < 0) {
1381     noitL(noit_error, "cannot open journal '%s': %s\n",
1382           path, strerror(errno));
1383     free(ij);
1384     return;
1385   }
1386   close(ij->fd);
1387   ij->fd = -1;
1388   ij->filename = strdup(path);
1389   ij->remote_str = strdup(remote_str);
1390   ij->remote_cn = strdup(remote_cn);
1391   ij->storagenode_id = atoi(id_str);
1392   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1393                                        ij->fqdn);
1394   noitL(noit_error, "ingesting old payload: %s\n", ij->filename);
1395   ingest = eventer_alloc();
1396   ingest->mask = EVENTER_ASYNCH;
1397   ingest->callback = stratcon_ingest_asynch_execute;
1398   ingest->closure = ij;
1399   eventer_add_asynch(ij->cpool->jobq, ingest);
1400 }
1401 static void
1402 stratcon_ingest_sweep_journals_int(char *first, char *second, char *third) {
1403   char path[PATH_MAX];
1404   DIR *root;
1405   struct dirent *de, *entry;
1406   int i = 0, cnt = 0;
1407   char **entries;
1408   int size = 0;
1409
1410   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
1411            first ? "/" : "", first ? first : "",
1412            second ? "/" : "", second ? second : "",
1413            third ? "/" : "", third ? third : "");
1414 #ifdef _PC_NAME_MAX
1415   size = pathconf(path, _PC_NAME_MAX);
1416 #endif
1417   size = MIN(size, PATH_MAX + 128);
1418   de = alloca(size);
1419   root = opendir(path);
1420   if(!root) return;
1421   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) cnt++;
1422   closedir(root);
1423   root = opendir(path);
1424   if(!root) return;
1425   entries = malloc(sizeof(*entries) * cnt);
1426   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) {
1427     if(i < cnt) {
1428       entries[i++] = strdup(entry->d_name);
1429     }
1430   }
1431   closedir(root);
1432   cnt = i; /* could have changed, directories are fickle */
1433   qsort(entries, i, sizeof(*entries),
1434         (int (*)(const void *, const void *))strcasecmp);
1435   for(i=0; i<cnt; i++) {
1436     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
1437     noitL(ds_deb, "Processing L%d entry '%s'\n",
1438           third ? 4 : second ? 3 : first ? 2 : 1, entries[i]);
1439     if(!first)
1440       stratcon_ingest_sweep_journals_int(entries[i], NULL, NULL);
1441     else if(!second)
1442       stratcon_ingest_sweep_journals_int(first, entries[i], NULL);
1443     else if(!third)
1444       stratcon_ingest_sweep_journals_int(first, second, entries[i]);
1445     else if(strlen(entries[i]) == 16) {
1446       char fullpath[PATH_MAX];
1447       snprintf(fullpath, sizeof(fullpath), "%s/%s/%s/%s/%s", basejpath,
1448                first,second,third,entries[i]);
1449       stratcon_ingest_launch_file_ingestion(fullpath,first,second,third);
1450     }
1451   }
1452   for(i=0; i<cnt; i++)
1453     free(entries[i]);
1454   free(entries);
1455 }
/* Sweep the whole journal tree, starting at its top level (basejpath). */
static void
stratcon_ingest_sweep_journals() {
  char *no_component = NULL;

  stratcon_ingest_sweep_journals_int(no_component, no_component,
                                     no_component);
}
1460
1461 int
1462 stratcon_ingest_all_storagenode_info() {
1463   int i, cnt = 0;
1464   ds_single_detail _d = { 0 }, *d = &_d;
1465   conn_q *cq;
1466   cq = get_conn_q_for_metanode();
1467
1468   while(stratcon_database_connect(cq)) {
1469     noitL(noit_error, "Error connecting to database\n");
1470     sleep(1);
1471   }
1472
1473   GET_QUERY(all_storage);
1474   PG_EXEC(all_storage);
1475   cnt = PQntuples(d->res);
1476   for(i=0; i<cnt; i++) {
1477     void *vinfo;
1478     char *tmpint, *fqdn, *dsn;
1479     int storagenode_id;
1480     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1481     storagenode_id = atoi(tmpint);
1482     PG_GET_STR_COL(fqdn, i, "fqdn");
1483     PG_GET_STR_COL(dsn, i, "dsn");
1484     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1485     storagenode_id = tmpint ? atoi(tmpint) : 0;
1486
1487     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1488                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1489       storagenode_info *info;
1490       info = calloc(1, sizeof(*info));
1491       info->storagenode_id = storagenode_id;
1492       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1493       info->dsn = dsn ? strdup(dsn) : NULL;
1494       noit_hash_store(&storagenode_to_info_cache,
1495                       (void *)&info->storagenode_id, sizeof(int), info);
1496       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1497             info->storagenode_id,
1498             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1499     }
1500   }
1501   PQclear(d->res);
1502  bad_row:
1503   free_params(d);
1504
1505   release_conn_q(cq);
1506   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1507   return cnt;
1508 }
1509 int
1510 stratcon_ingest_all_check_info() {
1511   int i, cnt, loaded = 0;
1512   ds_single_detail _d = { 0 }, *d = &_d;
1513   conn_q *cq;
1514   cq = get_conn_q_for_metanode();
1515
1516   while(stratcon_database_connect(cq)) {
1517     noitL(noit_error, "Error connecting to database\n");
1518     sleep(1);
1519   }
1520
1521   GET_QUERY(check_mapall);
1522   PG_EXEC(check_mapall);
1523   cnt = PQntuples(d->res);
1524   for(i=0; i<cnt; i++) {
1525     void *vinfo;
1526     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1527     int sid, storagenode_id;
1528     uuid_info *uuidinfo;
1529     PG_GET_STR_COL(uuid_str, i, "id");
1530     if(!uuid_str) continue;
1531     PG_GET_STR_COL(tmpint, i, "sid");
1532     if(!tmpint) continue;
1533     sid = atoi(tmpint);
1534     PG_GET_STR_COL(fqdn, i, "fqdn");
1535     PG_GET_STR_COL(dsn, i, "dsn");
1536     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1537     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1538     storagenode_id = tmpint ? atoi(tmpint) : 0;
1539
1540     uuidinfo = calloc(1, sizeof(*uuidinfo));
1541     uuidinfo->uuid_str = strdup(uuid_str);
1542     uuidinfo->remote_cn = strdup(remote_cn);
1543     uuidinfo->storagenode_id = storagenode_id;
1544     uuidinfo->sid = sid;
1545     noit_hash_store(&uuid_to_info_cache,
1546                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1547     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1548           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1549     loaded++;
1550     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1551                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1552       storagenode_info *info;
1553       info = calloc(1, sizeof(*info));
1554       info->storagenode_id = storagenode_id;
1555       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1556       info->dsn = dsn ? strdup(dsn) : NULL;
1557       noit_hash_store(&storagenode_to_info_cache,
1558                       (void *)&info->storagenode_id, sizeof(int), info);
1559       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1560             info->storagenode_id,
1561             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1562     }
1563   }
1564   PQclear(d->res);
1565  bad_row:
1566   free_params(d);
1567
1568   release_conn_q(cq);
1569   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1570   return loaded;
1571 }
1572
1573 static int
1574 rest_get_noit_config(noit_http_rest_closure_t *restc,
1575                      int npats, char **pats) {
1576   noit_http_session_ctx *ctx = restc->http_ctx;
1577   ds_single_detail *d;
1578   int row_count = 0;
1579   const char *xml = NULL;
1580   conn_q *cq = NULL;
1581
1582   if(npats != 0) {
1583     noit_http_response_server_error(ctx, "text/xml");
1584     noit_http_response_end(ctx);
1585     return 0;
1586   }
1587   d = calloc(1, sizeof(*d));
1588   GET_QUERY(config_get);
1589   cq = get_conn_q_for_metanode();
1590   if(!cq) {
1591     noit_http_response_server_error(ctx, "text/xml");
1592     goto bad_row;
1593   }
1594
1595   DECLARE_PARAM_STR(restc->remote_cn,
1596                     restc->remote_cn ? strlen(restc->remote_cn) : 0);
1597   PG_EXEC(config_get);
1598   row_count = PQntuples(d->res);
1599   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1600
1601   if(xml == NULL) {
1602     char buff[1024];
1603     snprintf(buff, sizeof(buff), "<error><remote_cn>%s</remote_cn>"
1604                                  "<row_count>%d</row_count></error>\n",
1605              restc->remote_cn, row_count);
1606     noit_http_response_append(ctx, buff, strlen(buff));
1607     noit_http_response_not_found(ctx, "text/xml");
1608   }
1609   else {
1610     noit_http_response_append(ctx, xml, strlen(xml));
1611     noit_http_response_ok(ctx, "text/xml");
1612   }
1613  bad_row:
1614   free_params((ds_single_detail *)d);
1615   d->nparams = 0;
1616   if(cq) release_conn_q(cq);
1617
1618   noit_http_response_end(ctx);
1619   return 0;
1620 }
1621
1622 static ingestor_api_t postgres_ingestor_api = {
1623   .launch_file_ingestion = stratcon_ingest_launch_file_ingestion,
1624   .iep_check_preload = stratcon_ingest_iep_check_preload,
1625   .storage_node_lookup = storage_node_quick_lookup,
1626   .submit_realtime_lookup = stratcon_ingestor_submit_lookup,
1627   .get_noit_config = NULL,
1628   .save_config = stratcon_ingest_saveconfig
1629 };
1630
1631 static int postgres_ingestor_config(noit_module_generic_t *self, noit_hash_table *o) {
1632   return 0;
1633 }
1634 static int postgres_ingestor_onload(noit_image_t *self) {
1635   return 0;
1636 }
1637 static int postgres_ingestor_init(noit_module_generic_t *self) {
1638   pthread_mutex_init(&ds_conns_lock, NULL);
1639   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1640   ds_err = noit_log_stream_find("error/datastore");
1641   ds_deb = noit_log_stream_find("debug/datastore");
1642   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
1643   ingest_err = noit_log_stream_find("error/ingest");
1644   if(!ds_err) ds_err = noit_error;
1645   if(!ingest_err) ingest_err = noit_error;
1646   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1647                            &basejpath)) {
1648     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1649     exit(-1);
1650   }
1651   stratcon_ingest_all_check_info();
1652   stratcon_ingest_all_storagenode_info();
1653   stratcon_ingest_sweep_journals();
1654   return stratcon_datastore_set_ingestor(&postgres_ingestor_api);
1655 }
1656
1657 noit_module_generic_t postgres_ingestor = {
1658   {
1659     NOIT_GENERIC_MAGIC,
1660     NOIT_GENERIC_ABI_VERSION,
1661     "postgres_ingestor",
1662     "postgres drive for data ingestion",
1663     "",
1664     postgres_ingestor_onload,
1665   }, 
1666   postgres_ingestor_config,
1667   postgres_ingestor_init
1668 };
Note: See TracBrowser for help on using the browser.