root/src/modules/postgres_ingestor.c

Revision 94eee0ed35ac7112cd855488e6a2240f9169d335, 51.6 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 1 year ago)

use concurrency kit for hash tables

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2011, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "noit_module.h"
35 #include "eventer/eventer.h"
36 #include "utils/noit_log.h"
37 #include "utils/noit_b64.h"
38 #include "utils/noit_str.h"
39 #include "utils/noit_mkdir.h"
40 #include "utils/noit_getip.h"
41 #include "utils/noit_watchdog.h"
42 #include "stratcon_datastore.h"
43 #include "stratcon_realtime_http.h"
44 #include "stratcon_iep.h"
45 #include "noit_conf.h"
46 #include "noit_check.h"
47 #include "noit_check_log_helpers.h"
48 #include "noit_rest.h"
49 #include <unistd.h>
50 #include <fcntl.h>
51 #include <netinet/in.h>
52 #include <sys/un.h>
53 #include <dirent.h>
54 #include <arpa/inet.h>
55 #include <sys/mman.h>
56 #include <libpq-fe.h>
57 #include <zlib.h>
58 #include <assert.h>
59 #include <errno.h>
60 #include "postgres_ingestor.xmlh"
61 #include "bundle.pb-c.h"
62
/*
 * DECL_STMT(codename, confname) declares two file-scope statics:
 *   codename        - the SQL text; NULL until GET_QUERY() loads it
 *   codename##_conf - the config path it is loaded from:
 *                     "/stratcon/database/statements/<confname>"
 */
#define DECL_STMT(codename,confname) \
static char *codename = NULL; \
static const char *codename##_conf = "/stratcon/database/statements/" #confname

DECL_STMT(storage_post_connect, storagepostconnect);
DECL_STMT(metanode_post_connect, metanodepostconnect);
DECL_STMT(find_storage, findstoragenode);
DECL_STMT(all_storage, allstoragenodes);
DECL_STMT(check_map, mapchecktostoragenode);
DECL_STMT(check_mapall, mapallchecks);
DECL_STMT(check_loadall, allchecks);
DECL_STMT(check_find, findcheck);
DECL_STMT(check_insert, check);
DECL_STMT(status_insert, status);
DECL_STMT(metric_insert_numeric, metric_numeric);
DECL_STMT(metric_insert_text, metric_text);
DECL_STMT(config_insert, config);
DECL_STMT(config_get, findconfig);
81
/*
 * Log streams for this module. They start NULL here; where they are opened
 * is not visible in this section (presumably module init — TODO confirm).
 *   ds_err      - statement failures (PG_EXEC / PG_TM_EXEC)
 *   ds_deb      - not referenced in this section
 *   ds_pool_deb - connection pool tracing
 *   ingest_err  - not referenced in this section
 */
static noit_log_stream_t ds_err = NULL;
static noit_log_stream_t ds_deb = NULL;
static noit_log_stream_t ds_pool_deb = NULL;
static noit_log_stream_t ingest_err = NULL;
86
/*
 * GET_QUERY(a): make sure the statement variable `a` (declared with
 * DECL_STMT) holds its SQL text, lazily reading it from the config path
 * a##_conf on first use. If it is still unset and cannot be read, jump to
 * the caller's bad_row label.
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
92
struct conn_q;

/*
 * A pool of database connections that share one identity
 * (remote_str, fqdn, remote_cn); see get_conn_pool_for_remote().
 * head, in_pool and outstanding are protected by lock.
 */
typedef struct {
  char            *queue_name; /* pool key "datastore_<remote>_<fqdn>_<cn>";
                                  also the name of jobq */
  eventer_jobq_t  *jobq;       /* job queue dedicated to this pool */
  struct conn_q   *head;       /* idle connections, most recently released
                                  first (so ordered by last_use, newest
                                  first) */
  pthread_mutex_t  lock;
  pthread_cond_t   cv;         /* signalled whenever a connection is
                                  released */
  int              ttl;        /* seconds an idle connection may sit in the
                                  pool; not assigned in this section, so a
                                  calloc'd pool has ttl 0 */
  int              in_pool;    /* number of idle connections on head */
  int              outstanding;/* number of connections checked out */
  int              max_allocated; /* cap on in_pool + outstanding */
  int              max_in_pool;   /* cap on idle connections kept */
} conn_pool;
/* One (possibly not yet connected) database connection. */
typedef struct conn_q {
  time_t           last_use;   /* time of the last release */
  char            *dsn;        /* Pg connect string (NULL: build it from
                                  /stratcon/database/dbconfig) */
  char            *remote_str; /* the IP of the noit*/
  char            *remote_cn;  /* the Cert CN of the noit */
  char            *fqdn;       /* the fqdn of the storage node (NULL for
                                  the metanode) */
  conn_pool       *pool;       /* owning pool */
  struct conn_q   *next;       /* link in pool->head while idle */
  /* Postgres specific stuff */
  PGconn          *dbh;        /* NULL until stratcon_database_connect() */
} conn_q;
118
119
/* Maximum number of bind parameters a single statement may carry. */
#define MAX_PARAMS 8
/*
 * Common leading members of every per-statement detail struct. Because all
 * of them start with exactly these fields, any of them can be handed to
 * helpers that take a ds_single_detail * (e.g. free_params).
 *   res          - last PGresult (caller must PQclear on success)
 *   rv           - last PQresultStatus
 *   whence       - timestamp parsed from the line (used by PG_TM_EXEC)
 *   nparams      - number of declared parameters
 *   metric_type  - metric type character for 'M' lines
 *   paramAllocd  - non-zero if paramValues[i] must be freed
 */
#define POSTGRES_PARTS \
  PGresult *res; \
  int rv; \
  time_t whence; \
  int nparams; \
  int metric_type; \
  char *paramValues[MAX_PARAMS]; \
  int paramLengths[MAX_PARAMS]; \
  int paramFormats[MAX_PARAMS]; \
  int paramAllocd[MAX_PARAMS];

/* A bare statement context (one-off queries). */
typedef struct ds_single_detail {
  POSTGRES_PARTS
} ds_single_detail;
/* Context for realtime lookups (stratcon_ingest_find). */
typedef struct {
  /* Postgres specific stuff */
  POSTGRES_PARTS
  struct realtime_tracker *rt;
  conn_q *cq; /* connection on which to perform this job */
  eventer_t completion_event; /* This event should be registered if non NULL */
} ds_rt_detail;
/* Context for one ingested journal line (stratcon_ingest_execute). */
typedef struct ds_line_detail {
  /* Postgres specific stuff */
  POSTGRES_PARTS
  char *data;          /* the raw tab-separated line */
  int problematic;     /* not used in this section */
  struct ds_line_detail *next;
} ds_line_detail;
149
/*
 * Describes an interim journal file destined for one storage node.
 * NOTE(review): not used in this section; field meanings below are
 * inferred from the names and the matching conn_q fields — confirm.
 */
typedef struct {
  char *remote_str;     /* the IP of the noit (cf. conn_q) */
  char *remote_cn;      /* the Cert CN of the noit (cf. conn_q) */
  char *fqdn;           /* storage node fqdn (cf. conn_q) */
  int storagenode_id;
  int fd;               /* presumably the open journal file */
  char *filename;
  conn_pool *cpool;
} pg_interim_journal_t;
159
/* Forward declarations for helpers used before their definitions. */
static int stratcon_database_connect(conn_q *cq);
static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
static int storage_node_quick_lookup(const char *uuid_str,
                                     const char *remote_cn,
                                     int *sid_out, int *storagenode_id_out,
                                     const char **remote_cn_out,
                                     const char **fqdn_out,
                                     const char **dsn_out);
