root/src/stratcon_datastore.c

Revision 3c6cba18a654f50a87118dfd7fb1e7117ac02f43, 57.6 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 7 years ago)

fixes #285

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "utils/noit_str.h"
38 #include "utils/noit_mkdir.h"
39 #include "utils/noit_getip.h"
40 #include "stratcon_datastore.h"
41 #include "stratcon_realtime_http.h"
42 #include "stratcon_iep.h"
43 #include "noit_conf.h"
44 #include "noit_check.h"
45 #include "noit_rest.h"
46 #include <unistd.h>
47 #include <fcntl.h>
48 #include <netinet/in.h>
49 #include <sys/un.h>
50 #include <dirent.h>
51 #include <arpa/inet.h>
52 #include <sys/mman.h>
53 #include <libpq-fe.h>
54 #include <zlib.h>
55 #include <assert.h>
56 #include <errno.h>
57
/*
 * DECL_STMT(codename, confname) declares two file-scope statics:
 *   codename        - the SQL text, NULL until GET_QUERY(codename) loads it
 *                     from the configuration on first use;
 *   codename##_conf - the configuration path it is loaded from,
 *                     "/stratcon/database/statements/<confname>".
 */
#define DECL_STMT(codename,confname) \
static char *codename = NULL; \
static const char *codename##_conf = "/stratcon/database/statements/" #confname

/* Run right after a (re)connect: storage nodes vs. the metanode. */
DECL_STMT(storage_post_connect, storagepostconnect);
DECL_STMT(metanode_post_connect, metanodepostconnect);
DECL_STMT(find_storage, findstoragenode);
DECL_STMT(all_storage, allstoragenodes);
DECL_STMT(check_map, mapchecktostoragenode);
DECL_STMT(check_mapall, mapallchecks);
DECL_STMT(check_loadall, allchecks);
DECL_STMT(check_find, findcheck);
/* Journal ingestion inserts, one per record type (see
 * stratcon_datastore_execute). */
DECL_STMT(check_insert, check);
DECL_STMT(status_insert, status);
DECL_STMT(metric_insert_numeric, metric_numeric);
DECL_STMT(metric_insert_text, metric_text);
DECL_STMT(config_insert, config);
DECL_STMT(config_get, findconfig);

/* Log streams.  ds_err receives SQL failures from PG_EXEC/PG_TM_EXEC and
 * ds_pool_deb connection-pool tracing; all four are presumably opened during
 * module initialisation outside this view — TODO confirm. */