168
169 static void
170 free_params(ds_single_detail *d) {
171   int i;
172   for(i=0; i<d->nparams; i++)
173     if(d->paramAllocd[i] && d->paramValues[i])
174       free(d->paramValues[i]);
175 }
176
/* Not referenced in this section; presumably the base directory for
 * interim journal files — TODO confirm. */
static char *basejpath = NULL;
/* ds_conns maps a pool's queue_name to its conn_pool *; every access in
 * this section holds ds_conns_lock. */
static pthread_mutex_t ds_conns_lock;
static noit_hash_table ds_conns;

/* the fqdn cache needs to be thread safe */
/* Cached mapping of a check (uuid string + noit CN) to its sid and storage
 * node. */
typedef struct {
  char *uuid_str;
  char *remote_cn;
  int storagenode_id;
  int sid;
} uuid_info;
/* Connection details of one storage node. */
typedef struct {
  int storagenode_id;
  char *fqdn;
  char *dsn;
} storagenode_info;
/* NOTE(review): no lock for this table is declared here; its thread
 * safety presumably comes from the hash implementation — confirm. */
noit_hash_table uuid_to_info_cache;
/* Guards storagenode_to_info_cache (see stratcon_ingest_asynch_drive_iep). */
pthread_mutex_t storagenode_to_info_cache_lock;
noit_hash_table storagenode_to_info_cache;
196
197 /* Thread-safe connection pools */
198
199 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
200 static void
201 release_conn_q_forceable(conn_q *cq, int forcefree) {
202   int putback = 0;
203   cq->last_use = time(NULL);
204   pthread_mutex_lock(&cq->pool->lock);
205   cq->pool->outstanding--;
206   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
207     putback = 1;
208     cq->next = cq->pool->head;
209     cq->pool->head = cq;
210     cq->pool->in_pool++;
211   }
212   pthread_mutex_unlock(&cq->pool->lock);
213   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)(vpsized_int)pthread_self(),
214         putback ? "to pool" : "and destroy", cq->pool->queue_name);
215   pthread_cond_signal(&cq->pool->cv);
216   if(putback) return;
217
218   /* Not put back, release it */
219   if(cq->dbh) PQfinish(cq->dbh);
220   if(cq->remote_str) free(cq->remote_str);
221   if(cq->remote_cn) free(cq->remote_cn);
222   if(cq->fqdn) free(cq->fqdn);
223   if(cq->dsn) free(cq->dsn);
224   free(cq);
225 }
226 static void
227 ttl_purge_conn_pool(conn_pool *pool) {
228   int old_cnt, new_cnt;
229   time_t now = time(NULL);
230   conn_q *cq, *prev = NULL, *iter;
231   /* because we always replace on the head and update the last_use time when
232      doing so, we know they are ordered LRU on the end.  So, once we hit an
233      old one, we know all the others are old too.
234    */
235   if(!pool->head) return; /* hack short circuit for no locks */
236   pthread_mutex_lock(&pool->lock);
237   old_cnt = pool->in_pool;
238   cq = pool->head;
239   while(cq) {
240     if(cq->last_use + cq->pool->ttl < now) {
241       if(prev) prev->next = NULL;
242       else pool->head = NULL;
243       break;
244     }
245     prev = cq;
246     cq = cq->next;
247   }
248   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
249   /* Fix accounting */
250   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
251   new_cnt = pool->in_pool;
252   pthread_mutex_unlock(&pool->lock);
253
254   /* Force release these without holding the lock */
255   while(cq) {
256     prev = cq;
257     cq = cq->next;
258     release_conn_q_forceable(prev, 1);
259   }
260   if(old_cnt != new_cnt)
261     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
262           pool->queue_name);
263 }
264 static void
265 release_conn_q(conn_q *cq) {
266   ttl_purge_conn_pool(cq->pool);
267   release_conn_q_forceable(cq, 0);
268 }
269 static conn_pool *
270 get_conn_pool_for_remote(const char *remote_str,
271                          const char *remote_cn, const char *fqdn) {
272   void *vcpool;
273   conn_pool *cpool = NULL;
274   char queue_name[256] = "datastore_";
275   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
276            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
277            fqdn ? fqdn : "default",
278            remote_cn ? remote_cn : "default");
279   pthread_mutex_lock(&ds_conns_lock);
280   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
281                         strlen(queue_name), &vcpool))
282     cpool = vcpool;
283   pthread_mutex_unlock(&ds_conns_lock);
284   if(!cpool) {
285     vcpool = cpool = calloc(1, sizeof(*cpool));
286     cpool->queue_name = strdup(queue_name);
287     pthread_mutex_init(&cpool->lock, NULL);
288     pthread_cond_init(&cpool->cv, NULL);
289     cpool->in_pool = 0;
290     cpool->outstanding = 0;
291     cpool->max_in_pool = 1;
292     cpool->max_allocated = 1;
293     pthread_mutex_lock(&ds_conns_lock);
294     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
295                         cpool)) {
296       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
297                          strlen(queue_name), &vcpool);
298     }
299     pthread_mutex_unlock(&ds_conns_lock);
300     if(vcpool != cpool) {
301       /* someone beat us to it */
302       free(cpool->queue_name);
303       pthread_mutex_destroy(&cpool->lock);
304       pthread_cond_destroy(&cpool->cv);
305       free(cpool);
306     }
307     else {
308       int i;
309       /* Our job to setup the pool */
310       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
311       eventer_jobq_init(cpool->jobq, queue_name);
312       /* Add one thread */
313       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
314         eventer_jobq_increase_concurrency(cpool->jobq);
315     }
316     cpool = vcpool;
317   }
318   return cpool;
319 }
320 static conn_q *
321 get_conn_q_for_remote(const char *remote_str,
322                       const char *remote_cn, const char *fqdn,
323                       const char *dsn) {
324   conn_pool *cpool;
325   conn_q *cq;
326   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
327   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)(vpsized_int)pthread_self(),
328         cpool->queue_name);
329   pthread_mutex_lock(&cpool->lock);
330  again:
331   if(cpool->head) {
332     assert(cpool->in_pool > 0);
333     cq = cpool->head;
334     cpool->head = cq->next;
335     cpool->in_pool--;
336     cpool->outstanding++;
337     cq->next = NULL;
338     pthread_mutex_unlock(&cpool->lock);
339     return cq;
340   }
341   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
342     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
343           (void *)(vpsized_int)pthread_self(), cpool->queue_name);
344     pthread_cond_wait(&cpool->cv, &cpool->lock);
345     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
346           (void *)(vpsized_int)pthread_self(), cpool->queue_name);
347     goto again;
348   }
349   else {
350     cpool->outstanding++;
351     pthread_mutex_unlock(&cpool->lock);
352   }
353  
354   cq = calloc(1, sizeof(*cq));
355   cq->pool = cpool;
356   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
357   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
358   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
359   cq->dsn = dsn ? strdup(dsn) : NULL;
360   return cq;
361 }
362 static conn_q *
363 get_conn_q_for_metanode() {
364   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
365 }
366
367 typedef enum {
368   DS_EXEC_SUCCESS = 0,
369   DS_EXEC_ROW_FAILED = 1,
370   DS_EXEC_TXN_FAILED = 2,
371 } execute_outcome_t;
372
373 #define DECLARE_PARAM_STR(str, len) do { \
374   d->paramValues[d->nparams] = noit__strndup(str, len); \
375   d->paramLengths[d->nparams] = len; \
376   d->paramFormats[d->nparams] = 0; \
377   d->paramAllocd[d->nparams] = 1; \
378   if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \
379     free(d->paramValues[d->nparams]); \
380     d->paramValues[d->nparams] = NULL; \
381     d->paramLengths[d->nparams] = 0; \
382     d->paramAllocd[d->nparams] = 0; \
383   } \
384   d->nparams++; \
385 } while(0)
386 #define DECLARE_PARAM_INT(i) do { \
387   int buffer__len; \
388   char buffer__[32]; \
389   snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
390   buffer__len = strlen(buffer__); \
391   DECLARE_PARAM_STR(buffer__, buffer__len); \
392 } while(0)
393
394 #define PG_GET_STR_COL(dest, row, name) do { \
395   int colnum = PQfnumber(d->res, name); \
396   dest = NULL; \
397   if (colnum >= 0) \
398     dest = PQgetisnull(d->res, row, colnum) \
399          ? NULL : PQgetvalue(d->res, row, colnum); \
400 } while(0)
401
402 #define PG_EXEC(cmd) do { \
403   d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
404                         (const char * const *)d->paramValues, \
405                         d->paramLengths, d->paramFormats, 0); \
406   d->rv = PQresultStatus(d->res); \
407   if(d->rv != PGRES_COMMAND_OK && \
408      d->rv != PGRES_TUPLES_OK) { \
409     const char *pgerr = PQresultErrorMessage(d->res); \
410     const char *pgerr_end = strchr(pgerr, '\n'); \
411     if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
412     noitL(ds_err, "[%s] stratcon_datasource.c:%d bad (%d): %.*s\n", \
413           cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
414           (int)(pgerr_end - pgerr), pgerr); \
415     PQclear(d->res); \
416     goto bad_row; \
417   } \
418 } while(0)
419
420 #define PG_TM_EXEC(cmd, whence) do { \
421   time_t __w = whence; \
422   char cmdbuf[4096]; \
423   struct tm tbuf, *tm; \
424   tm = gmtime_r(&__w, &tbuf); \
425   strftime(cmdbuf, sizeof(cmdbuf), cmd, tm); \
426   d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
427                         (const char * const *)d->paramValues, \
428                         d->paramLengths, d->paramFormats, 0); \
429   d->rv = PQresultStatus(d->res); \
430   if(d->rv != PGRES_COMMAND_OK && \
431      d->rv != PGRES_TUPLES_OK) { \
432     const char *pgerr = PQresultErrorMessage(d->res); \
433     const char *pgerr_end = strchr(pgerr, '\n'); \
434     if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
435     noitL(ds_err, "stratcon_datasource.c:%d bad (%d): %.*s time: %llu\n", \
436           __LINE__, d->rv, (int)(pgerr_end - pgerr), pgerr, \
437           (long long unsigned)whence); \
438     PQclear(d->res); \
439     goto bad_row; \
440   } \
441 } while(0)
442
443 static void *
444 stratcon_ingest_check_loadall(void *vsn) {
445   storagenode_info *sn = vsn;
446   ds_single_detail *d;
447   int i, row_count = 0, good = 0;
448   char buff[1024];
449   conn_q *cq = NULL;
450
451   d = calloc(1, sizeof(*d));
452   GET_QUERY(check_loadall);
453   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
454   i = 0;
455   while(stratcon_database_connect(cq)) {
456     if(i++ > 4) {
457       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
458       release_conn_q(cq);
459       free(d);
460       return (void *)(vpsized_int)good;
461     }
462     sleep(1);
463   }
464   PG_EXEC(check_loadall);
465   row_count = PQntuples(d->res);
466  
467   for(i=0; i<row_count; i++) {
468     int rv;
469     int8_t family;
470     struct sockaddr *sin;
471     struct sockaddr_in sin4 = { .sin_family = AF_INET };
472     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
473     char *remote, *id, *target, *module, *name;
474     PG_GET_STR_COL(remote, i, "remote_address");
475     PG_GET_STR_COL(id, i, "id");
476     PG_GET_STR_COL(target, i, "target");
477     PG_GET_STR_COL(module, i, "module");
478     PG_GET_STR_COL(name, i, "name");
479     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
480
481     family = AF_INET;
482     sin = (struct sockaddr *)&sin4;
483     rv = inet_pton(family, remote, &sin4.sin_addr);
484     if(rv != 1) {
485       family = AF_INET6;
486       sin = (struct sockaddr *)&sin6;
487       rv = inet_pton(family, remote, &sin6.sin6_addr);
488       if(rv != 1) {
489         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
490         sin = NULL;
491       }
492     }
493
494     /* stratcon_iep_line_processor takes an allocated operand and frees it */
495     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
496     good++;
497   }
498   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
499         good, row_count, sn->fqdn);
500  bad_row:
501   free_params((ds_single_detail *)d);
502   free(d);
503   if(cq) release_conn_q(cq);
504   return (void *)(vpsized_int)good;
505 }
506 static int
507 stratcon_ingest_asynch_drive_iep(eventer_t e, int mask, void *closure,
508                                  struct timeval *now) {
509   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
510   pthread_t *jobs = NULL;
511   int nodes, i = 0, tcnt = 0;
512   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
513   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
514
515   pthread_mutex_lock(&storagenode_to_info_cache_lock);
516   nodes = noit_hash_size(&storagenode_to_info_cache);
517   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
518   sns = calloc(MAX(1,nodes), sizeof(*sns));
519   if(nodes == 0) sns[nodes++] = &self;
520   else {
521     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
522     const char *k;
523     void *v;
524     int klen;
525     while(noit_hash_next(&storagenode_to_info_cache,
526                          &iter, &k, &klen, &v)) {
527       sns[i++] = (storagenode_info *)v;
528     }
529   }
530   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
531
532   for(i=0; i<nodes; i++) {
533     if(pthread_create(&jobs[i], NULL,
534                       stratcon_ingest_check_loadall, sns[i]) != 0) {
535       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
536     }
537   }
538   for(i=0; i<nodes; i++) {
539     void *good;
540     pthread_join(jobs[i], &good);
541     tcnt += (int)(vpsized_int)good;
542   }
543   free(jobs);
544   free(sns);
545   noitL(noit_error, "Loaded all %d check states.\n", tcnt);
546   return 0;
547 }
548 static void
549 stratcon_ingest_iep_check_preload() {
550   eventer_t e;
551   conn_pool *cpool;
552
553   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
554   e = eventer_alloc();
555   e->mask = EVENTER_ASYNCH;
556   e->callback = stratcon_ingest_asynch_drive_iep;
557   e->closure = NULL;
558   eventer_add_asynch(cpool->jobq, e);
559 }
560 execute_outcome_t
561 stratcon_ingest_find(ds_rt_detail *d) {
562   conn_q *cq;
563   char *val;
564   int row_count;
565   struct realtime_tracker *node;
566
567   for(node = d->rt; node; node = node->next) {
568     char uuid_str[UUID_STR_LEN+1];
569     const char *fqdn, *dsn, *remote_cn;
570     char remote_ip[32];
571     int storagenode_id;
572
573     uuid_unparse_lower(node->checkid, uuid_str);
574     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
575                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
576       continue;
577
578     noitL(noit_debug, "stratcon_ingest_find <- (%d, %s) @ %s\n",
579           node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)");
580
581     /* We might be able to find the IP from our config if someone has
582      * specified the expected cn in the noit definition.
583      */
584     if(stratcon_find_noit_ip_by_cn(remote_cn,
585                                    remote_ip, sizeof(remote_ip)) == 0) {
586       node->noit = strdup(remote_ip);
587       noitL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit);
588       continue;
589     }
590
591     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
592     if(stratcon_database_connect(cq) != 0) goto bad_row;
593
594     GET_QUERY(check_find);
595     DECLARE_PARAM_INT(node->sid);
596     PG_EXEC(check_find);
597     row_count = PQntuples(d->res);
598     if(row_count != 1) {
599       noitL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid);
600       PQclear(d->res);
601       goto bad_row;
602     }
603
604     /* Get the remote_address (which noit owns this) */
605     PG_GET_STR_COL(val, 0, "remote_address");
606     if(!val) {
607       noitL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn);
608       PQclear(d->res);
609       goto bad_row;
610     }
611     node->noit = strdup(val);
612     noitL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit);
613    bad_row:
614     free_params((ds_single_detail *)d);
615     d->nparams = 0;
616     release_conn_q(cq);
617   }
618   return DS_EXEC_SUCCESS;
619 }
620 execute_outcome_t
621 stratcon_ingest_execute(conn_q *cq, const char *r, const char *remote_cn,
622                         ds_line_detail *d) {
623   int type, len, sid;
624   char *final_buff;
625   uLong final_len, actual_final_len;
626   char *token;
627   char raddr_blank[1] = "";
628   const char *raddr;
629
630   type = d->data[0];
631   raddr = r ? r : raddr_blank;
632
633   /* Parse the log line, but only if we haven't already */
634   if(!d->nparams) {
635     char *scp, *ecp;
636
637     scp = d->data;
638 #define PROCESS_NEXT_FIELD(t,l) do { \
639   if(!*scp) goto bad_row; \
640   ecp = strchr(scp, '\t'); \
641   if(!ecp) goto bad_row; \
642   token = scp; \
643   len = (ecp-scp); \
644   scp = ecp + 1; \
645 } while(0)
646 #define PROCESS_LAST_FIELD(t,l) do { \
647   if(!*scp) ecp = scp; \
648   else { \
649     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
650     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
651   } \
652   t = scp; \
653   l = (ecp-scp); \
654 } while(0)
655
656     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
657     switch(type) {
658       /* See noit_check_log.c for log description */
659       case 'n':
660         DECLARE_PARAM_STR(raddr, strlen(raddr));
661         DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
662         DECLARE_PARAM_STR("noitd",5); /* node_type */
663         PROCESS_NEXT_FIELD(token,len);
664         d->whence = (time_t)strtoul(token, NULL, 10);
665         DECLARE_PARAM_STR(token,len); /* timestamp */
666
667         /* This is the expected uncompressed len */
668         PROCESS_NEXT_FIELD(token,len);
669         final_len = atoi(token);
670         final_buff = malloc(final_len);
671         if(!final_buff) goto bad_row;
672  
673         /* The last token is b64 endoded and compressed.
674          * we need to decode it, declare it and then free it.
675          */
676         PROCESS_LAST_FIELD(token, len);
677         /* We can in-place decode this */
678         len = noit_b64_decode((char *)token, len,
679                               (unsigned char *)token, len);
680         if(len <= 0) {
681           noitL(noit_error, "noitd config base64 decoding error.\n");
682           free(final_buff);
683           goto bad_row;
684         }
685         actual_final_len = final_len;
686         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
687                               (unsigned char *)token, len)) {
688           noitL(noit_error, "noitd config decompression failure.\n");
689           free(final_buff);
690           goto bad_row;
691         }
692         if(final_len != actual_final_len) {
693           noitL(noit_error, "noitd config decompression error.\n");
694           free(final_buff);
695           goto bad_row;
696         }
697         DECLARE_PARAM_STR(final_buff, final_len);
698         free(final_buff);
699         break;
700       case 'D':
701         break;
702       case 'C':
703         DECLARE_PARAM_STR(raddr, strlen(raddr));
704         PROCESS_NEXT_FIELD(token,len);
705         DECLARE_PARAM_STR(token,len); /* timestamp */
706         d->whence = (time_t)strtoul(token, NULL, 10);
707         PROCESS_NEXT_FIELD(token, len);
708         /* uuid is last 36 bytes */
709         if(len > 36) { token += (len-36); len = 36; }
710         sid = uuid_to_sid(token, remote_cn);
711         if(sid == 0) goto bad_row;
712         DECLARE_PARAM_INT(sid); /* sid */
713         DECLARE_PARAM_STR(token,len); /* uuid */
714         PROCESS_NEXT_FIELD(token, len);
715         DECLARE_PARAM_STR(token,len); /* target */
716         PROCESS_NEXT_FIELD(token, len);
717         DECLARE_PARAM_STR(token,len); /* module */
718         PROCESS_LAST_FIELD(token, len);
719         DECLARE_PARAM_STR(token,len); /* name */
720         break;
721       case 'M':
722         PROCESS_NEXT_FIELD(token,len);
723         DECLARE_PARAM_STR(token,len); /* timestamp */
724         d->whence = (time_t)strtoul(token, NULL, 10);
725         PROCESS_NEXT_FIELD(token, len);
726         /* uuid is last 36 bytes */
727         if(len > 36) { token += (len-36); len = 36; }
728         sid = uuid_to_sid(token, remote_cn);
729         if(sid == 0) goto bad_row;
730         DECLARE_PARAM_INT(sid); /* sid */
731         PROCESS_NEXT_FIELD(token, len);
732         DECLARE_PARAM_STR(token,len); /* name */
733         PROCESS_NEXT_FIELD(token,len);
734         d->metric_type = *token;
735         PROCESS_LAST_FIELD(token,len);
736         DECLARE_PARAM_STR(token,len); /* value */
737         break;
738       case 'S':
739         PROCESS_NEXT_FIELD(token,len);
740         DECLARE_PARAM_STR(token,len); /* timestamp */
741         d->whence = (time_t)strtoul(token, NULL, 10);
742         PROCESS_NEXT_FIELD(token, len);
743         /* uuid is last 36 bytes */
744         if(len > 36) { token += (len-36); len = 36; }
745         sid = uuid_to_sid(token, remote_cn);
746         if(sid == 0) goto bad_row;
747         DECLARE_PARAM_INT(sid); /* sid */
748         PROCESS_NEXT_FIELD(token, len);
749         DECLARE_PARAM_STR(token,len); /* state */
750         PROCESS_NEXT_FIELD(token, len);
751         DECLARE_PARAM_STR(token,len); /* availability */
752         PROCESS_NEXT_FIELD(token, len);
753         DECLARE_PARAM_STR(token,len); /* duration */
754         PROCESS_LAST_FIELD(token,len);
755         DECLARE_PARAM_STR(token,len); /* status */
756         break;
757       default:
758         goto bad_row;
759     }
760
761   }
762
763   /* Now execute the query */
764   switch(type) {
765     case 'n':
766       GET_QUERY(config_insert);
767       PG_EXEC(config_insert);
768       PQclear(d->res);
769       break;
770     case 'C':
771       GET_QUERY(check_insert);
772       PG_TM_EXEC(check_insert, d->whence);
773       PQclear(d->res);
774       break;
775     case 'S':
776       GET_QUERY(status_insert);
777       PG_TM_EXEC(status_insert, d->whence);
778       PQclear(d->res);
779       break;
780     case 'D':
781       break;
782     case 'M':
783       switch(d->metric_type) {
784         case METRIC_INT32:
785         case METRIC_UINT32:
786         case METRIC_INT64:
787         case METRIC_UINT64:
788         case METRIC_DOUBLE:
789           GET_QUERY(metric_insert_numeric);
790           PG_TM_EXEC(metric_insert_numeric, d->whence);
791           PQclear(d->res);
792           break;
793         case METRIC_STRING:
794           GET_QUERY(metric_insert_text);
795           PG_TM_EXEC(metric_insert_text, d->whence);
796           PQclear(d->res);
797           break;
798         default:
799           goto bad_row;
800       }
801       break;
802     default:
803       /* should never get here */
804       goto bad_row;
805   }
806   return DS_EXEC_SUCCESS;
807  bad_row:
808   return DS_EXEC_ROW_FAILED;
809 }
810 static int
811 stratcon_database_post_connect(conn_q *cq) {
812   int rv = 0;
813   ds_single_detail _d = { 0 }, *d = &_d;
814   if(cq->fqdn) {
815     char *remote_str, *remote_cn;
816     /* This is the silly way we get null's in through our declare_param_str */
817     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
818     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
819     /* This is a storage node, it gets the storage node post_connect */
820     GET_QUERY(storage_post_connect);
821     rv = -1; /* now we're serious */
822     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
823     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
824     PG_EXEC(storage_post_connect);
825     PQclear(d->res);
826     rv = 0;
827   }
828   else {
829     /* Metanode post_connect */
830     GET_QUERY(metanode_post_connect);
831     rv = -1; /* now we're serious */
832     PG_EXEC(metanode_post_connect);
833     PQclear(d->res);
834     rv = 0;
835   }
836  bad_row:
837   free_params(d);
838   if(rv == -1) {
839     /* Post-connect intentions are serious and fatal */
840     PQfinish(cq->dbh);
841     cq->dbh = NULL;
842   }
843   return rv;
844 }
845 static int
846 stratcon_database_connect(conn_q *cq) {
847   char *dsn, dsn_meta[512];
848   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
849   const char *k, *v;
850   int klen;
851   noit_hash_table *t;
852
853   dsn_meta[0] = '\0';
854   if(!cq->dsn) {
855     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
856     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
857       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
858       strlcat(dsn_meta, k, sizeof(dsn_meta));
859       strlcat(dsn_meta, "=", sizeof(dsn_meta));
860       strlcat(dsn_meta, v, sizeof(dsn_meta));
861     }
862     noit_hash_destroy(t, free, free);
863     free(t);
864     dsn = dsn_meta;
865   }
866   else {
867     char options[32];
868     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta));
869     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user",
870                                options, sizeof(options))) {
871       strlcat(dsn_meta, " ", sizeof(dsn_meta));
872       strlcat(dsn_meta, "user", sizeof(dsn_meta));
873       strlcat(dsn_meta, "=", sizeof(dsn_meta));
874       strlcat(dsn_meta, options, sizeof(dsn_meta));
875     }
876     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password",
877                                options, sizeof(options))) {
878       strlcat(dsn_meta, " ", sizeof(dsn_meta));
879       strlcat(dsn_meta, "password", sizeof(dsn_meta));
880       strlcat(dsn_meta, "=", sizeof(dsn_meta));
881       strlcat(dsn_meta, options, sizeof(dsn_meta));
882     }
883     dsn = dsn_meta;
884   }
885
886   if(cq->dbh) {
887     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
888     PQreset(cq->dbh);
889     if(PQstatus(cq->dbh) != CONNECTION_OK) {
890       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
891             dsn, PQerrorMessage(cq->dbh));
892       return -1;
893     }
894     if(stratcon_database_post_connect(cq)) return -1;
895     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
896     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
897           dsn, PQerrorMessage(cq->dbh));
898     return -1;
899   }
900
901   cq->dbh = PQconnectdb(dsn);
902   if(!cq->dbh) return -1;
903   if(PQstatus(cq->dbh) != CONNECTION_OK) {
904     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
905           dsn, PQerrorMessage(cq->dbh));
906     return -1;
907   }
908   if(stratcon_database_post_connect(cq)) return -1;
909   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
910   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
911         dsn, PQerrorMessage(cq->dbh));
912   return -1;
913 }
914 static int
915 stratcon_ingest_savepoint_op(conn_q *cq, const char *p,
916                              const char *name) {
917   int rv = -1;
918   PGresult *res;
919   char cmd[128];
920   strlcpy(cmd, p, sizeof(cmd));
921   strlcat(cmd, name, sizeof(cmd));
922   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
923   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
924   PQclear(res);
925   return rv;
926 }
927 static int
928 stratcon_ingest_do(conn_q *cq, const char *cmd) {
929   PGresult *res;
930   int rv = -1;
931   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
932   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
933   PQclear(res);
934   return rv;
935 }
/*
 * Transaction-control helpers for stratcon_ingest_asynch_execute().
 * They only work inside that function: they use its locals `cq`,
 * `current` and `last_sp`, and BUSTED jumps to its `full_monty` label.
 */