static noit_log_stream_t ds_err = NULL;
static noit_log_stream_t ds_deb = NULL;
static noit_log_stream_t ds_pool_deb = NULL;
static noit_log_stream_t ingest_err = NULL;
73 DECL_STMT(metric_insert_text, metric_text);
74 DECL_STMT(config_insert, config);
75 DECL_STMT(config_get, findconfig);
76
77 static noit_log_stream_t ds_err = NULL;
78 static noit_log_stream_t ds_deb = NULL;
79 static noit_log_stream_t ds_pool_deb = NULL;
80 static noit_log_stream_t ingest_err = NULL;
81
82 static int ds_system_enabled = 1;
83 int stratcon_datastore_get_enabled() { return ds_system_enabled; }
84 void stratcon_datastore_set_enabled(int n) { ds_system_enabled = n; }
85
86 static struct datastore_onlooker_list {
87   void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *,
88                    const char *, void *);
89   struct datastore_onlooker_list *next;
90 } *onlookers = NULL;
91
92 #define GET_QUERY(a) do { \
93   if(a == NULL) \
94     if(!noit_conf_get_string(NULL, a ## _conf, &(a))) \
95       goto bad_row; \
96 } while(0)
97
98 struct conn_q;
99
100 typedef struct {
101   char            *queue_name; /* the key fqdn+remote_sn */
102   eventer_jobq_t  *jobq;
103   struct conn_q   *head;
104   pthread_mutex_t  lock;
105   pthread_cond_t   cv;
106   int              ttl;
107   int              in_pool;
108   int              outstanding;
109   int              max_allocated;
110   int              max_in_pool;
111 } conn_pool;
112 typedef struct conn_q {
113   time_t           last_use;
114   char            *dsn;        /* Pg connect string */
115   char            *remote_str; /* the IP of the noit*/
116   char            *remote_cn;  /* the Cert CN of the noit */
117   char            *fqdn;       /* the fqdn of the storage node */
118   conn_pool       *pool;
119   struct conn_q   *next;
120   /* Postgres specific stuff */
121   PGconn          *dbh;
122 } conn_q;
123
124
125 #define MAX_PARAMS 8
126 #define POSTGRES_PARTS \
127   PGresult *res; \
128   int rv; \
129   time_t whence; \
130   int nparams; \
131   int metric_type; \
132   char *paramValues[MAX_PARAMS]; \
133   int paramLengths[MAX_PARAMS]; \
134   int paramFormats[MAX_PARAMS]; \
135   int paramAllocd[MAX_PARAMS];
136
137 typedef struct ds_single_detail {
138   POSTGRES_PARTS
139 } ds_single_detail;
140 typedef struct {
141   /* Postgres specific stuff */
142   POSTGRES_PARTS
143   struct realtime_tracker *rt;
144   conn_q *cq; /* connection on which to perform this job */
145   eventer_t completion_event; /* This event should be registered if non NULL */
146 } ds_rt_detail;
147 typedef struct ds_line_detail {
148   /* Postgres specific stuff */
149   POSTGRES_PARTS
150   char *data;
151   int problematic;
152   struct ds_line_detail *next;
153 } ds_line_detail;
154
155 typedef struct {
156   noit_hash_table *ws;
157   eventer_t completion;
158 } syncset_t;
159
160 typedef struct {
161   char *remote_str;
162   char *remote_cn;
163   char *fqdn;
164   int storagenode_id;
165   int fd;
166   char *filename;
167   conn_pool *cpool;
168 } interim_journal_t;
169
170 static int stratcon_database_connect(conn_q *cq);
171 static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
172 static int storage_node_quick_lookup(const char *uuid_str,
173                                      const char *remote_cn,
174                                      int *sid_out, int *storagenode_id_out,
175                                      const char **remote_cn_out,
176                                      const char **fqdn_out,
177                                      const char **dsn_out);
178
179 static void
180 free_params(ds_single_detail *d) {
181   int i;
182   for(i=0; i<d->nparams; i++)
183     if(d->paramAllocd[i] && d->paramValues[i])
184       free(d->paramValues[i]);
185 }
186
187 char *basejpath = NULL;
188 pthread_mutex_t ds_conns_lock;
189 noit_hash_table ds_conns;
190 noit_hash_table working_sets;
191
192 /* the fqdn cache needs to be thread safe */
193 typedef struct {
194   char *uuid_str;
195   char *remote_cn;
196   int storagenode_id;
197   int sid;
198 } uuid_info;
199 typedef struct {
200   int storagenode_id;
201   char *fqdn;
202   char *dsn;
203 } storagenode_info;
204 noit_hash_table uuid_to_info_cache;
205 pthread_mutex_t storagenode_to_info_cache_lock;
206 noit_hash_table storagenode_to_info_cache;
207
208 /* Thread-safe connection pools */
209
210 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
211 static void
212 release_conn_q_forceable(conn_q *cq, int forcefree) {
213   int putback = 0;
214   cq->last_use = time(NULL);
215   pthread_mutex_lock(&cq->pool->lock);
216   cq->pool->outstanding--;
217   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
218     putback = 1;
219     cq->next = cq->pool->head;
220     cq->pool->head = cq;
221     cq->pool->in_pool++;
222   }
223   pthread_mutex_unlock(&cq->pool->lock);
224   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(),
225         putback ? "to pool" : "and destroy", cq->pool->queue_name);
226   pthread_cond_signal(&cq->pool->cv);
227   if(putback) return;
228
229   /* Not put back, release it */
230   if(cq->dbh) PQfinish(cq->dbh);
231   if(cq->remote_str) free(cq->remote_str);
232   if(cq->remote_cn) free(cq->remote_cn);
233   if(cq->fqdn) free(cq->fqdn);
234   if(cq->dsn) free(cq->dsn);
235   free(cq);
236 }
237 static void
238 ttl_purge_conn_pool(conn_pool *pool) {
239   int old_cnt, new_cnt;
240   time_t now = time(NULL);
241   conn_q *cq, *prev = NULL, *iter;
242   /* because we always replace on the head and update the last_use time when
243      doing so, we know they are ordered LRU on the end.  So, once we hit an
244      old one, we know all the others are old too.
245    */
246   if(!pool->head) return; /* hack short circuit for no locks */
247   pthread_mutex_lock(&pool->lock);
248   old_cnt = pool->in_pool;
249   cq = pool->head;
250   while(cq) {
251     if(cq->last_use + cq->pool->ttl < now) {
252       if(prev) prev->next = NULL;
253       else pool->head = NULL;
254       break;
255     }
256     prev = cq;
257     cq = cq->next;
258   }
259   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
260   /* Fix accounting */
261   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
262   new_cnt = pool->in_pool;
263   pthread_mutex_unlock(&pool->lock);
264
265   /* Force release these without holding the lock */
266   while(cq) {
267     prev = cq;
268     cq = cq->next;
269     release_conn_q_forceable(cq, 1);
270   }
271   if(old_cnt != new_cnt)
272     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
273           pool->queue_name);
274 }
275 static void
276 release_conn_q(conn_q *cq) {
277   ttl_purge_conn_pool(cq->pool);
278   release_conn_q_forceable(cq, 0);
279 }
280 static conn_pool *
281 get_conn_pool_for_remote(const char *remote_str,
282                          const char *remote_cn, const char *fqdn) {
283   void *vcpool;
284   conn_pool *cpool = NULL;
285   char queue_name[256] = "datastore_";
286   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
287            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
288            fqdn ? fqdn : "default",
289            remote_cn ? remote_cn : "default");
290   pthread_mutex_lock(&ds_conns_lock);
291   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
292                         strlen(queue_name), &vcpool))
293     cpool = vcpool;
294   pthread_mutex_unlock(&ds_conns_lock);
295   if(!cpool) {
296     vcpool = cpool = calloc(1, sizeof(*cpool));
297     cpool->queue_name = strdup(queue_name);
298     pthread_mutex_init(&cpool->lock, NULL);
299     pthread_cond_init(&cpool->cv, NULL);
300     cpool->in_pool = 0;
301     cpool->outstanding = 0;
302     cpool->max_in_pool = 1;
303     cpool->max_allocated = 1;
304     pthread_mutex_lock(&ds_conns_lock);
305     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
306                         cpool)) {
307       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
308                          strlen(queue_name), &vcpool);
309     }
310     pthread_mutex_unlock(&ds_conns_lock);
311     if(vcpool != cpool) {
312       /* someone beat us to it */
313       free(cpool->queue_name);
314       pthread_mutex_destroy(&cpool->lock);
315       pthread_cond_destroy(&cpool->cv);
316       free(cpool);
317     }
318     else {
319       int i;
320       /* Our job to setup the pool */
321       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
322       eventer_jobq_init(cpool->jobq, queue_name);
323       cpool->jobq->backq = eventer_default_backq();
324       /* Add one thread */
325       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
326         eventer_jobq_increase_concurrency(cpool->jobq);
327     }
328     cpool = vcpool;
329   }
330   return cpool;
331 }
332 static conn_q *
333 get_conn_q_for_remote(const char *remote_str,
334                       const char *remote_cn, const char *fqdn,
335                       const char *dsn) {
336   conn_pool *cpool;
337   conn_q *cq;
338   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
339   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(),
340         cpool->queue_name);
341   pthread_mutex_lock(&cpool->lock);
342  again:
343   if(cpool->head) {
344     assert(cpool->in_pool > 0);
345     cq = cpool->head;
346     cpool->head = cq->next;
347     cpool->in_pool--;
348     cpool->outstanding++;
349     cq->next = NULL;
350     pthread_mutex_unlock(&cpool->lock);
351     return cq;
352   }
353   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
354     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
355           (void *)pthread_self(), cpool->queue_name);
356     pthread_cond_wait(&cpool->cv, &cpool->lock);
357     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
358           (void *)pthread_self(), cpool->queue_name);
359     goto again;
360   }
361   else {
362     cpool->outstanding++;
363     pthread_mutex_unlock(&cpool->lock);
364   }
365  
366   cq = calloc(1, sizeof(*cq));
367   cq->pool = cpool;
368   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
369   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
370   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
371   cq->dsn = dsn ? strdup(dsn) : NULL;
372   return cq;
373 }
374 static conn_q *
375 get_conn_q_for_metanode() {
376   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
377 }
378
379 typedef enum {
380   DS_EXEC_SUCCESS = 0,
381   DS_EXEC_ROW_FAILED = 1,
382   DS_EXEC_TXN_FAILED = 2,
383 } execute_outcome_t;
384
385 #define DECLARE_PARAM_STR(str, len) do { \
386   d->paramValues[d->nparams] = noit__strndup(str, len); \
387   d->paramLengths[d->nparams] = len; \
388   d->paramFormats[d->nparams] = 0; \
389   d->paramAllocd[d->nparams] = 1; \
390   if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \
391     free(d->paramValues[d->nparams]); \
392     d->paramValues[d->nparams] = NULL; \
393     d->paramLengths[d->nparams] = 0; \
394     d->paramAllocd[d->nparams] = 0; \
395   } \
396   d->nparams++; \
397 } while(0)
398 #define DECLARE_PARAM_INT(i) do { \
399   int buffer__len; \
400   char buffer__[32]; \
401   snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
402   buffer__len = strlen(buffer__); \
403   DECLARE_PARAM_STR(buffer__, buffer__len); \
404 } while(0)
405
406 #define PG_GET_STR_COL(dest, row, name) do { \
407   int colnum = PQfnumber(d->res, name); \
408   dest = NULL; \
409   if (colnum >= 0) \
410     dest = PQgetisnull(d->res, row, colnum) \
411          ? NULL : PQgetvalue(d->res, row, colnum); \
412 } while(0)
413
414 #define PG_EXEC(cmd) do { \
415   d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
416                         (const char * const *)d->paramValues, \
417                         d->paramLengths, d->paramFormats, 0); \
418   d->rv = PQresultStatus(d->res); \
419   if(d->rv != PGRES_COMMAND_OK && \
420      d->rv != PGRES_TUPLES_OK) { \
421     const char *pgerr = PQresultErrorMessage(d->res); \
422     const char *pgerr_end = strchr(pgerr, '\n'); \
423     if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
424     noitL(ds_err, "[%s] stratcon_datasource.c:%d bad (%d): %.*s\n", \
425           cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
426           (int)(pgerr_end - pgerr), pgerr); \
427     PQclear(d->res); \
428     goto bad_row; \
429   } \
430 } while(0)
431
432 #define PG_TM_EXEC(cmd, whence) do { \
433   time_t __w = whence; \
434   char cmdbuf[4096]; \
435   struct tm tbuf, *tm; \
436   tm = gmtime_r(&__w, &tbuf); \
437   strftime(cmdbuf, sizeof(cmdbuf), cmd, tm); \
438   d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
439                         (const char * const *)d->paramValues, \
440                         d->paramLengths, d->paramFormats, 0); \
441   d->rv = PQresultStatus(d->res); \
442   if(d->rv != PGRES_COMMAND_OK && \
443      d->rv != PGRES_TUPLES_OK) { \
444     const char *pgerr = PQresultErrorMessage(d->res); \
445     const char *pgerr_end = strchr(pgerr, '\n'); \
446     if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
447     noitL(ds_err, "stratcon_datasource.c:%d bad (%d): %.*s time: %llu\n", \
448           __LINE__, d->rv, (int)(pgerr_end - pgerr), pgerr, \
449           (long long unsigned)whence); \
450     PQclear(d->res); \
451     goto bad_row; \
452   } \
453 } while(0)
454
455 static void *
456 stratcon_datastore_check_loadall(void *vsn) {
457   storagenode_info *sn = vsn;
458   ds_single_detail *d;
459   int i, row_count = 0, good = 0;
460   char buff[1024];
461   conn_q *cq = NULL;
462
463   d = calloc(1, sizeof(*d));
464   GET_QUERY(check_loadall);
465   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
466   i = 0;
467   while(stratcon_database_connect(cq)) {
468     if(i++ > 4) {
469       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
470       release_conn_q(cq);
471       return (void *)(vpsized_int)good;
472     }
473     sleep(1);
474   }
475   PG_EXEC(check_loadall);
476   row_count = PQntuples(d->res);
477  
478   for(i=0; i<row_count; i++) {
479     int rv;
480     int8_t family;
481     struct sockaddr *sin;
482     struct sockaddr_in sin4 = { .sin_family = AF_INET };
483     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
484     char *remote, *id, *target, *module, *name;
485     PG_GET_STR_COL(remote, i, "remote_address");
486     PG_GET_STR_COL(id, i, "id");
487     PG_GET_STR_COL(target, i, "target");
488     PG_GET_STR_COL(module, i, "module");
489     PG_GET_STR_COL(name, i, "name");
490     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
491
492     family = AF_INET;
493     sin = (struct sockaddr *)&sin4;
494     rv = inet_pton(family, remote, &sin4.sin_addr);
495     if(rv != 1) {
496       family = AF_INET6;
497       sin = (struct sockaddr *)&sin6;
498       rv = inet_pton(family, remote, &sin6.sin6_addr);
499       if(rv != 1) {
500         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
501         sin = NULL;
502       }
503     }
504
505     /* stratcon_iep_line_processor takes an allocated operand and frees it */
506     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
507     good++;
508   }
509   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
510         good, row_count, sn->fqdn);
511  bad_row:
512   free_params((ds_single_detail *)d);
513   free(d);
514   if(cq) release_conn_q(cq);
515   return (void *)(vpsized_int)good;
516 }
517 static int
518 stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure,
519                                     struct timeval *now) {
520   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
521   pthread_t *jobs = NULL;
522   int nodes, i = 0, tcnt = 0;
523   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
524   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
525
526   pthread_mutex_lock(&storagenode_to_info_cache_lock);
527   nodes = storagenode_to_info_cache.size;
528   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
529   sns = calloc(MAX(1,nodes), sizeof(*sns));
530   if(nodes == 0) sns[nodes++] = &self;
531   else {
532     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
533     const char *k;
534     void *v;
535     int klen;
536     while(noit_hash_next(&storagenode_to_info_cache,
537                          &iter, &k, &klen, &v)) {
538       sns[i++] = (storagenode_info *)v;
539     }
540   }
541   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
542
543   for(i=0; i<nodes; i++) {
544     if(pthread_create(&jobs[i], NULL,
545                       stratcon_datastore_check_loadall, sns[i]) != 0) {
546       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
547     }
548   }
549   for(i=0; i<nodes; i++) {
550     void *good;
551     pthread_join(jobs[i], &good);
552     tcnt += (int)(vpsized_int)good;
553   }
554   noitL(noit_error, "Loaded all %d check states.\n", tcnt);
555   return 0;
556 }
557 void
558 stratcon_datastore_iep_check_preload() {
559   eventer_t e;
560   conn_pool *cpool;
561
562   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
563   e = eventer_alloc();
564   e->mask = EVENTER_ASYNCH;
565   e->callback = stratcon_datastore_asynch_drive_iep;
566   e->closure = NULL;
567   eventer_add_asynch(cpool->jobq, e);
568 }
569 execute_outcome_t
570 stratcon_datastore_find(ds_rt_detail *d) {
571   conn_q *cq;
572   char *val;
573   int row_count;
574   struct realtime_tracker *node;
575
576   for(node = d->rt; node; node = node->next) {
577     char uuid_str[UUID_STR_LEN+1];
578     const char *fqdn, *dsn, *remote_cn;
579     char remote_ip[32];
580     int storagenode_id;
581
582     uuid_unparse_lower(node->checkid, uuid_str);
583     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
584                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
585       continue;
586
587     noitL(noit_debug, "stratcon_datastore_find <- (%d, %s) @ %s\n",
588           node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)");
589
590     /* We might be able to find the IP from our config if someone has
591      * specified the expected cn in the noit definition.
592      */
593     if(stratcon_find_noit_ip_by_cn(remote_cn,
594                                    remote_ip, sizeof(remote_ip)) == 0) {
595       node->noit = strdup(remote_ip);
596       noitL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit);
597       continue;
598     }
599
600     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
601     stratcon_database_connect(cq);
602
603     GET_QUERY(check_find);
604     DECLARE_PARAM_INT(node->sid);
605     PG_EXEC(check_find);
606     row_count = PQntuples(d->res);
607     if(row_count != 1) {
608       noitL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid);
609       PQclear(d->res);
610       goto bad_row;
611     }
612
613     /* Get the remote_address (which noit owns this) */
614     PG_GET_STR_COL(val, 0, "remote_address");
615     if(!val) {
616       noitL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn);
617       PQclear(d->res);
618       goto bad_row;
619     }
620     node->noit = strdup(val);
621     noitL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit);
622    bad_row:
623     free_params((ds_single_detail *)d);
624     d->nparams = 0;
625     release_conn_q(cq);
626   }
627   return DS_EXEC_SUCCESS;
628 }
629 execute_outcome_t
630 stratcon_datastore_execute(conn_q *cq, const char *r, const char *remote_cn,
631                            ds_line_detail *d) {
632   int type, len, sid;
633   char *final_buff;
634   uLong final_len, actual_final_len;
635   char *token;
636   char raddr_blank[1] = "";
637   const char *raddr;
638
639   type = d->data[0];
640   raddr = r ? r : raddr_blank;
641
642   /* Parse the log line, but only if we haven't already */
643   if(!d->nparams) {
644     char *scp, *ecp;
645
646     scp = d->data;
647 #define PROCESS_NEXT_FIELD(t,l) do { \
648   if(!*scp) goto bad_row; \
649   ecp = strchr(scp, '\t'); \
650   if(!ecp) goto bad_row; \
651   token = scp; \
652   len = (ecp-scp); \
653   scp = ecp + 1; \
654 } while(0)
655 #define PROCESS_LAST_FIELD(t,l) do { \
656   if(!*scp) ecp = scp; \
657   else { \
658     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
659     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
660   } \
661   t = scp; \
662   l = (ecp-scp); \
663 } while(0)
664
665     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
666     switch(type) {
667       /* See noit_check_log.c for log description */
668       case 'n':
669         DECLARE_PARAM_STR(raddr, strlen(raddr));
670         DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
671         DECLARE_PARAM_STR("noitd",5); /* node_type */
672         PROCESS_NEXT_FIELD(token,len);
673         d->whence = (time_t)strtoul(token, NULL, 10);
674         DECLARE_PARAM_STR(token,len); /* timestamp */
675
676         /* This is the expected uncompressed len */
677         PROCESS_NEXT_FIELD(token,len);
678         final_len = atoi(token);
679         final_buff = malloc(final_len);
680         if(!final_buff) goto bad_row;
681  
682         /* The last token is b64 endoded and compressed.
683          * we need to decode it, declare it and then free it.
684          */
685         PROCESS_LAST_FIELD(token, len);
686         /* We can in-place decode this */
687         len = noit_b64_decode((char *)token, len,
688                               (unsigned char *)token, len);
689         if(len <= 0) {
690           noitL(noit_error, "noitd config base64 decoding error.\n");
691           free(final_buff);
692           goto bad_row;
693         }
694         actual_final_len = final_len;
695         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
696                               (unsigned char *)token, len)) {
697           noitL(noit_error, "noitd config decompression failure.\n");
698           free(final_buff);
699           goto bad_row;
700         }
701         if(final_len != actual_final_len) {
702           noitL(noit_error, "noitd config decompression error.\n");
703           free(final_buff);
704           goto bad_row;
705         }
706         DECLARE_PARAM_STR(final_buff, final_len);
707         free(final_buff);
708         break;
709       case 'C':
710         DECLARE_PARAM_STR(raddr, strlen(raddr));
711         PROCESS_NEXT_FIELD(token,len);
712         DECLARE_PARAM_STR(token,len); /* timestamp */
713         d->whence = (time_t)strtoul(token, NULL, 10);
714         PROCESS_NEXT_FIELD(token, len);
715         sid = uuid_to_sid(token, remote_cn);
716         if(sid == 0) goto bad_row;
717         DECLARE_PARAM_INT(sid); /* sid */
718         DECLARE_PARAM_STR(token,len); /* uuid */
719         PROCESS_NEXT_FIELD(token, len);
720         DECLARE_PARAM_STR(token,len); /* target */
721         PROCESS_NEXT_FIELD(token, len);
722         DECLARE_PARAM_STR(token,len); /* module */
723         PROCESS_LAST_FIELD(token, len);
724         DECLARE_PARAM_STR(token,len); /* name */
725         break;
726       case 'M':
727         PROCESS_NEXT_FIELD(token,len);
728         DECLARE_PARAM_STR(token,len); /* timestamp */
729         d->whence = (time_t)strtoul(token, NULL, 10);
730         PROCESS_NEXT_FIELD(token, len);
731         sid = uuid_to_sid(token, remote_cn);
732         if(sid == 0) goto bad_row;
733         DECLARE_PARAM_INT(sid); /* sid */
734         PROCESS_NEXT_FIELD(token, len);
735         DECLARE_PARAM_STR(token,len); /* name */
736         PROCESS_NEXT_FIELD(token,len);
737         d->metric_type = *token;
738         PROCESS_LAST_FIELD(token,len);
739         DECLARE_PARAM_STR(token,len); /* value */
740         break;
741       case 'S':
742         PROCESS_NEXT_FIELD(token,len);
743         DECLARE_PARAM_STR(token,len); /* timestamp */
744         d->whence = (time_t)strtoul(token, NULL, 10);
745         PROCESS_NEXT_FIELD(token, len);
746         sid = uuid_to_sid(token, remote_cn);
747         if(sid == 0) goto bad_row;
748         DECLARE_PARAM_INT(sid); /* sid */
749         PROCESS_NEXT_FIELD(token, len);
750         DECLARE_PARAM_STR(token,len); /* state */
751         PROCESS_NEXT_FIELD(token, len);
752         DECLARE_PARAM_STR(token,len); /* availability */
753         PROCESS_NEXT_FIELD(token, len);
754         DECLARE_PARAM_STR(token,len); /* duration */
755         PROCESS_LAST_FIELD(token,len);
756         DECLARE_PARAM_STR(token,len); /* status */
757         break;
758       default:
759         goto bad_row;
760     }
761
762   }
763
764   /* Now execute the query */
765   switch(type) {
766     case 'n':
767       GET_QUERY(config_insert);
768       PG_EXEC(config_insert);
769       PQclear(d->res);
770       break;
771     case 'C':
772       GET_QUERY(check_insert);
773       PG_TM_EXEC(check_insert, d->whence);
774       PQclear(d->res);
775       break;
776     case 'S':
777       GET_QUERY(status_insert);
778       PG_TM_EXEC(status_insert, d->whence);
779       PQclear(d->res);
780       break;
781     case 'M':
782       switch(d->metric_type) {
783         case METRIC_INT32:
784         case METRIC_UINT32:
785         case METRIC_INT64:
786         case METRIC_UINT64:
787         case METRIC_DOUBLE:
788           GET_QUERY(metric_insert_numeric);
789           PG_TM_EXEC(metric_insert_numeric, d->whence);
790           PQclear(d->res);
791           break;
792         case METRIC_STRING:
793           GET_QUERY(metric_insert_text);
794           PG_TM_EXEC(metric_insert_text, d->whence);
795           PQclear(d->res);
796           break;
797         default:
798           goto bad_row;
799       }
800       break;
801     default:
802       /* should never get here */
803       goto bad_row;
804   }
805   return DS_EXEC_SUCCESS;
806  bad_row:
807   return DS_EXEC_ROW_FAILED;
808 }
809 static int
810 stratcon_database_post_connect(conn_q *cq) {
811   int rv = 0;
812   ds_single_detail _d = { 0 }, *d = &_d;
813   if(cq->fqdn) {
814     char *remote_str, *remote_cn;
815     /* This is the silly way we get null's in through our declare_param_str */
816     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
817     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
818     /* This is a storage node, it gets the storage node post_connect */
819     GET_QUERY(storage_post_connect);
820     rv = -1; /* now we're serious */
821     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
822     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
823     PG_EXEC(storage_post_connect);
824     PQclear(d->res);
825     rv = 0;
826   }
827   else {
828     /* Metanode post_connect */
829     GET_QUERY(metanode_post_connect);
830     rv = -1; /* now we're serious */
831     PG_EXEC(metanode_post_connect);
832     PQclear(d->res);
833     rv = 0;
834   }
835  bad_row:
836   free_params(d);
837   if(rv == -1) {
838     /* Post-connect intentions are serious and fatal */
839     PQfinish(cq->dbh);
840     cq->dbh = NULL;
841   }
842   return rv;
843 }
844 static int
845 stratcon_database_connect(conn_q *cq) {
846   char *dsn, dsn_meta[512];
847   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
848   const char *k, *v;
849   int klen;
850   noit_hash_table *t;
851
852   dsn_meta[0] = '\0';
853   if(!cq->dsn) {
854     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
855     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
856       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
857       strlcat(dsn_meta, k, sizeof(dsn_meta));
858       strlcat(dsn_meta, "=", sizeof(dsn_meta));
859       strlcat(dsn_meta, v, sizeof(dsn_meta));
860     }
861     noit_hash_destroy(t, free, free);
862     free(t);
863     dsn = dsn_meta;
864   }
865   else {
866     char options[32];
867     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta));
868     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user",
869                                options, sizeof(options))) {
870       strlcat(dsn_meta, " ", sizeof(dsn_meta));
871       strlcat(dsn_meta, "user", sizeof(dsn_meta));
872       strlcat(dsn_meta, "=", sizeof(dsn_meta));
873       strlcat(dsn_meta, options, sizeof(dsn_meta));
874     }
875     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password",
876                                options, sizeof(options))) {
877       strlcat(dsn_meta, " ", sizeof(dsn_meta));
878       strlcat(dsn_meta, "password", sizeof(dsn_meta));
879       strlcat(dsn_meta, "=", sizeof(dsn_meta));
880       strlcat(dsn_meta, options, sizeof(dsn_meta));
881     }
882     dsn = dsn_meta;
883   }
884
885   if(cq->dbh) {
886     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
887     PQreset(cq->dbh);
888     if(PQstatus(cq->dbh) != CONNECTION_OK) {
889       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
890             dsn, PQerrorMessage(cq->dbh));
891       return -1;
892     }
893     if(stratcon_database_post_connect(cq)) return -1;
894     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
895     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
896           dsn, PQerrorMessage(cq->dbh));
897     return -1;
898   }
899
900   cq->dbh = PQconnectdb(dsn);
901   if(!cq->dbh) return -1;
902   if(PQstatus(cq->dbh) != CONNECTION_OK) {
903     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
904           dsn, PQerrorMessage(cq->dbh));
905     return -1;
906   }
907   if(stratcon_database_post_connect(cq)) return -1;
908   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
909   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
910         dsn, PQerrorMessage(cq->dbh));
911   return -1;
912 }
913 static int
914 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
915                                 const char *name) {
916   int rv = -1;
917   PGresult *res;
918   char cmd[128];
919   strlcpy(cmd, p, sizeof(cmd));
920   strlcat(cmd, name, sizeof(cmd));
921   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
922   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
923   PQclear(res);
924   return rv;
925 }
926 static int
927 stratcon_datastore_do(conn_q *cq, const char *cmd) {
928   PGresult *res;
929   int rv = -1;
930   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
931   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
932   PQclear(res);
933   return rv;
934 }
/*
 * Transaction helpers for stratcon_datastore_asynch_execute().
 *
 * These macros expand in the caller's scope.  They rely on its locals `cq`,
 * `current` and `last_sp`, and on a `full_monty` label defined there.
 *
 * BUSTED: treat the connection as broken.  PQfinish() it, clear the handle,
 * and jump back to full_monty, which reconnects and replays the batch.
 */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)