/* Drop the connection and restart the batch from the top. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)

/* Run "<prefix><name>" on cq; any failure busts the connection. */
#define INGEST_SP_OP(prefix, name) do { \
  if(stratcon_ingest_savepoint_op(cq, (prefix), (name)) != 0) { \
    BUSTED(cq); \
  } \
} while(0)

/* Open a savepoint and remember the record it precedes. */
#define SAVEPOINT(name) do { \
  INGEST_SP_OP("SAVEPOINT ", name); \
  last_sp = current; \
} while(0)

/* Undo everything since the savepoint; a new one is opened next. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  INGEST_SP_OP("ROLLBACK TO SAVEPOINT ", name); \
  last_sp = NULL; \
} while(0)

/* Keep the work done since the savepoint; a new one is opened next. */
#define RELEASE_SAVEPOINT(name) do { \
  INGEST_SP_OP("RELEASE SAVEPOINT ", name); \
  last_sp = NULL; \
} while(0)
955
956 int
957 stratcon_ingest_asynch_lookup(eventer_t e, int mask, void *closure,
958                               struct timeval *now) {
959   ds_rt_detail *dsjd = closure;
960   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
961   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
962
963   assert(dsjd->rt);
964   stratcon_ingest_find(dsjd);
965   if(dsjd->completion_event)
966     eventer_add(dsjd->completion_event);
967
968   free_params((ds_single_detail *)dsjd);
969   free(dsjd);
970   return 0;
971 }
972 static void
973 stratcon_ingestor_submit_lookup(struct realtime_tracker *rt,
974                                 eventer_t completion) {
975   eventer_t e;
976   conn_pool *cpool;
977   ds_rt_detail *rtdetail;
978
979   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
980   rtdetail = calloc(1, sizeof(*rtdetail));
981   rtdetail->rt = rt;
982   rtdetail->completion_event = completion;
983   e = eventer_alloc();
984   e->mask = EVENTER_ASYNCH;
985   e->callback = stratcon_ingest_asynch_lookup;
986   e->closure = rtdetail;
987   eventer_add_asynch(cpool->jobq, e);
988 }
989 static const char *
990 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
991   void *vinfo;
992   char *dsn = NULL, *fqdn = NULL;
993   int found = 0;
994   storagenode_info *info = NULL;
995   pthread_mutex_lock(&storagenode_to_info_cache_lock);
996   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
997                         &vinfo)) {
998     found = 1;
999     info = vinfo;
1000   }
1001   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1002   if(found) {
1003     if(fqdn_out) *fqdn_out = info->fqdn;
1004     return info->dsn;
1005   }
1006
1007   if(!found && can_use_db) {
1008     ds_single_detail *d;
1009     conn_q *cq;
1010     int row_count;
1011     /* Look it up and store it */
1012     d = calloc(1, sizeof(*d));
1013     cq = get_conn_q_for_metanode();
1014     GET_QUERY(find_storage);
1015     DECLARE_PARAM_INT(id);
1016     PG_EXEC(find_storage);
1017     row_count = PQntuples(d->res);
1018     if(row_count) {
1019       PG_GET_STR_COL(dsn, 0, "dsn");
1020       PG_GET_STR_COL(fqdn, 0, "fqdn");
1021       fqdn = fqdn ? strdup(fqdn) : NULL;
1022       dsn = dsn ? strdup(dsn) : NULL;
1023     }
1024     PQclear(d->res);
1025    bad_row:
1026     free_params(d);
1027     free(d);
1028     release_conn_q(cq);
1029   }
1030   if(fqdn) {
1031     info = calloc(1, sizeof(*info));
1032     info->fqdn = fqdn;
1033     if(fqdn_out) *fqdn_out = info->fqdn;
1034     info->dsn = dsn;
1035     info->storagenode_id = id;
1036     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1037     noit_hash_store(&storagenode_to_info_cache,
1038                     (void *)&info->storagenode_id, sizeof(int), info);
1039     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1040   }
1041   return info ? info->dsn : NULL;
1042 }
1043 static void
1044 expand_b_record(ds_line_detail **head, ds_line_detail **last,
1045                 const char *line, int len) {
1046   char **outrows;
1047   int i, cnt;
1048   ds_line_detail *next;
1049
1050   cnt = noit_check_log_b_to_sm(line, len, &outrows);
1051   for(i=0;i<cnt;i++) {
1052     if(outrows[i] == NULL) continue;
1053     next = calloc(sizeof(*next), 1);
1054     next->data = outrows[i];
1055     if(!*head) *head = next;
1056     if(*last) (*last)->next = next;
1057     *last = next;
1058   }
1059   if(outrows) free(outrows);
1060 }
1061 static ds_line_detail *
1062 build_insert_batch(pg_interim_journal_t *ij) {
1063   int rv;
1064   off_t len;
1065   const char *buff, *cp, *lcp;
1066   struct stat st;
1067   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1068
1069   if(ij->fd < 0) {
1070     ij->fd = open(ij->filename, O_RDONLY);
1071     if(ij->fd < 0) {
1072       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
1073             ij->filename, strerror(errno));
1074       assert(ij->fd >= 0);
1075     }
1076   }
1077   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1078   if(rv == -1) {
1079       noitL(noit_error, "Cannot stat interim journal '%s': %s\n",
1080             ij->filename, strerror(errno));
1081     assert(rv != -1);
1082   }
1083   len = st.st_size;
1084   if(len > 0) {
1085     buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1086     if(buff == (void *)-1) {
1087       noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1088             ij->filename, strerror(errno));
1089       assert(buff != (void *)-1);
1090     }
1091     lcp = buff;
1092     while(lcp < (buff + len) &&
1093           NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1094       if(lcp[0] == 'B' && lcp[1] != '\0' && lcp[2] == '\t') {
1095       /* Bundle records are special and need to be expanded into
1096        * traditional records here
1097        */
1098         noit_compression_type_t ctype = NOIT_COMPRESS_NONE;
1099         switch(lcp[1]) {
1100           case '1': /* version 1 */
1101             ctype = NOIT_COMPRESS_ZLIB; /*no break fall through */
1102           case '2': /* version 2 */
1103               expand_b_record(&head, &last, lcp, cp - lcp);
1104             break;
1105           default:
1106             noitL(noit_error, "unknown bundle version %c\n", lcp[1]);
1107         }
1108       }
1109       else {
1110         next = calloc(1, sizeof(*next));
1111         next->data = malloc(cp - lcp + 1);
1112         memcpy(next->data, lcp, cp - lcp);
1113         next->data[cp - lcp] = '\0';
1114         if(!head) head = next;
1115         if(last) last->next = next;
1116         last = next;
1117       }
1118       lcp = cp + 1;
1119     }
1120     munmap((void *)buff, len);
1121   }
1122   close(ij->fd);
1123   return head;
1124 }
1125 static void
1126 pg_interim_journal_remove(pg_interim_journal_t *ij) {
1127   unlink(ij->filename);
1128   free(ij->filename);
1129   if(ij->remote_str) free(ij->remote_str);
1130   if(ij->remote_cn) free(ij->remote_cn);
1131   if(ij->fqdn) free(ij->fqdn);
1132   free(ij);
1133 }
1134 static int
1135 stratcon_ingest_asynch_execute(eventer_t e, int mask, void *closure,
1136                                struct timeval *now) {
1137   int i, total, success, sp_total, sp_success;
1138   pg_interim_journal_t *ij;
1139   ds_line_detail *head = NULL, *current, *last_sp;
1140   const char *dsn;
1141   conn_q *cq;
1142   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1143   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1144
1145   ij = closure;
1146   if(ij->fqdn == NULL) {
1147     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1148     if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1149   }
1150   else {
1151     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, NULL);
1152   }
1153   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1154                              ij->fqdn, dsn);
1155   noitL(ds_deb, "stratcon_ingest_asynch_execute[%s,%s,%s]\n",
1156         ij->remote_str, ij->remote_cn, ij->fqdn);
1157  full_monty:
1158   /* Make sure we have a connection */
1159   i = 1;
1160   while(stratcon_database_connect(cq)) {
1161     noitL(noit_error, "Error connecting to database: %s\n",
1162           ij->fqdn ? ij->fqdn : "(null)");
1163     sleep(i);
1164     i *= 2;
1165     i = MIN(i, 16);
1166   }
1167
1168   if(head == NULL) head = build_insert_batch(ij);
1169   noitL(ds_deb, "Starting batch from %s/%s to %s\n",
1170         ij->remote_str ? ij->remote_str : "(null)",
1171         ij->remote_cn ? ij->remote_cn : "(null)",
1172         ij->fqdn ? ij->fqdn : "(null)");
1173   current = head;
1174   last_sp = NULL;
1175   total = success = sp_total = sp_success = 0;
1176   if(stratcon_ingest_do(cq, "BEGIN")) BUSTED(cq);
1177   while(current) {
1178     execute_outcome_t rv;
1179     if(current->data) {
1180       if(!last_sp) {
1181         SAVEPOINT("batch");
1182         sp_success = success;
1183         sp_total = total;
1184       }
1185  
1186       if(current->problematic) {
1187         RELEASE_SAVEPOINT("batch");
1188         current = current->next;
1189         total++;
1190         continue;
1191       }
1192       rv = stratcon_ingest_execute(cq, cq->remote_str, cq->remote_cn,
1193                                    current);
1194       switch(rv) {
1195         case DS_EXEC_SUCCESS:
1196           total++;
1197           success++;
1198           current = current->next;
1199           break;
1200         case DS_EXEC_ROW_FAILED:
1201           /* rollback to savepoint, mark this record as bad and start again */
1202           if(current->data[0] != 'n')
1203             noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1204           current->problematic = 1;
1205           current = last_sp;
1206           success = sp_success;
1207           total = sp_total;
1208           ROLLBACK_TO_SAVEPOINT("batch");
1209           break;
1210         case DS_EXEC_TXN_FAILED:
1211           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1212           BUSTED(cq);
1213       }
1214     }
1215   }
1216   if(last_sp) RELEASE_SAVEPOINT("batch");
1217   if(stratcon_ingest_do(cq, "COMMIT")) {
1218     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1219     BUSTED(cq);
1220   }
1221   /* Cleanup the mess */
1222   while(head) {
1223     ds_line_detail *tofree;
1224     tofree = head;
1225     head = head->next;
1226     if(tofree->data) free(tofree->data);
1227     free_params((ds_single_detail *)tofree);
1228     free(tofree);
1229   }
1230   noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1231         ij->remote_str ? ij->remote_str : "(null)",
1232         ij->remote_cn ? ij->remote_cn : "(null)",
1233         ij->fqdn ? ij->fqdn : "(null)", success, total);
1234   pg_interim_journal_remove(ij);
1235   release_conn_q(cq);
1236   return 0;
1237 }
1238 static int
1239 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1240                           int *sid_out, int *storagenode_id_out,
1241                           const char **remote_cn_out,
1242                           const char **fqdn_out, const char **dsn_out) {
1243   /* only called from the main thread -- no safety issues */
1244   void *vuuidinfo, *vinfo;
1245   uuid_info *uuidinfo;
1246   storagenode_info *info = NULL;
1247   char *fqdn = NULL;
1248   char *dsn = NULL;
1249   char *new_remote_cn = NULL;
1250   int storagenode_id = 0, sid = 0;
1251   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1252                          &vuuidinfo)) {
1253     int row_count = 0;
1254     char *tmpint;
1255     ds_single_detail *d;
1256     conn_q *cq;
1257
1258     /* We can't do a database lookup without the remote_cn */
1259     if(!remote_cn) {
1260       if(stratcon_datastore_get_enabled()) {
1261         /* We have an authoritatively maintained cache, we don't do lookups */
1262         return -1;
1263       }
1264       else
1265         remote_cn = "[[null]]";
1266     }
1267
1268     d = calloc(1, sizeof(*d));
1269     cq = get_conn_q_for_metanode();
1270     if(stratcon_database_connect(cq) == 0) {
1271       /* Blocking call to service the cache miss */
1272       GET_QUERY(check_map);
1273       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1274       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1275       PG_EXEC(check_map);
1276       row_count = PQntuples(d->res);
1277       if(row_count != 1) {
1278         PQclear(d->res);
1279         goto bad_row;
1280       }
1281       PG_GET_STR_COL(tmpint, 0, "sid");
1282       if(!tmpint) {
1283         row_count = 0;
1284         PQclear(d->res);
1285         goto bad_row;
1286       }
1287       sid = atoi(tmpint);
1288       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1289       if(tmpint) storagenode_id = atoi(tmpint);
1290       PG_GET_STR_COL(fqdn, 0, "fqdn");
1291       PG_GET_STR_COL(dsn, 0, "dsn");
1292       PG_GET_STR_COL(new_remote_cn, 0, "remote_cn");
1293       fqdn = fqdn ? strdup(fqdn) : NULL;
1294       dsn = dsn ? strdup(dsn) : NULL;
1295       new_remote_cn = new_remote_cn ? strdup(new_remote_cn) : NULL;
1296       PQclear(d->res);
1297     }
1298    bad_row:
1299     free_params((ds_single_detail *)d);
1300     free(d);
1301     release_conn_q(cq);
1302     if(row_count != 1) {
1303       return -1;
1304     }
1305     /* Place in cache */
1306     uuidinfo = calloc(1, sizeof(*uuidinfo));
1307     uuidinfo->sid = sid;
1308     uuidinfo->uuid_str = strdup(uuid_str);
1309     uuidinfo->storagenode_id = storagenode_id;
1310     uuidinfo->remote_cn = new_remote_cn ? strdup(new_remote_cn) : strdup(remote_cn);
1311     noit_hash_store(&uuid_to_info_cache,
1312                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1313     /* Also, we may have just witnessed a new storage node, store it */
1314     if(storagenode_id) {
1315       int needs_free = 0;
1316       info = calloc(1, sizeof(*info));
1317       info->storagenode_id = storagenode_id;
1318       info->dsn = dsn ? strdup(dsn) : NULL;
1319       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1320       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1321       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1322                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1323         /* hack to save memory -- we *never* remove from these caches,
1324            so we can use the same fqdn value in the above cache for the key
1325            in the cache below -- (no strdup) */
1326         noit_hash_store(&storagenode_to_info_cache,
1327                         (void *)&info->storagenode_id, sizeof(int), info);
1328       }
1329       else needs_free = 1;
1330       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1331       if(needs_free) {
1332         if(info->dsn) free(info->dsn);
1333         if(info->fqdn) free(info->fqdn);
1334         free(info);
1335       }
1336     }
1337   }
1338   else
1339     uuidinfo = vuuidinfo;
1340
1341   if(uuidinfo && uuidinfo->storagenode_id) {
1342     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1343       /* we don't have dsn and we actually want it */
1344       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1345       if(noit_hash_retrieve(&storagenode_to_info_cache,
1346                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1347                             &vinfo))
1348         info = vinfo;
1349       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1350     }
1351   }
1352
1353   if(fqdn_out) *fqdn_out = info ? info->fqdn : NULL;
1354   if(dsn_out) *dsn_out = info ? info->dsn : NULL;
1355   assert(uuidinfo);
1356   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1357   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1358   if(sid_out) *sid_out = uuidinfo->sid;
1359   if(fqdn) free(fqdn);
1360   if(dsn) free(dsn);
1361   if(new_remote_cn) free(new_remote_cn);
1362   return 0;
1363 }
1364 static int
1365 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1366   char uuid_str[UUID_STR_LEN+1];
1367   int sid = 0;
1368   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1369   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1370   return sid;
1371 }
1372
1373 static int
1374 stratcon_ingest_saveconfig() {
1375   int rv = -1;
1376   char *buff;
1377   ds_single_detail _d = { 0 }, *d = &_d;
1378   conn_q *cq;
1379   char ipv4_str[32];
1380   struct in_addr r, l;
1381
1382   r.s_addr = htonl((4 << 24) | (2 << 16) | (2 << 8) | 1);
1383   memset(&l, 0, sizeof(l));
1384   noit_getip_ipv4(r, &l);
1385   /* Ignore the error.. what are we going to do anyway */
1386   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
1387     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));
1388
1389   cq = get_conn_q_for_metanode();
1390
1391   if(stratcon_database_connect(cq) == 0) {
1392     char time_as_str[20];
1393     size_t len;
1394     buff = noit_conf_xml_in_mem(&len);
1395     if(!buff) goto bad_row;
1396
1397     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1398     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str));
1399     DECLARE_PARAM_STR("", 0);
1400     DECLARE_PARAM_STR("stratcond", 9);
1401     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1402     DECLARE_PARAM_STR(buff, len);
1403     free(buff);
1404
1405     GET_QUERY(config_insert);
1406     PG_EXEC(config_insert);
1407     PQclear(d->res);
1408     rv = 0;
1409
1410     bad_row:
1411       free_params(d);
1412   }
1413   release_conn_q(cq);
1414   return rv;
1415 }
1416
1417 static int
1418 stratcon_ingest_launch_file_ingestion(const char *path,
1419                                       const char *remote_str,
1420                                       const char *remote_cn,
1421                                       const char *id_str) {
1422   pg_interim_journal_t *ij;
1423   char pgfile[PATH_MAX];
1424   eventer_t ingest;
1425
1426   if(strcmp(path + strlen(path) - 3, ".pg")) {
1427     snprintf(pgfile, sizeof(pgfile), "%s.pg", path);
1428     if(link(path, pgfile) < 0 && errno != EEXIST) {
1429       noitL(noit_error, "cannot link journal %s: %s\n", path, strerror(errno));
1430       return -1;
1431     }
1432   }
1433   else
1434     strlcpy(pgfile, path, sizeof(pgfile));
1435   ij = calloc(1, sizeof(*ij));
1436   ij->fd = open(pgfile, O_RDONLY);
1437   if(ij->fd < 0) {
1438     noitL(noit_error, "cannot open journal '%s': %s\n",
1439           pgfile, strerror(errno));
1440     free(ij);
1441     return -1;
1442   }
1443   close(ij->fd);
1444   ij->fd = -1;
1445   ij->filename = strdup(pgfile);
1446   ij->remote_str = strdup(remote_str);
1447   ij->remote_cn = strdup(remote_cn);
1448   ij->storagenode_id = atoi(id_str);
1449   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1450                                        ij->fqdn);
1451   noitL(noit_debug, "ingesting payload: %s\n", ij->filename);
1452   ingest = eventer_alloc();
1453   ingest->mask = EVENTER_ASYNCH;
1454   ingest->callback = stratcon_ingest_asynch_execute;
1455   ingest->closure = ij;
1456   eventer_add_asynch(ij->cpool->jobq, ingest);
1457   return 0;
1458 }
1459
1460 int
1461 stratcon_ingest_all_storagenode_info() {
1462   int i, cnt = 0;
1463   ds_single_detail _d = { 0 }, *d = &_d;
1464   conn_q *cq;
1465   cq = get_conn_q_for_metanode();
1466
1467   while(stratcon_database_connect(cq)) {
1468     noitL(noit_error, "Error connecting to database\n");
1469     sleep(1);
1470   }
1471
1472   GET_QUERY(all_storage);
1473   PG_EXEC(all_storage);
1474   cnt = PQntuples(d->res);
1475   for(i=0; i<cnt; i++) {
1476     void *vinfo;
1477     char *tmpint, *fqdn, *dsn;
1478     int storagenode_id;
1479     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1480     storagenode_id = atoi(tmpint);
1481     PG_GET_STR_COL(fqdn, i, "fqdn");
1482     PG_GET_STR_COL(dsn, i, "dsn");
1483     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1484     storagenode_id = tmpint ? atoi(tmpint) : 0;
1485
1486     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1487                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1488       storagenode_info *info;
1489       info = calloc(1, sizeof(*info));
1490       info->storagenode_id = storagenode_id;
1491       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1492       info->dsn = dsn ? strdup(dsn) : NULL;
1493       noit_hash_store(&storagenode_to_info_cache,
1494                       (void *)&info->storagenode_id, sizeof(int), info);
1495       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1496             info->storagenode_id,
1497             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1498     }
1499     noit_watchdog_child_heartbeat();
1500   }
1501   PQclear(d->res);
1502  bad_row:
1503   free_params(d);
1504
1505   release_conn_q(cq);
1506   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1507   return cnt;
1508 }
1509 int
1510 stratcon_ingest_all_check_info() {
1511   int i, cnt, loaded = 0;
1512   ds_single_detail _d = { 0 }, *d = &_d;
1513   conn_q *cq;
1514   cq = get_conn_q_for_metanode();
1515
1516   while(stratcon_database_connect(cq)) {
1517     noitL(noit_error, "Error connecting to database\n");
1518     sleep(1);
1519   }
1520
1521   GET_QUERY(check_mapall);
1522   PG_EXEC(check_mapall);
1523   cnt = PQntuples(d->res);
1524   for(i=0; i<cnt; i++) {
1525     void *vinfo;
1526     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1527     int sid, storagenode_id;
1528     uuid_info *uuidinfo;
1529     PG_GET_STR_COL(uuid_str, i, "id");
1530     if(!uuid_str) continue;
1531     PG_GET_STR_COL(tmpint, i, "sid");
1532     if(!tmpint) continue;
1533     sid = atoi(tmpint);
1534     PG_GET_STR_COL(fqdn, i, "fqdn");
1535     PG_GET_STR_COL(dsn, i, "dsn");
1536     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1537     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1538     storagenode_id = tmpint ? atoi(tmpint) : 0;
1539
1540     uuidinfo = calloc(1, sizeof(*uuidinfo));
1541     uuidinfo->uuid_str = strdup(uuid_str);
1542     uuidinfo->remote_cn = strdup(remote_cn);
1543     uuidinfo->storagenode_id = storagenode_id;
1544     uuidinfo->sid = sid;
1545     noit_hash_store(&uuid_to_info_cache,
1546                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1547     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1548           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1549     loaded++;
1550     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1551                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1552       storagenode_info *info;
1553       info = calloc(1, sizeof(*info));
1554       info->storagenode_id = storagenode_id;
1555       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1556       info->dsn = dsn ? strdup(dsn) : NULL;
1557       noit_hash_store(&storagenode_to_info_cache,
1558                       (void *)&info->storagenode_id, sizeof(int), info);
1559       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1560             info->storagenode_id,
1561             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1562     }
1563     noit_watchdog_child_heartbeat();
1564   }
1565   PQclear(d->res);
1566  bad_row:
1567   free_params(d);
1568
1569   release_conn_q(cq);
1570   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1571   return loaded;
1572 }
1573
1574 static char *
1575 stratcon_get_noit_config(const char *cn) {
1576   ds_single_detail *d;
1577   int row_count = 0;
1578   const char *xml = NULL;
1579   char *xmlcopy = NULL;
1580   conn_q *cq = NULL;
1581
1582   d = calloc(1, sizeof(*d));
1583   GET_QUERY(config_get);
1584   cq = get_conn_q_for_metanode();
1585   if(!cq) goto bad_row;
1586
1587   DECLARE_PARAM_STR(cn, cn ? strlen(cn) : 0);
1588   PG_EXEC(config_get);
1589   row_count = PQntuples(d->res);
1590   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1591
1592   if(xml) xmlcopy = strdup(xml);
1593
1594  bad_row:
1595   free_params((ds_single_detail *)d);
1596   d->nparams = 0;
1597   if(cq) release_conn_q(cq);
1598   free(d);
1599
1600   return xmlcopy;
1601 }
1602
1603 static ingestor_api_t postgres_ingestor_api = {
1604   .launch_file_ingestion = stratcon_ingest_launch_file_ingestion,
1605   .iep_check_preload = stratcon_ingest_iep_check_preload,
1606   .storage_node_lookup = storage_node_quick_lookup,
1607   .submit_realtime_lookup = stratcon_ingestor_submit_lookup,
1608   .get_noit_config = stratcon_get_noit_config,
1609   .save_config = stratcon_ingest_saveconfig
1610 };
1611
1612 static int postgres_ingestor_config(noit_module_generic_t *self, noit_hash_table *o) {
1613   return 0;
1614 }
1615 static int postgres_ingestor_onload(noit_image_t *self) {
1616   return 0;
1617 }
/*
 * Journal-sweep filter: accept file names that are exactly 19 characters
 * long and end in ".pg" (16 characters plus the suffix).  Sends a watchdog
 * heartbeat on every call.  Returns 1 to accept, 0 to reject.
 */