/* SAVEPOINT: issue "SAVEPOINT <name>" and remember `current` as the row to
   resume from on a row failure.  Any error is handled as BUSTED. */
#define SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
  last_sp = current; \
} while(0)
/* ROLLBACK_TO_SAVEPOINT: discard work since the savepoint and forget it
   (last_sp = NULL), so the next row opens a fresh one.  Errors are BUSTED. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
/* RELEASE_SAVEPOINT: keep work since the savepoint and forget it
   (last_sp = NULL).  Errors are BUSTED. */
#define RELEASE_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
954 int
955 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
956                                  struct timeval *now) {
957   ds_rt_detail *dsjd = closure;
958   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
959   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
960
961   assert(dsjd->rt);
962   stratcon_datastore_find(dsjd);
963   if(dsjd->completion_event)
964     eventer_add(dsjd->completion_event);
965
966   free_params((ds_single_detail *)dsjd);
967   free(dsjd);
968   return 0;
969 }
970 static const char *
971 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
972   void *vinfo;
973   char *dsn = NULL, *fqdn = NULL;
974   int found = 0;
975   storagenode_info *info = NULL;
976   pthread_mutex_lock(&storagenode_to_info_cache_lock);
977   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
978                         &vinfo)) {
979     found = 1;
980     info = vinfo;
981   }
982   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
983   if(found) {
984     if(fqdn_out) *fqdn_out = info->fqdn;
985     return info->dsn;
986   }
987
988   if(!found && can_use_db) {
989     ds_single_detail *d;
990     conn_q *cq;
991     int row_count;
992     /* Look it up and store it */
993     d = calloc(1, sizeof(*d));
994     cq = get_conn_q_for_metanode();
995     GET_QUERY(find_storage);
996     DECLARE_PARAM_INT(id);
997     PG_EXEC(find_storage);
998     row_count = PQntuples(d->res);
999     if(row_count) {
1000       PG_GET_STR_COL(dsn, 0, "dsn");
1001       PG_GET_STR_COL(fqdn, 0, "fqdn");
1002       fqdn = fqdn ? strdup(fqdn) : NULL;
1003       dsn = dsn ? strdup(dsn) : NULL;
1004     }
1005     PQclear(d->res);
1006    bad_row:
1007     free_params(d);
1008     free(d);
1009     release_conn_q(cq);
1010   }
1011   if(fqdn) {
1012     info = calloc(1, sizeof(*info));
1013     info->fqdn = fqdn;
1014     if(fqdn_out) *fqdn_out = info->fqdn;
1015     info->dsn = dsn;
1016     info->storagenode_id = id;
1017     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1018     noit_hash_store(&storagenode_to_info_cache,
1019                     (void *)&info->storagenode_id, sizeof(int), info);
1020     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1021   }
1022   return info ? info->dsn : NULL;
1023 }
1024 static ds_line_detail *
1025 build_insert_batch(interim_journal_t *ij) {
1026   int rv;
1027   off_t len;
1028   const char *buff, *cp, *lcp;
1029   struct stat st;
1030   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1031
1032   if(ij->fd < 0) {
1033     ij->fd = open(ij->filename, O_RDONLY);
1034     if(ij->fd < 0) {
1035       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
1036             ij->filename, strerror(errno));
1037       assert(ij->fd >= 0);
1038     }
1039   }
1040   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1041   if(rv == -1) {
1042       noitL(noit_error, "Cannot stat interim journal '%s': %s\n",
1043             ij->filename, strerror(errno));
1044     assert(rv != -1);
1045   }
1046   len = st.st_size;
1047   if(len > 0) {
1048     buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1049     if(buff == (void *)-1) {
1050       noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1051             ij->filename, strerror(errno));
1052       assert(buff != (void *)-1);
1053     }
1054     lcp = buff;
1055     while(lcp < (buff + len) &&
1056           NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1057       next = calloc(1, sizeof(*next));
1058       next->data = malloc(cp - lcp + 1);
1059       memcpy(next->data, lcp, cp - lcp);
1060       next->data[cp - lcp] = '\0';
1061       if(!head) head = next;
1062       if(last) last->next = next;
1063       last = next;
1064       lcp = cp + 1;
1065     }
1066     munmap((void *)buff, len);
1067   }
1068   close(ij->fd);
1069   return head;
1070 }
1071 static void
1072 interim_journal_remove(interim_journal_t *ij) {
1073   unlink(ij->filename);
1074   if(ij->filename) free(ij->filename);
1075   if(ij->remote_str) free(ij->remote_str);
1076   if(ij->remote_cn) free(ij->remote_cn);
1077   if(ij->fqdn) free(ij->fqdn);
1078 }
1079 int
1080 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
1081                                   struct timeval *now) {
1082   int i, total, success, sp_total, sp_success;
1083   interim_journal_t *ij;
1084   ds_line_detail *head = NULL, *current, *last_sp;
1085   const char *dsn;
1086   conn_q *cq;
1087   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1088   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1089
1090   ij = closure;
1091   dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1092   if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1093   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1094                              ij->fqdn, dsn);
1095   noitL(ds_deb, "stratcon_datastore_asynch_execute[%s,%s,%s]\n",
1096         ij->remote_str, ij->remote_cn, ij->fqdn);
1097  full_monty:
1098   /* Make sure we have a connection */
1099   i = 1;
1100   while(stratcon_database_connect(cq)) {
1101     noitL(noit_error, "Error connecting to database: %s\n",
1102           ij->fqdn ? ij->fqdn : "(null)");
1103     sleep(i);
1104     i *= 2;
1105     i = MIN(i, 16);
1106   }
1107
1108   if(head == NULL) head = build_insert_batch(ij);
1109   noitL(ds_deb, "Starting batch from %s/%s to %s\n",
1110         ij->remote_str ? ij->remote_str : "(null)",
1111         ij->remote_cn ? ij->remote_cn : "(null)",
1112         ij->fqdn ? ij->fqdn : "(null)");
1113   current = head;
1114   last_sp = NULL;
1115   total = success = sp_total = sp_success = 0;
1116   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
1117   while(current) {
1118     execute_outcome_t rv;
1119     if(current->data) {
1120       if(!last_sp) {
1121         SAVEPOINT("batch");
1122         sp_success = success;
1123         sp_total = total;
1124       }
1125  
1126       if(current->problematic) {
1127         RELEASE_SAVEPOINT("batch");
1128         current = current->next;
1129         total++;
1130         continue;
1131       }
1132       rv = stratcon_datastore_execute(cq, cq->remote_str, cq->remote_cn,
1133                                       current);
1134       switch(rv) {
1135         case DS_EXEC_SUCCESS:
1136           total++;
1137           success++;
1138           current = current->next;
1139           break;
1140         case DS_EXEC_ROW_FAILED:
1141           /* rollback to savepoint, mark this record as bad and start again */
1142           noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1143           current->problematic = 1;
1144           current = last_sp;
1145           success = sp_success;
1146           total = sp_total;
1147           ROLLBACK_TO_SAVEPOINT("batch");
1148           break;
1149         case DS_EXEC_TXN_FAILED:
1150           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1151           BUSTED(cq);
1152       }
1153     }
1154   }
1155   if(last_sp) RELEASE_SAVEPOINT("batch");
1156   if(stratcon_datastore_do(cq, "COMMIT")) {
1157     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1158     BUSTED(cq);
1159   }
1160   /* Cleanup the mess */
1161   while(head) {
1162     ds_line_detail *tofree;
1163     tofree = head;
1164     head = head->next;
1165     if(tofree->data) free(tofree->data);
1166     free_params((ds_single_detail *)tofree);
1167     free(tofree);
1168   }
1169   noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1170         ij->remote_str ? ij->remote_str : "(null)",
1171         ij->remote_cn ? ij->remote_cn : "(null)",
1172         ij->fqdn ? ij->fqdn : "(null)", success, total);
1173   interim_journal_remove(ij);
1174   release_conn_q(cq);
1175   return 0;
1176 }
1177 static int
1178 stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure,
1179                                 struct timeval *now) {
1180   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1181   const char *k;
1182   int klen;
1183   void *vij;
1184   interim_journal_t *ij;
1185   syncset_t *syncset = closure;
1186
1187   if((mask & EVENTER_ASYNCH) == EVENTER_ASYNCH) {
1188     eventer_add(syncset->completion);
1189     eventer_trigger(syncset->completion, EVENTER_READ | EVENTER_WRITE);
1190     free(syncset);
1191     return 0;
1192   }
1193   if(!((mask & EVENTER_ASYNCH_WORK) == EVENTER_ASYNCH_WORK)) return 0;
1194
1195   noitL(ds_deb, "Syncing journal sets...\n");
1196   while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) {
1197     eventer_t ingest;
1198     ij = vij;
1199     noitL(ds_deb, "Syncing journal set [%s,%s,%s]\n",
1200           ij->remote_str, ij->remote_cn, ij->fqdn);
1201     fsync(ij->fd);
1202     close(ij->fd);
1203     ij->fd = -1;
1204     ingest = eventer_alloc();
1205     ingest->mask = EVENTER_ASYNCH;
1206     ingest->callback = stratcon_datastore_asynch_execute;
1207     ingest->closure = ij;
1208     eventer_add_asynch(ij->cpool->jobq, ingest);
1209   }
1210   noit_hash_destroy(syncset->ws, free, NULL);
1211   free(syncset->ws);
1212   return 0;
1213 }
1214 static interim_journal_t *
1215 interim_journal_get(struct sockaddr *remote, const char *remote_cn_in,
1216                     int storagenode_id, const char *fqdn_in) {
1217   void *vhash, *vij;
1218   noit_hash_table *working_set;
1219   interim_journal_t *ij;
1220   struct timeval now;
1221   char jpath[PATH_MAX];
1222   char remote_str[128];
1223   const char *remote_cn = remote_cn_in ? remote_cn_in : "default";
1224   const char *fqdn = fqdn_in ? fqdn_in : "default";
1225
1226   noit_convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote);
1227   if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str));
1228
1229   /* Lookup the working set */
1230   if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1231     working_set = calloc(1, sizeof(*working_set));
1232     noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn),
1233                     working_set);
1234   }
1235   else
1236     working_set = vhash;
1237
1238   /* Lookup the interim journal within the working set */
1239   if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) {
1240     ij = calloc(1, sizeof(*ij));
1241     gettimeofday(&now, NULL);
1242     snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x",
1243              basejpath, remote_str, remote_cn, storagenode_id,
1244              (unsigned int)now.tv_sec, (unsigned int)now.tv_usec);
1245     ij->remote_str = strdup(remote_str);
1246     ij->remote_cn = strdup(remote_cn);
1247     ij->fqdn = fqdn_in ? strdup(fqdn_in) : NULL;
1248     ij->storagenode_id = storagenode_id;
1249     ij->filename = strdup(jpath);
1250     ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1251                                          ij->fqdn);
1252     ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1253     if(ij->fd < 0 && errno == ENOENT) {
1254       if(mkdir_for_file(ij->filename, 0750)) {
1255         noitL(noit_error, "Failed to create dir for '%s': %s\n",
1256               ij->filename, strerror(errno));
1257         exit(-1);
1258       }
1259       ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1260     }
1261     if(ij->fd < 0) {
1262       noitL(noit_error, "Failed to open interim journal '%s': %s\n",
1263             ij->filename, strerror(errno));
1264       exit(-1);
1265     }
1266     noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij);
1267   }
1268   else
1269     ij = vij;
1270
1271   return ij;
1272 }
1273 static int
1274 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1275                           int *sid_out, int *storagenode_id_out,
1276                           const char **remote_cn_out,
1277                           const char **fqdn_out, const char **dsn_out) {
1278   /* only called from the main thread -- no safety issues */
1279   void *vuuidinfo, *vinfo;
1280   uuid_info *uuidinfo;
1281   storagenode_info *info = NULL;
1282   char *fqdn = NULL;
1283   char *dsn = NULL;
1284   char *new_remote_cn = NULL;
1285   int storagenode_id = 0, sid = 0;
1286   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1287                          &vuuidinfo)) {
1288     int row_count = 0;
1289     char *tmpint;
1290     ds_single_detail *d;
1291     conn_q *cq;
1292
1293     /* We can't do a database lookup without the remote_cn */
1294     if(!remote_cn) {
1295       if(stratcon_datastore_get_enabled()) {
1296         /* We have an authoritatively maintained cache, we don't do lookups */
1297         return -1;
1298       }
1299       else
1300         remote_cn = "[[null]]";
1301     }
1302
1303     d = calloc(1, sizeof(*d));
1304     cq = get_conn_q_for_metanode();
1305     if(stratcon_database_connect(cq) == 0) {
1306       /* Blocking call to service the cache miss */
1307       GET_QUERY(check_map);
1308       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1309       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1310       PG_EXEC(check_map);
1311       row_count = PQntuples(d->res);
1312       if(row_count != 1) {
1313         PQclear(d->res);
1314         goto bad_row;
1315       }
1316       PG_GET_STR_COL(tmpint, 0, "sid");
1317       if(!tmpint) {
1318         row_count = 0;
1319         PQclear(d->res);
1320         goto bad_row;
1321       }
1322       sid = atoi(tmpint);
1323       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1324       if(tmpint) storagenode_id = atoi(tmpint);
1325       PG_GET_STR_COL(fqdn, 0, "fqdn");
1326       PG_GET_STR_COL(dsn, 0, "dsn");
1327       PG_GET_STR_COL(new_remote_cn, 0, "remote_cn");
1328       fqdn = fqdn ? strdup(fqdn) : NULL;
1329       dsn = dsn ? strdup(dsn) : NULL;
1330       new_remote_cn = new_remote_cn ? strdup(new_remote_cn) : NULL;
1331       PQclear(d->res);
1332     }
1333    bad_row:
1334     free_params((ds_single_detail *)d);
1335     free(d);
1336     release_conn_q(cq);
1337     if(row_count != 1) {
1338       return -1;
1339     }
1340     /* Place in cache */
1341     uuidinfo = calloc(1, sizeof(*uuidinfo));
1342     uuidinfo->sid = sid;
1343     uuidinfo->uuid_str = strdup(uuid_str);
1344     uuidinfo->storagenode_id = storagenode_id;
1345     uuidinfo->remote_cn = new_remote_cn ? strdup(new_remote_cn) : strdup(remote_cn);
1346     noit_hash_store(&uuid_to_info_cache,
1347                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1348     /* Also, we may have just witnessed a new storage node, store it */
1349     if(storagenode_id) {
1350       int needs_free = 0;
1351       info = calloc(1, sizeof(*info));
1352       info->storagenode_id = storagenode_id;
1353       info->dsn = dsn ? strdup(dsn) : NULL;
1354       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1355       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1356       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1357                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1358         /* hack to save memory -- we *never* remove from these caches,
1359            so we can use the same fqdn value in the above cache for the key
1360            in the cache below -- (no strdup) */
1361         noit_hash_store(&storagenode_to_info_cache,
1362                         (void *)&info->storagenode_id, sizeof(int), info);
1363       }
1364       else needs_free = 1;
1365       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1366       if(needs_free) {
1367         if(info->dsn) free(info->dsn);
1368         if(info->fqdn) free(info->fqdn);
1369         free(info);
1370       }
1371     }
1372   }
1373   else
1374     uuidinfo = vuuidinfo;
1375
1376   if(uuidinfo && uuidinfo->storagenode_id) {
1377     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1378       /* we don't have dsn and we actually want it */
1379       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1380       if(noit_hash_retrieve(&storagenode_to_info_cache,
1381                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1382                             &vinfo))
1383         info = vinfo;
1384       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1385     }
1386   }
1387
1388   if(fqdn_out) *fqdn_out = info ? info->fqdn : NULL;
1389   if(dsn_out) *dsn_out = info ? info->dsn : NULL;
1390   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1391   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1392   if(sid_out) *sid_out = uuidinfo->sid;
1393   if(fqdn) free(fqdn);
1394   if(dsn) free(dsn);
1395   if(new_remote_cn) free(new_remote_cn);
1396   return 0;
1397 }
1398 static int
1399 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1400   char uuid_str[UUID_STR_LEN+1];
1401   int sid = 0;
1402   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1403   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1404   return sid;
1405 }
1406 static void
1407 stratcon_datastore_journal(struct sockaddr *remote,
1408                            const char *remote_cn, const char *line) {
1409   interim_journal_t *ij = NULL;
1410   char uuid_str[UUID_STR_LEN+1], *cp;
1411   const char *fqdn = NULL, *dsn = NULL;
1412   int storagenode_id = 0;
1413   uuid_t checkid;
1414   if(!line) return;
1415   /* if it is a UUID based thing, find the storage node */
1416   switch(*line) {
1417     case 'C':
1418     case 'S':
1419     case 'M':
1420       if(line[1] == '\t' && (cp = strchr(line+2, '\t')) != NULL) {
1421         strlcpy(uuid_str, cp + 1, sizeof(uuid_str));
1422         if(!uuid_parse(uuid_str, checkid)) {
1423           storage_node_quick_lookup(uuid_str, remote_cn, NULL,
1424                                     &storagenode_id, NULL, &fqdn, &dsn);
1425           ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn);
1426         }
1427       }
1428       break;
1429     case 'n':
1430       ij = interim_journal_get(remote,remote_cn,0,NULL);
1431       break;
1432     default:
1433       break;
1434   }
1435   if(!ij) {
1436     noitL(ingest_err, "%d\t%s\n", storagenode_id, line);
1437   }
1438   else {
1439     int len;
1440     len = write(ij->fd, line, strlen(line));
1441     if(len < 0) {
1442       noitL(noit_error, "write to %s failed: %s\n",
1443             ij->filename, strerror(errno));
1444     }
1445   }
1446   return;
1447 }
1448 static noit_hash_table *
1449 stratcon_datastore_journal_remove(struct sockaddr *remote,
1450                                   const char *remote_cn) {
1451   void *vhash = NULL;
1452   if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1453     /* pluck it out */
1454     noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL);
1455   }
1456   else {
1457     noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n",
1458           remote_cn);
1459     abort();
1460   }
1461   return vhash;
1462 }
1463 void
1464 stratcon_datastore_push(stratcon_datastore_op_t op,
1465                         struct sockaddr *remote,
1466                         const char *remote_cn, void *operand,
1467                         eventer_t completion) {
1468   conn_pool *cpool;
1469   syncset_t *syncset;
1470   eventer_t e;
1471   ds_rt_detail *rtdetail;
1472   struct datastore_onlooker_list *nnode;
1473
1474   for(nnode = onlookers; nnode; nnode = nnode->next)
1475     nnode->dispatch(op,remote,remote_cn,operand);
1476
1477   switch(op) {
1478     case DS_OP_INSERT:
1479       stratcon_datastore_journal(remote, remote_cn, (const char *)operand);
1480       break;
1481     case DS_OP_CHKPT:
1482       e = eventer_alloc();
1483       syncset = calloc(1, sizeof(*syncset));
1484       e->mask = EVENTER_ASYNCH;
1485       e->callback = stratcon_datastore_journal_sync;
1486       syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn);
1487       syncset->completion = completion;
1488       e->closure = syncset;
1489       eventer_add(e);
1490       break;
1491     case DS_OP_FIND_COMPLETE:
1492       cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
1493       rtdetail = calloc(1, sizeof(*rtdetail));
1494       rtdetail->rt = operand;
1495       rtdetail->completion_event = completion;
1496       e = eventer_alloc();
1497       e->mask = EVENTER_ASYNCH;
1498       e->callback = stratcon_datastore_asynch_lookup;
1499       e->closure = rtdetail;
1500       eventer_add_asynch(cpool->jobq, e);
1501       break;
1502   }
1503 }
1504
1505 int
1506 stratcon_datastore_saveconfig(void *unused) {
1507   int rv = -1;
1508   char *buff;
1509   ds_single_detail _d = { 0 }, *d = &_d;
1510   conn_q *cq;
1511   char ipv4_str[32];
1512   struct in_addr r, l;
1513
1514   r.s_addr = htonl((4 << 24) || (2 << 16) || (2 << 8) || 1);
1515   memset(&l, 0, sizeof(l));
1516   noit_getip_ipv4(r, &l);
1517   /* Ignore the error.. what are we going to do anyway */
1518   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
1519     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));
1520
1521   cq = get_conn_q_for_metanode();
1522
1523   if(stratcon_database_connect(cq) == 0) {
1524     char time_as_str[20];
1525     size_t len;
1526     buff = noit_conf_xml_in_mem(&len);
1527     if(!buff) goto bad_row;
1528
1529     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1530     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str));
1531     DECLARE_PARAM_STR("", 0);
1532     DECLARE_PARAM_STR("stratcond", 9);
1533     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1534     DECLARE_PARAM_STR(buff, len);
1535     free(buff);
1536
1537     GET_QUERY(config_insert);
1538     PG_EXEC(config_insert);
1539     PQclear(d->res);
1540     rv = 0;
1541
1542     bad_row:
1543       free_params(d);
1544   }
1545   release_conn_q(cq);
1546   return rv;
1547 }
1548
1549 void
1550 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
1551                                                struct sockaddr *,
1552                                                const char *, void *)) {
1553   struct datastore_onlooker_list *nnode;
1554   nnode = calloc(1, sizeof(*nnode));
1555   nnode->dispatch = f;
1556   nnode->next = onlookers;
1557   while(noit_atomic_casptr((volatile void **)&onlookers,
1558                            nnode, nnode->next) != (void *)nnode->next)
1559     nnode->next = onlookers;
1560 }
1561 static void
1562 stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn,
1563                                          char *id_str, char *file) {
1564   char path[PATH_MAX];
1565   interim_journal_t *ij;
1566   eventer_t ingest;
1567
1568   snprintf(path, sizeof(path), "%s/%s/%s/%s/%s",
1569            basejpath, remote_str, remote_cn, id_str, file);
1570   ij = calloc(1, sizeof(*ij));
1571   ij->fd = open(path, O_RDONLY);
1572   if(ij->fd < 0) {
1573     noitL(noit_error, "cannot open journal '%s': %s\n",
1574           path, strerror(errno));
1575     free(ij);
1576     return;
1577   }
1578   close(ij->fd);
1579   ij->fd = -1;
1580   ij->filename = strdup(path);
1581   ij->remote_str = strdup(remote_str);
1582   ij->remote_cn = strdup(remote_cn);
1583   ij->storagenode_id = atoi(id_str);
1584   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1585                                        ij->fqdn);
1586   noitL(noit_error, "ingesting old payload: %s\n", ij->filename);
1587   ingest = eventer_alloc();
1588   ingest->mask = EVENTER_ASYNCH;
1589   ingest->callback = stratcon_datastore_asynch_execute;
1590   ingest->closure = ij;
1591   eventer_add_asynch(ij->cpool->jobq, ingest);
1592 }
1593 static void
1594 stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) {
1595   char path[PATH_MAX];
1596   DIR *root;
1597   struct dirent *de, *entry;
1598   int i = 0, cnt = 0;
1599   char **entries;
1600   int size = 0;
1601
1602   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
1603            first ? "/" : "", first ? first : "",
1604            second ? "/" : "", second ? second : "",
1605            third ? "/" : "", third ? third : "");
1606 #ifdef _PC_NAME_MAX
1607   size = pathconf(path, _PC_NAME_MAX);
1608 #endif
1609   size = MIN(size, PATH_MAX + 128);
1610   de = alloca(size);
1611   root = opendir(path);
1612   if(!root) return;
1613   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) cnt++;
1614   closedir(root);
1615   root = opendir(path);
1616   if(!root) return;
1617   entries = malloc(sizeof(*entries) * cnt);
1618   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) {
1619     if(i < cnt) {
1620       entries[i++] = strdup(entry->d_name);
1621     }
1622   }
1623   closedir(root);
1624   cnt = i; /* could have changed, directories are fickle */
1625   qsort(entries, i, sizeof(*entries),
1626         (int (*)(const void *, const void *))strcasecmp);
1627   for(i=0; i<cnt; i++) {
1628     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
1629     noitL(ds_deb, "Processing L%d entry '%s'\n",
1630           third ? 4 : second ? 3 : first ? 2 : 1, entries[i]);
1631     if(!first)
1632       stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL);
1633     else if(!second)
1634       stratcon_datastore_sweep_journals_int(first, entries[i], NULL);
1635     else if(!third)
1636       stratcon_datastore_sweep_journals_int(first, second, entries[i]);
1637     else if(strlen(entries[i]) == 16)
1638       stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]);
1639   }
1640 }
1641 static void
1642 stratcon_datastore_sweep_journals() {
1643   stratcon_datastore_sweep_journals_int(NULL,NULL,NULL);
1644 }
1645
1646 int
1647 stratcon_datastore_ingest_all_storagenode_info() {
1648   int i, cnt = 0;
1649   ds_single_detail _d = { 0 }, *d = &_d;
1650   conn_q *cq;
1651   cq = get_conn_q_for_metanode();
1652
1653   while(stratcon_database_connect(cq)) {
1654     noitL(noit_error, "Error connecting to database\n");
1655     sleep(1);
1656   }
1657
1658   GET_QUERY(all_storage);
1659   PG_EXEC(all_storage);
1660   cnt = PQntuples(d->res);
1661   for(i=0; i<cnt; i++) {
1662     void *vinfo;
1663     char *tmpint, *fqdn, *dsn;
1664     int storagenode_id;
1665     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1666     storagenode_id = atoi(tmpint);
1667     PG_GET_STR_COL(fqdn, i, "fqdn");
1668     PG_GET_STR_COL(dsn, i, "dsn");
1669     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1670     storagenode_id = tmpint ? atoi(tmpint) : 0;
1671
1672     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1673                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1674       storagenode_info *info;
1675       info = calloc(1, sizeof(*info));
1676       info->storagenode_id = storagenode_id;
1677       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1678       info->dsn = dsn ? strdup(dsn) : NULL;
1679       noit_hash_store(&storagenode_to_info_cache,
1680                       (void *)&info->storagenode_id, sizeof(int), info);
1681       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1682             info->storagenode_id,
1683             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1684     }
1685   }
1686   PQclear(d->res);
1687  bad_row:
1688   free_params(d);
1689
1690   release_conn_q(cq);
1691   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1692   return cnt;
1693 }
1694 int
1695 stratcon_datastore_ingest_all_check_info() {
1696   int i, cnt, loaded = 0;
1697   ds_single_detail _d = { 0 }, *d = &_d;
1698   conn_q *cq;
1699   cq = get_conn_q_for_metanode();
1700
1701   while(stratcon_database_connect(cq)) {
1702     noitL(noit_error, "Error connecting to database\n");
1703     sleep(1);
1704   }
1705
1706   GET_QUERY(check_mapall);
1707   PG_EXEC(check_mapall);
1708   cnt = PQntuples(d->res);
1709   for(i=0; i<cnt; i++) {
1710     void *vinfo;
1711     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1712     int sid, storagenode_id;
1713     uuid_info *uuidinfo;
1714     PG_GET_STR_COL(uuid_str, i, "id");
1715     if(!uuid_str) continue;
1716     PG_GET_STR_COL(tmpint, i, "sid");
1717     if(!tmpint) continue;
1718     sid = atoi(tmpint);
1719     PG_GET_STR_COL(fqdn, i, "fqdn");
1720     PG_GET_STR_COL(dsn, i, "dsn");
1721     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1722     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1723     storagenode_id = tmpint ? atoi(tmpint) : 0;
1724
1725     uuidinfo = calloc(1, sizeof(*uuidinfo));
1726     uuidinfo->uuid_str = strdup(uuid_str);
1727     uuidinfo->remote_cn = strdup(remote_cn);
1728     uuidinfo->storagenode_id = storagenode_id;
1729     uuidinfo->sid = sid;
1730     noit_hash_store(&uuid_to_info_cache,
1731                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1732     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1733           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1734     loaded++;
1735     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1736                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1737       storagenode_info *info;
1738       info = calloc(1, sizeof(*info));
1739       info->storagenode_id = storagenode_id;
1740       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1741       info->dsn = dsn ? strdup(dsn) : NULL;
1742       noit_hash_store(&storagenode_to_info_cache,
1743                       (void *)&info->storagenode_id, sizeof(int), info);
1744       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1745             info->storagenode_id,
1746             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1747     }
1748   }
1749   PQclear(d->res);
1750  bad_row:
1751   free_params(d);
1752
1753   release_conn_q(cq);
1754   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1755   return loaded;
1756 }
1757
1758 static int
1759 rest_get_noit_config(noit_http_rest_closure_t *restc,
1760                      int npats, char **pats) {
1761   noit_http_session_ctx *ctx = restc->http_ctx;
1762   ds_single_detail *d;
1763   int row_count = 0;
1764   const char *xml = NULL;
1765   conn_q *cq = NULL;
1766
1767   if(npats != 0) {
1768     noit_http_response_server_error(ctx, "text/xml");
1769     noit_http_response_end(ctx);
1770     return 0;
1771   }
1772   d = calloc(1, sizeof(*d));
1773   GET_QUERY(config_get);
1774   cq = get_conn_q_for_metanode();
1775   if(!cq) {
1776     noit_http_response_server_error(ctx, "text/xml");
1777     goto bad_row;
1778   }
1779
1780   DECLARE_PARAM_STR(restc->remote_cn,
1781                     restc->remote_cn ? strlen(restc->remote_cn) : 0);
1782   PG_EXEC(config_get);
1783   row_count = PQntuples(d->res);
1784   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1785
1786   if(xml == NULL) {
1787     char buff[1024];
1788     snprintf(buff, sizeof(buff), "<error><remote_cn>%s</remote_cn>"
1789                                  "<row_count>%d</row_count></error>\n",
1790              restc->remote_cn, row_count);
1791     noit_http_response_append(ctx, buff, strlen(buff));
1792     noit_http_response_not_found(ctx, "text/xml");
1793   }
1794   else {
1795     noit_http_response_append(ctx, xml, strlen(xml));
1796     noit_http_response_ok(ctx, "text/xml");
1797   }
1798  bad_row:
1799   free_params((ds_single_detail *)d);
1800   d->nparams = 0;
1801   if(cq) release_conn_q(cq);
1802
1803   noit_http_response_end(ctx);
1804   return 0;
1805 }
1806
1807 void
1808 stratcon_datastore_init() {
1809   pthread_mutex_init(&ds_conns_lock, NULL);
1810   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1811   ds_err = noit_log_stream_find("error/datastore");
1812   ds_deb = noit_log_stream_find("debug/datastore");
1813   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
1814   ingest_err = noit_log_stream_find("error/ingest");
1815   if(!ds_err) ds_err = noit_error;
1816   if(!ingest_err) ingest_err = noit_error;
1817   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1818                            &basejpath)) {
1819     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1820     exit(-1);
1821   }
1822   stratcon_datastore_ingest_all_check_info();
1823   stratcon_datastore_ingest_all_storagenode_info();
1824   stratcon_datastore_sweep_journals();
1825
1826   assert(noit_http_rest_register_auth(
1827     "GET", "/noits/", "^config$", rest_get_noit_config,
1828              noit_http_rest_client_cert_auth
1829   ) == 0);
1830 }
1831
Note: See TracBrowser for help on using the browser.