static int is_postgres_ingestor_file(const char *file) {
  static const char suffix[] = ".pg";
  noit_watchdog_child_heartbeat();
  if(strlen(file) != 19) return 0;
  return strcmp(file + 16, suffix) == 0;
}
1622 static int postgres_ingestor_init(noit_module_generic_t *self) {
1623   stratcon_datastore_core_init();
1624   pthread_mutex_init(&ds_conns_lock, NULL);
1625   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1626   ds_err = noit_log_stream_find("error/datastore");
1627   ds_deb = noit_log_stream_find("debug/datastore");
1628   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
1629   ingest_err = noit_log_stream_find("error/ingest");
1630   if(!ds_err) ds_err = noit_error;
1631   if(!ingest_err) ingest_err = noit_error;
1632   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1633                            &basejpath)) {
1634     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1635     exit(-1);
1636   }
1637   stratcon_ingest_all_check_info();
1638   stratcon_ingest_all_storagenode_info();
1639   stratcon_ingest_sweep_journals(is_postgres_ingestor_file,
1640                                  stratcon_ingest_launch_file_ingestion);
1641   return stratcon_datastore_set_ingestor(&postgres_ingestor_api);
1642 }
1643
1644 noit_module_generic_t postgres_ingestor = {
1645   {
1646     .magic = NOIT_GENERIC_MAGIC,
1647     .version = NOIT_GENERIC_ABI_VERSION,
1648     .name = "postgres_ingestor",
1649     .description = "postgres drive for data ingestion",
1650     .xml_description = postgres_ingestor_xml_description,
1651     .onload = postgres_ingestor_onload,
1652   }, 
1653   postgres_ingestor_config,
1654   postgres_ingestor_init
1655 };
Note: See TracBrowser for help on using the browser.