root/src/stratcon_datastore.c

Revision 1f81201c99fa3adc7d2ab5a32c867f5d40114bc7, 57.7 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 years ago)

it's always some mundane detail dammit. fixes #299

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "utils/noit_str.h"
38 #include "utils/noit_mkdir.h"
39 #include "utils/noit_getip.h"
40 #include "stratcon_datastore.h"
41 #include "stratcon_realtime_http.h"
42 #include "stratcon_iep.h"
43 #include "noit_conf.h"
44 #include "noit_check.h"
45 #include "noit_rest.h"
46 #include <unistd.h>
47 #include <fcntl.h>
48 #include <netinet/in.h>
49 #include <sys/un.h>
50 #include <dirent.h>
51 #include <arpa/inet.h>
52 #include <sys/mman.h>
53 #include <libpq-fe.h>
54 #include <zlib.h>
55 #include <assert.h>
56 #include <errno.h>
57
/*
 * DECL_STMT(codename, confname) declares one lazily-loaded SQL statement:
 *   codename        - the statement text; NULL until first loaded.
 *   codename##_conf - the configuration path it is loaded from,
 *                     "/stratcon/database/statements/<confname>".
 * GET_QUERY(codename) loads the text on first use.
 */
#define DECL_STMT(codename,confname) \
static char *codename = NULL; \
static const char *codename##_conf = "/stratcon/database/statements/" #confname

DECL_STMT(storage_post_connect, storagepostconnect);
DECL_STMT(metanode_post_connect, metanodepostconnect);
DECL_STMT(find_storage, findstoragenode);
DECL_STMT(all_storage, allstoragenodes);
DECL_STMT(check_map, mapchecktostoragenode);
DECL_STMT(check_mapall, mapallchecks);
DECL_STMT(check_loadall, allchecks);
DECL_STMT(check_find, findcheck);
DECL_STMT(check_insert, check);
DECL_STMT(status_insert, status);
DECL_STMT(metric_insert_numeric, metric_numeric);
DECL_STMT(metric_insert_text, metric_text);
DECL_STMT(config_insert, config);
DECL_STMT(config_get, findconfig);

/* Log streams used by this module (judging by name: errors, debug,
 * connection-pool debug, ingestion errors).  They start out NULL here;
 * NOTE(review): presumably opened during module init elsewhere — verify. */
static noit_log_stream_t ds_err = NULL;
static noit_log_stream_t ds_deb = NULL;
static noit_log_stream_t ds_pool_deb = NULL;
static noit_log_stream_t ingest_err = NULL;
81
/* Global on/off switch for the datastore subsystem; on (1) by default. */
static int ds_system_enabled = 1;

/* Report the current value of the datastore on/off switch. */
int
stratcon_datastore_get_enabled() {
  return ds_system_enabled;
}

/* Set the datastore on/off switch to n. */
void
stratcon_datastore_set_enabled(int n) {
  ds_system_enabled = n;
}
85
/* Singly-linked list of observers of datastore operations.  Each dispatch
 * callback receives an operation code, a socket address, a string and an
 * opaque pointer.  NOTE(review): the meaning of the string/pointer arguments
 * is defined by the registration/dispatch code, which is not in this view. */
static struct datastore_onlooker_list {
  void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *,
                   const char *, void *);
  struct datastore_onlooker_list *next;
} *onlookers = NULL;
91
/*
 * GET_QUERY(a): make sure statement variable `a` (declared by DECL_STMT)
 * holds its SQL text.  On first use the text is read from the config path
 * stored in a##_conf.  If the statement is not configured, control jumps to
 * the caller's `bad_row` label.
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
97
struct conn_q;

/*
 * A pool of database connections for one (noit address, storage fqdn,
 * noit CN) combination; see get_conn_pool_for_remote().  The idle list and
 * all counters are protected by `lock`.  `cv` is signalled every time a
 * connection is released, which wakes threads waiting for a free slot.
 */
typedef struct {
  char            *queue_name; /* key: "datastore_<addr>_<fqdn>_<cn>" */
  eventer_jobq_t  *jobq;       /* job queue created along with the pool */
  struct conn_q   *head;       /* idle connections, most recently used first */
  pthread_mutex_t  lock;
  pthread_cond_t   cv;
  int              ttl;        /* idle seconds before ttl_purge_conn_pool evicts */
  int              in_pool;    /* number of connections on `head` */
  int              outstanding; /* connections currently handed out */
  int              max_allocated; /* cap on in_pool + outstanding */
  int              max_in_pool;   /* cap on idle connections retained */
} conn_pool;
/* One database connection plus the identity it was opened for. */
typedef struct conn_q {
  time_t           last_use;   /* time(NULL) when last released */
  char            *dsn;        /* Pg connect string; NULL => use dbconfig */
  char            *remote_str; /* the IP of the noit*/
  char            *remote_cn;  /* the Cert CN of the noit */
  char            *fqdn;       /* the fqdn of the storage node */
  conn_pool       *pool;       /* owning pool */
  struct conn_q   *next;       /* link in pool->head while idle */
  /* Postgres specific stuff */
  PGconn          *dbh;        /* NULL until connected */
} conn_q;
123
124
/* Capacity of the bound-parameter arrays in every job detail.
 * NOTE(review): DECLARE_PARAM_STR does not check this bound. */
#define MAX_PARAMS 8
/*
 * Fields shared by every job detail struct: the libpq result and status,
 * the record's timestamp, the metric type, and the parameter arrays passed
 * to PQexecParams().  These fields come first in every detail struct, and
 * that common prefix is what allows code such as stratcon_datastore_find()
 * to cast a ds_rt_detail to a ds_single_detail for free_params().
 */
#define POSTGRES_PARTS \
  PGresult *res; \
  int rv; \
  time_t whence; \
  int nparams; \
  int metric_type; \
  char *paramValues[MAX_PARAMS]; \
  int paramLengths[MAX_PARAMS]; \
  int paramFormats[MAX_PARAMS]; \
  int paramAllocd[MAX_PARAMS];

/* Detail with nothing but the shared Postgres fields. */
typedef struct ds_single_detail {
  POSTGRES_PARTS
} ds_single_detail;
/* Detail for a realtime lookup over a list of realtime_trackers. */
typedef struct {
  /* Postgres specific stuff */
  POSTGRES_PARTS
  struct realtime_tracker *rt;
  conn_q *cq; /* connection on which to perform this job */
  eventer_t completion_event; /* This event should be registered if non NULL */
} ds_rt_detail;
/* Detail for one journaled log line; `data` is the raw tab-separated line
 * parsed by stratcon_datastore_execute(). */
typedef struct ds_line_detail {
  /* Postgres specific stuff */
  POSTGRES_PARTS
  char *data;
  int problematic;
  struct ds_line_detail *next;
} ds_line_detail;

/* NOTE(review): purpose not visible in this view (working-set hash plus a
 * completion event). */
typedef struct {
  noit_hash_table *ws;
  eventer_t completion;
} syncset_t;

/* NOTE(review): by its fields, a journal file (fd/filename) tied to one noit
 * and storage node; users are outside this view. */
typedef struct {
  char *remote_str;
  char *remote_cn;
  char *fqdn;
  int storagenode_id;
  int fd;
  char *filename;
  conn_pool *cpool;
} interim_journal_t;

static int stratcon_database_connect(conn_q *cq);
static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
static int storage_node_quick_lookup(const char *uuid_str,
                                     const char *remote_cn,
                                     int *sid_out, int *storagenode_id_out,
                                     const char **remote_cn_out,
                                     const char **fqdn_out,
                                     const char **dsn_out);
178
179 static void
180 free_params(ds_single_detail *d) {
181   int i;
182   for(i=0; i<d->nparams; i++)
183     if(d->paramAllocd[i] && d->paramValues[i])
184       free(d->paramValues[i]);
185 }
186
/* NOTE(review): by name, the base directory for journal files; it is
 * assigned outside this view. */
char *basejpath = NULL;
/* Registry of conn_pool objects keyed by queue name; every access in
 * get_conn_pool_for_remote() holds ds_conns_lock. */
pthread_mutex_t ds_conns_lock;
noit_hash_table ds_conns;
noit_hash_table working_sets;

/* the fqdn cache needs to be thread safe */
/* What a check uuid (as seen through a given remote CN) resolves to. */
typedef struct {
  char *uuid_str;
  char *remote_cn;
  int storagenode_id;
  int sid;
} uuid_info;
/* How to reach one storage node. */
typedef struct {
  int storagenode_id;
  char *fqdn;
  char *dsn;
} storagenode_info;
noit_hash_table uuid_to_info_cache;
/* Guards storagenode_to_info_cache (see stratcon_datastore_asynch_drive_iep). */
pthread_mutex_t storagenode_to_info_cache_lock;
noit_hash_table storagenode_to_info_cache;
207
208 /* Thread-safe connection pools */
209
210 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
211 static void
212 release_conn_q_forceable(conn_q *cq, int forcefree) {
213   int putback = 0;
214   cq->last_use = time(NULL);
215   pthread_mutex_lock(&cq->pool->lock);
216   cq->pool->outstanding--;
217   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
218     putback = 1;
219     cq->next = cq->pool->head;
220     cq->pool->head = cq;
221     cq->pool->in_pool++;
222   }
223   pthread_mutex_unlock(&cq->pool->lock);
224   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(),
225         putback ? "to pool" : "and destroy", cq->pool->queue_name);
226   pthread_cond_signal(&cq->pool->cv);
227   if(putback) return;
228
229   /* Not put back, release it */
230   if(cq->dbh) PQfinish(cq->dbh);
231   if(cq->remote_str) free(cq->remote_str);
232   if(cq->remote_cn) free(cq->remote_cn);
233   if(cq->fqdn) free(cq->fqdn);
234   if(cq->dsn) free(cq->dsn);
235   free(cq);
236 }
237 static void
238 ttl_purge_conn_pool(conn_pool *pool) {
239   int old_cnt, new_cnt;
240   time_t now = time(NULL);
241   conn_q *cq, *prev = NULL, *iter;
242   /* because we always replace on the head and update the last_use time when
243      doing so, we know they are ordered LRU on the end.  So, once we hit an
244      old one, we know all the others are old too.
245    */
246   if(!pool->head) return; /* hack short circuit for no locks */
247   pthread_mutex_lock(&pool->lock);
248   old_cnt = pool->in_pool;
249   cq = pool->head;
250   while(cq) {
251     if(cq->last_use + cq->pool->ttl < now) {
252       if(prev) prev->next = NULL;
253       else pool->head = NULL;
254       break;
255     }
256     prev = cq;
257     cq = cq->next;
258   }
259   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
260   /* Fix accounting */
261   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
262   new_cnt = pool->in_pool;
263   pthread_mutex_unlock(&pool->lock);
264
265   /* Force release these without holding the lock */
266   while(cq) {
267     cq = cq->next;
268     release_conn_q_forceable(cq, 1);
269   }
270   if(old_cnt != new_cnt)
271     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
272           pool->queue_name);
273 }
274 static void
275 release_conn_q(conn_q *cq) {
276   ttl_purge_conn_pool(cq->pool);
277   release_conn_q_forceable(cq, 0);
278 }
279 static conn_pool *
280 get_conn_pool_for_remote(const char *remote_str,
281                          const char *remote_cn, const char *fqdn) {
282   void *vcpool;
283   conn_pool *cpool = NULL;
284   char queue_name[256] = "datastore_";
285   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
286            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
287            fqdn ? fqdn : "default",
288            remote_cn ? remote_cn : "default");
289   pthread_mutex_lock(&ds_conns_lock);
290   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
291                         strlen(queue_name), &vcpool))
292     cpool = vcpool;
293   pthread_mutex_unlock(&ds_conns_lock);
294   if(!cpool) {
295     vcpool = cpool = calloc(1, sizeof(*cpool));
296     cpool->queue_name = strdup(queue_name);
297     pthread_mutex_init(&cpool->lock, NULL);
298     pthread_cond_init(&cpool->cv, NULL);
299     cpool->in_pool = 0;
300     cpool->outstanding = 0;
301     cpool->max_in_pool = 1;
302     cpool->max_allocated = 1;
303     pthread_mutex_lock(&ds_conns_lock);
304     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
305                         cpool)) {
306       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
307                          strlen(queue_name), &vcpool);
308     }
309     pthread_mutex_unlock(&ds_conns_lock);
310     if(vcpool != cpool) {
311       /* someone beat us to it */
312       free(cpool->queue_name);
313       pthread_mutex_destroy(&cpool->lock);
314       pthread_cond_destroy(&cpool->cv);
315       free(cpool);
316     }
317     else {
318       int i;
319       /* Our job to setup the pool */
320       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
321       eventer_jobq_init(cpool->jobq, queue_name);
322       cpool->jobq->backq = eventer_default_backq();
323       /* Add one thread */
324       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
325         eventer_jobq_increase_concurrency(cpool->jobq);
326     }
327     cpool = vcpool;
328   }
329   return cpool;
330 }
331 static conn_q *
332 get_conn_q_for_remote(const char *remote_str,
333                       const char *remote_cn, const char *fqdn,
334                       const char *dsn) {
335   conn_pool *cpool;
336   conn_q *cq;
337   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
338   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(),
339         cpool->queue_name);
340   pthread_mutex_lock(&cpool->lock);
341  again:
342   if(cpool->head) {
343     assert(cpool->in_pool > 0);
344     cq = cpool->head;
345     cpool->head = cq->next;
346     cpool->in_pool--;
347     cpool->outstanding++;
348     cq->next = NULL;
349     pthread_mutex_unlock(&cpool->lock);
350     return cq;
351   }
352   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
353     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
354           (void *)pthread_self(), cpool->queue_name);
355     pthread_cond_wait(&cpool->cv, &cpool->lock);
356     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
357           (void *)pthread_self(), cpool->queue_name);
358     goto again;
359   }
360   else {
361     cpool->outstanding++;
362     pthread_mutex_unlock(&cpool->lock);
363   }
364  
365   cq = calloc(1, sizeof(*cq));
366   cq->pool = cpool;
367   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
368   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
369   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
370   cq->dsn = dsn ? strdup(dsn) : NULL;
371   return cq;
372 }
373 static conn_q *
374 get_conn_q_for_metanode() {
375   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
376 }
377
/* Outcome of executing one job/row against the database. */
typedef enum {
  DS_EXEC_SUCCESS = 0,
  DS_EXEC_ROW_FAILED = 1,
  DS_EXEC_TXN_FAILED = 2,
} execute_outcome_t;

/*
 * DECLARE_PARAM_STR(str, len): append a heap copy of the first `len` bytes
 * of `str` as the next text-format parameter of `d`, which must be in scope.
 * The exact value "[[null]]" is the in-band way to bind an SQL NULL: the
 * copy is dropped and the slot's value is left NULL.
 * NOTE(review): neither the noit__strndup result nor MAX_PARAMS is checked.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  d->paramValues[d->nparams] = noit__strndup(str, len); \
  d->paramLengths[d->nparams] = len; \
  d->paramFormats[d->nparams] = 0; \
  d->paramAllocd[d->nparams] = 1; \
  if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \
    free(d->paramValues[d->nparams]); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  d->nparams++; \
} while(0)
/* DECLARE_PARAM_INT(i): bind integer `i` as its decimal text form. */
#define DECLARE_PARAM_INT(i) do { \
  int buffer__len; \
  char buffer__[32]; \
  snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
  buffer__len = strlen(buffer__); \
  DECLARE_PARAM_STR(buffer__, buffer__len); \
} while(0)

/*
 * PG_GET_STR_COL(dest, row, name): set `dest` to the text value of column
 * `name` in row `row` of d->res.  It is NULL when the column does not exist
 * or the value is SQL NULL.  The string points into d->res and is only
 * valid until that result is PQclear()ed.
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int colnum = PQfnumber(d->res, name); \
  dest = NULL; \
  if (colnum >= 0) \
    dest = PQgetisnull(d->res, row, colnum) \
         ? NULL : PQgetvalue(d->res, row, colnum); \
} while(0)
412
/*
 * PG_EXEC(cmd): run `cmd` on cq->dbh with the parameters accumulated in `d`,
 * storing the result in d->res and its status in d->rv.  Any status other
 * than COMMAND_OK / TUPLES_OK is handled as follows: the first line of the
 * server error is logged, the result is cleared, and control jumps to the
 * caller's `bad_row` label.  On success the caller owns d->res and must
 * PQclear() it.  Requires `d` and `cq` in scope.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "[%s] stratcon_datasource.c:%d bad (%d): %.*s\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
          (int)(pgerr_end - pgerr), pgerr); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)

/*
 * PG_TM_EXEC(cmd, whence): like PG_EXEC, except that `cmd` is first
 * expanded with strftime() against the UTC time `whence`.  Presumably this
 * lets statements name time-partitioned tables; verify against the
 * configured SQL.  `whence` is evaluated exactly once.  The row is rejected
 * via `bad_row` when either:
 *   - the time cannot be converted, or
 *   - the expansion does not fit in the 4096-byte buffer.  strftime() then
 *     returns 0 and the buffer contents are indeterminate, so running it
 *     would execute garbage.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t __w = (whence); \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&__w, &tbuf); \
  if(tm == NULL || strftime(cmdbuf, sizeof(cmdbuf), cmd, tm) == 0) { \
    noitL(ds_err, "stratcon_datasource.c:%d cannot expand statement for time: %llu\n", \
          __LINE__, (long long unsigned)__w); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "stratcon_datasource.c:%d bad (%d): %.*s time: %llu\n", \
          __LINE__, d->rv, (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)__w); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
453
454 static void *
455 stratcon_datastore_check_loadall(void *vsn) {
456   storagenode_info *sn = vsn;
457   ds_single_detail *d;
458   int i, row_count = 0, good = 0;
459   char buff[1024];
460   conn_q *cq = NULL;
461
462   d = calloc(1, sizeof(*d));
463   GET_QUERY(check_loadall);
464   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
465   i = 0;
466   while(stratcon_database_connect(cq)) {
467     if(i++ > 4) {
468       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
469       release_conn_q(cq);
470       return (void *)(vpsized_int)good;
471     }
472     sleep(1);
473   }
474   PG_EXEC(check_loadall);
475   row_count = PQntuples(d->res);
476  
477   for(i=0; i<row_count; i++) {
478     int rv;
479     int8_t family;
480     struct sockaddr *sin;
481     struct sockaddr_in sin4 = { .sin_family = AF_INET };
482     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
483     char *remote, *id, *target, *module, *name;
484     PG_GET_STR_COL(remote, i, "remote_address");
485     PG_GET_STR_COL(id, i, "id");
486     PG_GET_STR_COL(target, i, "target");
487     PG_GET_STR_COL(module, i, "module");
488     PG_GET_STR_COL(name, i, "name");
489     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
490
491     family = AF_INET;
492     sin = (struct sockaddr *)&sin4;
493     rv = inet_pton(family, remote, &sin4.sin_addr);
494     if(rv != 1) {
495       family = AF_INET6;
496       sin = (struct sockaddr *)&sin6;
497       rv = inet_pton(family, remote, &sin6.sin6_addr);
498       if(rv != 1) {
499         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
500         sin = NULL;
501       }
502     }
503
504     /* stratcon_iep_line_processor takes an allocated operand and frees it */
505     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
506     good++;
507   }
508   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
509         good, row_count, sn->fqdn);
510  bad_row:
511   free_params((ds_single_detail *)d);
512   free(d);
513   if(cq) release_conn_q(cq);
514   return (void *)(vpsized_int)good;
515 }
516 static int
517 stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure,
518                                     struct timeval *now) {
519   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
520   pthread_t *jobs = NULL;
521   int nodes, i = 0, tcnt = 0;
522   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
523   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
524
525   pthread_mutex_lock(&storagenode_to_info_cache_lock);
526   nodes = storagenode_to_info_cache.size;
527   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
528   sns = calloc(MAX(1,nodes), sizeof(*sns));
529   if(nodes == 0) sns[nodes++] = &self;
530   else {
531     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
532     const char *k;
533     void *v;
534     int klen;
535     while(noit_hash_next(&storagenode_to_info_cache,
536                          &iter, &k, &klen, &v)) {
537       sns[i++] = (storagenode_info *)v;
538     }
539   }
540   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
541
542   for(i=0; i<nodes; i++) {
543     if(pthread_create(&jobs[i], NULL,
544                       stratcon_datastore_check_loadall, sns[i]) != 0) {
545       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
546     }
547   }
548   for(i=0; i<nodes; i++) {
549     void *good;
550     pthread_join(jobs[i], &good);
551     tcnt += (int)(vpsized_int)good;
552   }
553   noitL(noit_error, "Loaded all %d check states.\n", tcnt);
554   return 0;
555 }
556 void
557 stratcon_datastore_iep_check_preload() {
558   eventer_t e;
559   conn_pool *cpool;
560
561   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
562   e = eventer_alloc();
563   e->mask = EVENTER_ASYNCH;
564   e->callback = stratcon_datastore_asynch_drive_iep;
565   e->closure = NULL;
566   eventer_add_asynch(cpool->jobq, e);
567 }
568 execute_outcome_t
569 stratcon_datastore_find(ds_rt_detail *d) {
570   conn_q *cq;
571   char *val;
572   int row_count;
573   struct realtime_tracker *node;
574
575   for(node = d->rt; node; node = node->next) {
576     char uuid_str[UUID_STR_LEN+1];
577     const char *fqdn, *dsn, *remote_cn;
578     char remote_ip[32];
579     int storagenode_id;
580
581     uuid_unparse_lower(node->checkid, uuid_str);
582     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
583                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
584       continue;
585
586     noitL(noit_debug, "stratcon_datastore_find <- (%d, %s) @ %s\n",
587           node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)");
588
589     /* We might be able to find the IP from our config if someone has
590      * specified the expected cn in the noit definition.
591      */
592     if(stratcon_find_noit_ip_by_cn(remote_cn,
593                                    remote_ip, sizeof(remote_ip)) == 0) {
594       node->noit = strdup(remote_ip);
595       noitL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit);
596       continue;
597     }
598
599     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
600     stratcon_database_connect(cq);
601
602     GET_QUERY(check_find);
603     DECLARE_PARAM_INT(node->sid);
604     PG_EXEC(check_find);
605     row_count = PQntuples(d->res);
606     if(row_count != 1) {
607       noitL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid);
608       PQclear(d->res);
609       goto bad_row;
610     }
611
612     /* Get the remote_address (which noit owns this) */
613     PG_GET_STR_COL(val, 0, "remote_address");
614     if(!val) {
615       noitL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn);
616       PQclear(d->res);
617       goto bad_row;
618     }
619     node->noit = strdup(val);
620     noitL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit);
621    bad_row:
622     free_params((ds_single_detail *)d);
623     d->nparams = 0;
624     release_conn_q(cq);
625   }
626   return DS_EXEC_SUCCESS;
627 }
/*
 * Parse one journaled log line (d->data) into bound parameters, then run the
 * matching insert statement on cq.  Fields are tab-separated; see
 * noit_check_log.c for the format.  The first character selects the record:
 *   'n' - noitd config: binds remote address, remote CN, "noitd", the
 *         timestamp and the config blob.  The blob is base64-decoded (in
 *         place, so d->data is modified) and zlib-uncompressed to the length
 *         given in the line.
 *   'C' - check: remote address, timestamp, sid, uuid, target, module, name.
 *   'M' - metric: timestamp, sid, name, value.  The first character of the
 *         metric-type field picks the numeric or text insert.
 *   'S' - status: timestamp, sid, state, availability, duration, status.
 * The line is parsed only while d->nparams is 0.  Once its parameters are
 * declared, calling again re-executes without re-parsing.
 *
 * r:         noit's remote address; NULL is bound as "".
 * remote_cn: noit's certificate CN; used to map uuids to sids.
 *            NOTE(review): the 'n' branch calls strlen(remote_cn), so it must
 *            be non-NULL there — confirm callers guarantee this.
 * Returns DS_EXEC_SUCCESS, or DS_EXEC_ROW_FAILED on any parse or SQL error.
 * NOTE(review): a parse failure part-way through leaves d->nparams > 0, so a
 * later call would skip parsing and execute with partial parameters —
 * confirm callers never retry a row that failed to parse.
 */
execute_outcome_t
stratcon_datastore_execute(conn_q *cq, const char *r, const char *remote_cn,
                           ds_line_detail *d) {
  int type, len, sid;
  char *final_buff;
  uLong final_len, actual_final_len;
  char *token;
  char raddr_blank[1] = "";
  const char *raddr;

  type = d->data[0];
  raddr = r ? r : raddr_blank;

  /* Parse the log line, but only if we haven't already */
  if(!d->nparams) {
    char *scp, *ecp;

    scp = d->data;
/* PROCESS_NEXT_FIELD: point `token`/`len` at the next tab-terminated field
 * and advance scp past the tab; bail to bad_row if there is none.
 * NOTE: it writes token/len directly and ignores its (t,l) arguments. */
#define PROCESS_NEXT_FIELD(t,l) do { \
  if(!*scp) goto bad_row; \
  ecp = strchr(scp, '\t'); \
  if(!ecp) goto bad_row; \
  token = scp; \
  len = (ecp-scp); \
  scp = ecp + 1; \
} while(0)
/* PROCESS_LAST_FIELD: the rest of the line (minus a trailing '\n') into t/l. */
#define PROCESS_LAST_FIELD(t,l) do { \
  if(!*scp) ecp = scp; \
  else { \
    ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
    if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
  } \
  t = scp; \
  l = (ecp-scp); \
} while(0)

    PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
    switch(type) {
      /* See noit_check_log.c for log description */
      case 'n':
        DECLARE_PARAM_STR(raddr, strlen(raddr));
        DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
        DECLARE_PARAM_STR("noitd",5); /* node_type */
        PROCESS_NEXT_FIELD(token,len);
        d->whence = (time_t)strtoul(token, NULL, 10);
        DECLARE_PARAM_STR(token,len); /* timestamp */

        /* This is the expected uncompressed len */
        PROCESS_NEXT_FIELD(token,len);
        final_len = atoi(token);
        final_buff = malloc(final_len);
        if(!final_buff) goto bad_row;

        /* The last token is b64 endoded and compressed.
         * we need to decode it, declare it and then free it.
         */
        PROCESS_LAST_FIELD(token, len);
        /* We can in-place decode this */
        len = noit_b64_decode((char *)token, len,
                              (unsigned char *)token, len);
        if(len <= 0) {
          noitL(noit_error, "noitd config base64 decoding error.\n");
          free(final_buff);
          goto bad_row;
        }
        actual_final_len = final_len;
        if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
                              (unsigned char *)token, len)) {
          noitL(noit_error, "noitd config decompression failure.\n");
          free(final_buff);
          goto bad_row;
        }
        /* The decompressed size must match the advertised size exactly. */
        if(final_len != actual_final_len) {
          noitL(noit_error, "noitd config decompression error.\n");
          free(final_buff);
          goto bad_row;
        }
        DECLARE_PARAM_STR(final_buff, final_len);
        free(final_buff);
        break;
      case 'C':
        DECLARE_PARAM_STR(raddr, strlen(raddr));
        PROCESS_NEXT_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* timestamp */
        d->whence = (time_t)strtoul(token, NULL, 10);
        PROCESS_NEXT_FIELD(token, len);
        sid = uuid_to_sid(token, remote_cn);
        if(sid == 0) goto bad_row;
        DECLARE_PARAM_INT(sid); /* sid */
        DECLARE_PARAM_STR(token,len); /* uuid */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* target */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* module */
        PROCESS_LAST_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* name */
        break;
      case 'M':
        PROCESS_NEXT_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* timestamp */
        d->whence = (time_t)strtoul(token, NULL, 10);
        PROCESS_NEXT_FIELD(token, len);
        sid = uuid_to_sid(token, remote_cn);
        if(sid == 0) goto bad_row;
        DECLARE_PARAM_INT(sid); /* sid */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* name */
        PROCESS_NEXT_FIELD(token,len);
        d->metric_type = *token;
        PROCESS_LAST_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* value */
        break;
      case 'S':
        PROCESS_NEXT_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* timestamp */
        d->whence = (time_t)strtoul(token, NULL, 10);
        PROCESS_NEXT_FIELD(token, len);
        sid = uuid_to_sid(token, remote_cn);
        if(sid == 0) goto bad_row;
        DECLARE_PARAM_INT(sid); /* sid */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* state */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* availability */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* duration */
        PROCESS_LAST_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* status */
        break;
      default:
        goto bad_row;
    }

  }

  /* Now execute the query */
  switch(type) {
    case 'n':
      GET_QUERY(config_insert);
      PG_EXEC(config_insert);
      PQclear(d->res);
      break;
    case 'C':
      GET_QUERY(check_insert);
      PG_TM_EXEC(check_insert, d->whence);
      PQclear(d->res);
      break;
    case 'S':
      GET_QUERY(status_insert);
      PG_TM_EXEC(status_insert, d->whence);
      PQclear(d->res);
      break;
    case 'M':
      switch(d->metric_type) {
        case METRIC_INT32:
        case METRIC_UINT32:
        case METRIC_INT64:
        case METRIC_UINT64:
        case METRIC_DOUBLE:
          GET_QUERY(metric_insert_numeric);
          PG_TM_EXEC(metric_insert_numeric, d->whence);
          PQclear(d->res);
          break;
        case METRIC_STRING:
          GET_QUERY(metric_insert_text);
          PG_TM_EXEC(metric_insert_text, d->whence);
          PQclear(d->res);
          break;
        default:
          goto bad_row;
      }
      break;
    default:
      /* should never get here */
      goto bad_row;
  }
  return DS_EXEC_SUCCESS;
 bad_row:
  return DS_EXEC_ROW_FAILED;
}
808 static int
809 stratcon_database_post_connect(conn_q *cq) {
810   int rv = 0;
811   ds_single_detail _d = { 0 }, *d = &_d;
812   if(cq->fqdn) {
813     char *remote_str, *remote_cn;
814     /* This is the silly way we get null's in through our declare_param_str */
815     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
816     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
817     /* This is a storage node, it gets the storage node post_connect */
818     GET_QUERY(storage_post_connect);
819     rv = -1; /* now we're serious */
820     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
821     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
822     PG_EXEC(storage_post_connect);
823     PQclear(d->res);
824     rv = 0;
825   }
826   else {
827     /* Metanode post_connect */
828     GET_QUERY(metanode_post_connect);
829     rv = -1; /* now we're serious */
830     PG_EXEC(metanode_post_connect);
831     PQclear(d->res);
832     rv = 0;
833   }
834  bad_row:
835   free_params(d);
836   if(rv == -1) {
837     /* Post-connect intentions are serious and fatal */
838     PQfinish(cq->dbh);
839     cq->dbh = NULL;
840   }
841   return rv;
842 }
843 static int
844 stratcon_database_connect(conn_q *cq) {
845   char *dsn, dsn_meta[512];
846   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
847   const char *k, *v;
848   int klen;
849   noit_hash_table *t;
850
851   dsn_meta[0] = '\0';
852   if(!cq->dsn) {
853     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
854     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
855       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
856       strlcat(dsn_meta, k, sizeof(dsn_meta));
857       strlcat(dsn_meta, "=", sizeof(dsn_meta));
858       strlcat(dsn_meta, v, sizeof(dsn_meta));
859     }
860     noit_hash_destroy(t, free, free);
861     free(t);
862     dsn = dsn_meta;
863   }
864   else {
865     char options[32];
866     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta));
867     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user",
868                                options, sizeof(options))) {
869       strlcat(dsn_meta, " ", sizeof(dsn_meta));
870       strlcat(dsn_meta, "user", sizeof(dsn_meta));
871       strlcat(dsn_meta, "=", sizeof(dsn_meta));
872       strlcat(dsn_meta, options, sizeof(dsn_meta));
873     }
874     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password",
875                                options, sizeof(options))) {
876       strlcat(dsn_meta, " ", sizeof(dsn_meta));
877       strlcat(dsn_meta, "password", sizeof(dsn_meta));
878       strlcat(dsn_meta, "=", sizeof(dsn_meta));
879       strlcat(dsn_meta, options, sizeof(dsn_meta));
880     }
881     dsn = dsn_meta;
882   }
883
884   if(cq->dbh) {
885     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
886     PQreset(cq->dbh);
887     if(PQstatus(cq->dbh) != CONNECTION_OK) {
888       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
889             dsn, PQerrorMessage(cq->dbh));
890       return -1;
891     }
892     if(stratcon_database_post_connect(cq)) return -1;
893     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
894     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
895           dsn, PQerrorMessage(cq->dbh));
896     return -1;
897   }
898
899   cq->dbh = PQconnectdb(dsn);
900   if(!cq->dbh) return -1;
901   if(PQstatus(cq->dbh) != CONNECTION_OK) {
902     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
903           dsn, PQerrorMessage(cq->dbh));
904     return -1;
905   }
906   if(stratcon_database_post_connect(cq)) return -1;
907   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
908   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
909         dsn, PQerrorMessage(cq->dbh));
910   return -1;
911 }
912 static int
913 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
914                                 const char *name) {
915   int rv = -1;
916   PGresult *res;
917   char cmd[128];
918   strlcpy(cmd, p, sizeof(cmd));
919   strlcat(cmd, name, sizeof(cmd));
920   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
921   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
922   PQclear(res);
923   return rv;
924 }
925 static int
926 stratcon_datastore_do(conn_q *cq, const char *cmd) {
927   PGresult *res;
928   int rv = -1;
929   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
930   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
931   PQclear(res);
932   return rv;
933 }
/*
 * Transaction-control helpers for stratcon_datastore_asynch_execute().
 * Apart from BUSTED's argument, they implicitly use that function's locals:
 *   cq         - the connection queue;
 *   current    - the batch cursor;
 *   last_sp    - the row at which the "batch" savepoint was taken;
 *   full_monty - the retry label.
 */

/* Drop the (presumed broken) connection and restart from full_monty. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)

/* Issue "<verb><name>"; any failure is treated as a dead connection. */
#define SAVEPOINT_OP_OR_BUST(verb, name) do { \
  if(stratcon_datastore_savepoint_op(cq, verb, name)) BUSTED(cq); \
} while(0)

/* Take a savepoint and remember the row it was taken at. */
#define SAVEPOINT(name) do { \
  SAVEPOINT_OP_OR_BUST("SAVEPOINT ", name); \
  last_sp = current; \
} while(0)

/* Undo work since the savepoint; the savepoint row is forgotten. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  SAVEPOINT_OP_OR_BUST("ROLLBACK TO SAVEPOINT ", name); \
  last_sp = NULL; \
} while(0)

/* Keep work since the savepoint and discard the savepoint itself. */
#define RELEASE_SAVEPOINT(name) do { \
  SAVEPOINT_OP_OR_BUST("RELEASE SAVEPOINT ", name); \
  last_sp = NULL; \
} while(0)
953 int
954 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
955                                  struct timeval *now) {
956   ds_rt_detail *dsjd = closure;
957   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
958   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
959
960   assert(dsjd->rt);
961   stratcon_datastore_find(dsjd);
962   if(dsjd->completion_event)
963     eventer_add(dsjd->completion_event);
964
965   free_params((ds_single_detail *)dsjd);
966   free(dsjd);
967   return 0;
968 }
969 static const char *
970 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
971   void *vinfo;
972   char *dsn = NULL, *fqdn = NULL;
973   int found = 0;
974   storagenode_info *info = NULL;
975   pthread_mutex_lock(&storagenode_to_info_cache_lock);
976   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
977                         &vinfo)) {
978     found = 1;
979     info = vinfo;
980   }
981   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
982   if(found) {
983     if(fqdn_out) *fqdn_out = info->fqdn;
984     return info->dsn;
985   }
986
987   if(!found && can_use_db) {
988     ds_single_detail *d;
989     conn_q *cq;
990     int row_count;
991     /* Look it up and store it */
992     d = calloc(1, sizeof(*d));
993     cq = get_conn_q_for_metanode();
994     GET_QUERY(find_storage);
995     DECLARE_PARAM_INT(id);
996     PG_EXEC(find_storage);
997     row_count = PQntuples(d->res);
998     if(row_count) {
999       PG_GET_STR_COL(dsn, 0, "dsn");
1000       PG_GET_STR_COL(fqdn, 0, "fqdn");
1001       fqdn = fqdn ? strdup(fqdn) : NULL;
1002       dsn = dsn ? strdup(dsn) : NULL;
1003     }
1004     PQclear(d->res);
1005    bad_row:
1006     free_params(d);
1007     free(d);
1008     release_conn_q(cq);
1009   }
1010   if(fqdn) {
1011     info = calloc(1, sizeof(*info));
1012     info->fqdn = fqdn;
1013     if(fqdn_out) *fqdn_out = info->fqdn;
1014     info->dsn = dsn;
1015     info->storagenode_id = id;
1016     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1017     noit_hash_store(&storagenode_to_info_cache,
1018                     (void *)&info->storagenode_id, sizeof(int), info);
1019     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1020   }
1021   return info ? info->dsn : NULL;
1022 }
1023 static ds_line_detail *
1024 build_insert_batch(interim_journal_t *ij) {
1025   int rv;
1026   off_t len;
1027   const char *buff, *cp, *lcp;
1028   struct stat st;
1029   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1030
1031   if(ij->fd < 0) {
1032     ij->fd = open(ij->filename, O_RDONLY);
1033     if(ij->fd < 0) {
1034       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
1035             ij->filename, strerror(errno));
1036       assert(ij->fd >= 0);
1037     }
1038   }
1039   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1040   if(rv == -1) {
1041       noitL(noit_error, "Cannot stat interim journal '%s': %s\n",
1042             ij->filename, strerror(errno));
1043     assert(rv != -1);
1044   }
1045   len = st.st_size;
1046   if(len > 0) {
1047     buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1048     if(buff == (void *)-1) {
1049       noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1050             ij->filename, strerror(errno));
1051       assert(buff != (void *)-1);
1052     }
1053     lcp = buff;
1054     while(lcp < (buff + len) &&
1055           NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1056       next = calloc(1, sizeof(*next));
1057       next->data = malloc(cp - lcp + 1);
1058       memcpy(next->data, lcp, cp - lcp);
1059       next->data[cp - lcp] = '\0';
1060       if(!head) head = next;
1061       if(last) last->next = next;
1062       last = next;
1063       lcp = cp + 1;
1064     }
1065     munmap((void *)buff, len);
1066   }
1067   close(ij->fd);
1068   return head;
1069 }
1070 static void
1071 interim_journal_remove(interim_journal_t *ij) {
1072   unlink(ij->filename);
1073   if(ij->filename) free(ij->filename);
1074   if(ij->remote_str) free(ij->remote_str);
1075   if(ij->remote_cn) free(ij->remote_cn);
1076   if(ij->fqdn) free(ij->fqdn);
1077 }
1078 int
1079 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
1080                                   struct timeval *now) {
1081   int i, total, success, sp_total, sp_success;
1082   interim_journal_t *ij;
1083   ds_line_detail *head = NULL, *current, *last_sp;
1084   const char *dsn;
1085   conn_q *cq;
1086   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1087   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1088
1089   ij = closure;
1090   dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1091   if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1092   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1093                              ij->fqdn, dsn);
1094   noitL(ds_deb, "stratcon_datastore_asynch_execute[%s,%s,%s]\n",
1095         ij->remote_str, ij->remote_cn, ij->fqdn);
1096  full_monty:
1097   /* Make sure we have a connection */
1098   i = 1;
1099   while(stratcon_database_connect(cq)) {
1100     noitL(noit_error, "Error connecting to database: %s\n",
1101           ij->fqdn ? ij->fqdn : "(null)");
1102     sleep(i);
1103     i *= 2;
1104     i = MIN(i, 16);
1105   }
1106
1107   if(head == NULL) head = build_insert_batch(ij);
1108   noitL(ds_deb, "Starting batch from %s/%s to %s\n",
1109         ij->remote_str ? ij->remote_str : "(null)",
1110         ij->remote_cn ? ij->remote_cn : "(null)",
1111         ij->fqdn ? ij->fqdn : "(null)");
1112   current = head;
1113   last_sp = NULL;
1114   total = success = sp_total = sp_success = 0;
1115   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
1116   while(current) {
1117     execute_outcome_t rv;
1118     if(current->data) {
1119       if(!last_sp) {
1120         SAVEPOINT("batch");
1121         sp_success = success;
1122         sp_total = total;
1123       }
1124  
1125       if(current->problematic) {
1126         RELEASE_SAVEPOINT("batch");
1127         current = current->next;
1128         total++;
1129         continue;
1130       }
1131       rv = stratcon_datastore_execute(cq, cq->remote_str, cq->remote_cn,
1132                                       current);
1133       switch(rv) {
1134         case DS_EXEC_SUCCESS:
1135           total++;
1136           success++;
1137           current = current->next;
1138           break;
1139         case DS_EXEC_ROW_FAILED:
1140           /* rollback to savepoint, mark this record as bad and start again */
1141           noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1142           current->problematic = 1;
1143           current = last_sp;
1144           success = sp_success;
1145           total = sp_total;
1146           ROLLBACK_TO_SAVEPOINT("batch");
1147           break;
1148         case DS_EXEC_TXN_FAILED:
1149           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1150           BUSTED(cq);
1151       }
1152     }
1153   }
1154   if(last_sp) RELEASE_SAVEPOINT("batch");
1155   if(stratcon_datastore_do(cq, "COMMIT")) {
1156     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1157     BUSTED(cq);
1158   }
1159   /* Cleanup the mess */
1160   while(head) {
1161     ds_line_detail *tofree;
1162     tofree = head;
1163     head = head->next;
1164     if(tofree->data) free(tofree->data);
1165     free_params((ds_single_detail *)tofree);
1166     free(tofree);
1167   }
1168   noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1169         ij->remote_str ? ij->remote_str : "(null)",
1170         ij->remote_cn ? ij->remote_cn : "(null)",
1171         ij->fqdn ? ij->fqdn : "(null)", success, total);
1172   interim_journal_remove(ij);
1173   release_conn_q(cq);
1174   return 0;
1175 }
1176 static int
1177 stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure,
1178                                 struct timeval *now) {
1179   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1180   const char *k;
1181   int klen;
1182   void *vij;
1183   interim_journal_t *ij;
1184   syncset_t *syncset = closure;
1185
1186   if((mask & EVENTER_ASYNCH) == EVENTER_ASYNCH) {
1187     eventer_add(syncset->completion);
1188     eventer_trigger(syncset->completion, EVENTER_READ | EVENTER_WRITE);
1189     free(syncset);
1190     return 0;
1191   }
1192   if(!((mask & EVENTER_ASYNCH_WORK) == EVENTER_ASYNCH_WORK)) return 0;
1193
1194   noitL(ds_deb, "Syncing journal sets...\n");
1195   while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) {
1196     eventer_t ingest;
1197     ij = vij;
1198     noitL(ds_deb, "Syncing journal set [%s,%s,%s]\n",
1199           ij->remote_str, ij->remote_cn, ij->fqdn);
1200     fsync(ij->fd);
1201     close(ij->fd);
1202     ij->fd = -1;
1203     ingest = eventer_alloc();
1204     ingest->mask = EVENTER_ASYNCH;
1205     ingest->callback = stratcon_datastore_asynch_execute;
1206     ingest->closure = ij;
1207     eventer_add_asynch(ij->cpool->jobq, ingest);
1208   }
1209   noit_hash_destroy(syncset->ws, free, NULL);
1210   free(syncset->ws);
1211   return 0;
1212 }
1213 static interim_journal_t *
1214 interim_journal_get(struct sockaddr *remote, const char *remote_cn_in,
1215                     int storagenode_id, const char *fqdn_in) {
1216   void *vhash, *vij;
1217   noit_hash_table *working_set;
1218   interim_journal_t *ij;
1219   struct timeval now;
1220   char jpath[PATH_MAX];
1221   char remote_str[128];
1222   const char *remote_cn = remote_cn_in ? remote_cn_in : "default";
1223   const char *fqdn = fqdn_in ? fqdn_in : "default";
1224
1225   noit_convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote);
1226   if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str));
1227
1228   /* Lookup the working set */
1229   if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1230     working_set = calloc(1, sizeof(*working_set));
1231     noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn),
1232                     working_set);
1233   }
1234   else
1235     working_set = vhash;
1236
1237   /* Lookup the interim journal within the working set */
1238   if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) {
1239     ij = calloc(1, sizeof(*ij));
1240     gettimeofday(&now, NULL);
1241     snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x",
1242              basejpath, remote_str, remote_cn, storagenode_id,
1243              (unsigned int)now.tv_sec, (unsigned int)now.tv_usec);
1244     ij->remote_str = strdup(remote_str);
1245     ij->remote_cn = strdup(remote_cn);
1246     ij->fqdn = fqdn_in ? strdup(fqdn_in) : NULL;
1247     ij->storagenode_id = storagenode_id;
1248     ij->filename = strdup(jpath);
1249     ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1250                                          ij->fqdn);
1251     ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1252     if(ij->fd < 0 && errno == ENOENT) {
1253       if(mkdir_for_file(ij->filename, 0750)) {
1254         noitL(noit_error, "Failed to create dir for '%s': %s\n",
1255               ij->filename, strerror(errno));
1256         exit(-1);
1257       }
1258       ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1259     }
1260     if(ij->fd < 0) {
1261       noitL(noit_error, "Failed to open interim journal '%s': %s\n",
1262             ij->filename, strerror(errno));
1263       exit(-1);
1264     }
1265     noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij);
1266   }
1267   else
1268     ij = vij;
1269
1270   return ij;
1271 }
1272 static int
1273 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1274                           int *sid_out, int *storagenode_id_out,
1275                           const char **remote_cn_out,
1276                           const char **fqdn_out, const char **dsn_out) {
1277   /* only called from the main thread -- no safety issues */
1278   void *vuuidinfo, *vinfo;
1279   uuid_info *uuidinfo;
1280   storagenode_info *info = NULL;
1281   char *fqdn = NULL;
1282   char *dsn = NULL;
1283   char *new_remote_cn = NULL;
1284   int storagenode_id = 0, sid = 0;
1285   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1286                          &vuuidinfo)) {
1287     int row_count = 0;
1288     char *tmpint;
1289     ds_single_detail *d;
1290     conn_q *cq;
1291
1292     /* We can't do a database lookup without the remote_cn */
1293     if(!remote_cn) {
1294       if(stratcon_datastore_get_enabled()) {
1295         /* We have an authoritatively maintained cache, we don't do lookups */
1296         return -1;
1297       }
1298       else
1299         remote_cn = "[[null]]";
1300     }
1301
1302     d = calloc(1, sizeof(*d));
1303     cq = get_conn_q_for_metanode();
1304     if(stratcon_database_connect(cq) == 0) {
1305       /* Blocking call to service the cache miss */
1306       GET_QUERY(check_map);
1307       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1308       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1309       PG_EXEC(check_map);
1310       row_count = PQntuples(d->res);
1311       if(row_count != 1) {
1312         PQclear(d->res);
1313         goto bad_row;
1314       }
1315       PG_GET_STR_COL(tmpint, 0, "sid");
1316       if(!tmpint) {
1317         row_count = 0;
1318         PQclear(d->res);
1319         goto bad_row;
1320       }
1321       sid = atoi(tmpint);
1322       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1323       if(tmpint) storagenode_id = atoi(tmpint);
1324       PG_GET_STR_COL(fqdn, 0, "fqdn");
1325       PG_GET_STR_COL(dsn, 0, "dsn");
1326       PG_GET_STR_COL(new_remote_cn, 0, "remote_cn");
1327       fqdn = fqdn ? strdup(fqdn) : NULL;
1328       dsn = dsn ? strdup(dsn) : NULL;
1329       new_remote_cn = new_remote_cn ? strdup(new_remote_cn) : NULL;
1330       PQclear(d->res);
1331     }
1332    bad_row:
1333     free_params((ds_single_detail *)d);
1334     free(d);
1335     release_conn_q(cq);
1336     if(row_count != 1) {
1337       return -1;
1338     }
1339     /* Place in cache */
1340     uuidinfo = calloc(1, sizeof(*uuidinfo));
1341     uuidinfo->sid = sid;
1342     uuidinfo->uuid_str = strdup(uuid_str);
1343     uuidinfo->storagenode_id = storagenode_id;
1344     uuidinfo->remote_cn = new_remote_cn ? strdup(new_remote_cn) : strdup(remote_cn);
1345     noit_hash_store(&uuid_to_info_cache,
1346                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1347     /* Also, we may have just witnessed a new storage node, store it */
1348     if(storagenode_id) {
1349       int needs_free = 0;
1350       info = calloc(1, sizeof(*info));
1351       info->storagenode_id = storagenode_id;
1352       info->dsn = dsn ? strdup(dsn) : NULL;
1353       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1354       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1355       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1356                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1357         /* hack to save memory -- we *never* remove from these caches,
1358            so we can use the same fqdn value in the above cache for the key
1359            in the cache below -- (no strdup) */
1360         noit_hash_store(&storagenode_to_info_cache,
1361                         (void *)&info->storagenode_id, sizeof(int), info);
1362       }
1363       else needs_free = 1;
1364       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1365       if(needs_free) {
1366         if(info->dsn) free(info->dsn);
1367         if(info->fqdn) free(info->fqdn);
1368         free(info);
1369       }
1370     }
1371   }
1372   else
1373     uuidinfo = vuuidinfo;
1374
1375   if(uuidinfo && uuidinfo->storagenode_id) {
1376     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1377       /* we don't have dsn and we actually want it */
1378       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1379       if(noit_hash_retrieve(&storagenode_to_info_cache,
1380                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1381                             &vinfo))
1382         info = vinfo;
1383       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1384     }
1385   }
1386
1387   if(fqdn_out) *fqdn_out = info ? info->fqdn : NULL;
1388   if(dsn_out) *dsn_out = info ? info->dsn : NULL;
1389   assert(uuidinfo);
1390   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1391   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1392   if(sid_out) *sid_out = uuidinfo->sid;
1393   if(fqdn) free(fqdn);
1394   if(dsn) free(dsn);
1395   if(new_remote_cn) free(new_remote_cn);
1396   return 0;
1397 }
1398 static int
1399 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1400   char uuid_str[UUID_STR_LEN+1];
1401   int sid = 0;
1402   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1403   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1404   return sid;
1405 }
1406 static void
1407 stratcon_datastore_journal(struct sockaddr *remote,
1408                            const char *remote_cn, const char *line) {
1409   interim_journal_t *ij = NULL;
1410   char uuid_str[UUID_STR_LEN+1], *cp;
1411   const char *fqdn = NULL, *dsn = NULL;
1412   int storagenode_id = 0;
1413   uuid_t checkid;
1414   if(!line) return;
1415   /* if it is a UUID based thing, find the storage node */
1416   switch(*line) {
1417     case 'C':
1418     case 'S':
1419     case 'M':
1420       if(line[1] == '\t' && (cp = strchr(line+2, '\t')) != NULL) {
1421         strlcpy(uuid_str, cp + 1, sizeof(uuid_str));
1422         if(!uuid_parse(uuid_str, checkid)) {
1423           storage_node_quick_lookup(uuid_str, remote_cn, NULL,
1424                                     &storagenode_id, NULL, &fqdn, &dsn);
1425           ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn);
1426         }
1427       }
1428       break;
1429     case 'n':
1430       ij = interim_journal_get(remote,remote_cn,0,NULL);
1431       break;
1432     default:
1433       break;
1434   }
1435   if(!ij) {
1436     noitL(ingest_err, "%d\t%s\n", storagenode_id, line);
1437   }
1438   else {
1439     int len;
1440     len = write(ij->fd, line, strlen(line));
1441     if(len < 0) {
1442       noitL(noit_error, "write to %s failed: %s\n",
1443             ij->filename, strerror(errno));
1444     }
1445   }
1446   free(line);
1447   return;
1448 }
1449 static noit_hash_table *
1450 stratcon_datastore_journal_remove(struct sockaddr *remote,
1451                                   const char *remote_cn) {
1452   void *vhash = NULL;
1453   if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1454     /* pluck it out */
1455     noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL);
1456   }
1457   else {
1458     noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n",
1459           remote_cn);
1460     abort();
1461   }
1462   return vhash;
1463 }
1464 void
1465 stratcon_datastore_push(stratcon_datastore_op_t op,
1466                         struct sockaddr *remote,
1467                         const char *remote_cn, void *operand,
1468                         eventer_t completion) {
1469   conn_pool *cpool;
1470   syncset_t *syncset;
1471   eventer_t e;
1472   ds_rt_detail *rtdetail;
1473   struct datastore_onlooker_list *nnode;
1474
1475   for(nnode = onlookers; nnode; nnode = nnode->next)
1476     nnode->dispatch(op,remote,remote_cn,operand);
1477
1478   switch(op) {
1479     case DS_OP_INSERT:
1480       stratcon_datastore_journal(remote, remote_cn, (const char *)operand);
1481       break;
1482     case DS_OP_CHKPT:
1483       e = eventer_alloc();
1484       syncset = calloc(1, sizeof(*syncset));
1485       e->mask = EVENTER_ASYNCH;
1486       e->callback = stratcon_datastore_journal_sync;
1487       syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn);
1488       syncset->completion = completion;
1489       e->closure = syncset;
1490       eventer_add(e);
1491       break;
1492     case DS_OP_FIND_COMPLETE:
1493       cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
1494       rtdetail = calloc(1, sizeof(*rtdetail));
1495       rtdetail->rt = operand;
1496       rtdetail->completion_event = completion;
1497       e = eventer_alloc();
1498       e->mask = EVENTER_ASYNCH;
1499       e->callback = stratcon_datastore_asynch_lookup;
1500       e->closure = rtdetail;
1501       eventer_add_asynch(cpool->jobq, e);
1502       break;
1503   }
1504 }
1505
1506 int
1507 stratcon_datastore_saveconfig(void *unused) {
1508   int rv = -1;
1509   char *buff;
1510   ds_single_detail _d = { 0 }, *d = &_d;
1511   conn_q *cq;
1512   char ipv4_str[32];
1513   struct in_addr r, l;
1514
1515   r.s_addr = htonl((4 << 24) || (2 << 16) || (2 << 8) || 1);
1516   memset(&l, 0, sizeof(l));
1517   noit_getip_ipv4(r, &l);
1518   /* Ignore the error.. what are we going to do anyway */
1519   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
1520     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));
1521
1522   cq = get_conn_q_for_metanode();
1523
1524   if(stratcon_database_connect(cq) == 0) {
1525     char time_as_str[20];
1526     size_t len;
1527     buff = noit_conf_xml_in_mem(&len);
1528     if(!buff) goto bad_row;
1529
1530     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1531     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str));
1532     DECLARE_PARAM_STR("", 0);
1533     DECLARE_PARAM_STR("stratcond", 9);
1534     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1535     DECLARE_PARAM_STR(buff, len);
1536     free(buff);
1537
1538     GET_QUERY(config_insert);
1539     PG_EXEC(config_insert);
1540     PQclear(d->res);
1541     rv = 0;
1542
1543     bad_row:
1544       free_params(d);
1545   }
1546   release_conn_q(cq);
1547   return rv;
1548 }
1549
1550 void
1551 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
1552                                                struct sockaddr *,
1553                                                const char *, void *)) {
1554   struct datastore_onlooker_list *nnode;
1555   nnode = calloc(1, sizeof(*nnode));
1556   nnode->dispatch = f;
1557   nnode->next = onlookers;
1558   while(noit_atomic_casptr((volatile void **)&onlookers,
1559                            nnode, nnode->next) != (void *)nnode->next)
1560     nnode->next = onlookers;
1561 }
1562 static void
1563 stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn,
1564                                          char *id_str, char *file) {
1565   char path[PATH_MAX];
1566   interim_journal_t *ij;
1567   eventer_t ingest;
1568
1569   snprintf(path, sizeof(path), "%s/%s/%s/%s/%s",
1570            basejpath, remote_str, remote_cn, id_str, file);
1571   ij = calloc(1, sizeof(*ij));
1572   ij->fd = open(path, O_RDONLY);
1573   if(ij->fd < 0) {
1574     noitL(noit_error, "cannot open journal '%s': %s\n",
1575           path, strerror(errno));
1576     free(ij);
1577     return;
1578   }
1579   close(ij->fd);
1580   ij->fd = -1;
1581   ij->filename = strdup(path);
1582   ij->remote_str = strdup(remote_str);
1583   ij->remote_cn = strdup(remote_cn);
1584   ij->storagenode_id = atoi(id_str);
1585   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1586                                        ij->fqdn);
1587   noitL(noit_error, "ingesting old payload: %s\n", ij->filename);
1588   ingest = eventer_alloc();
1589   ingest->mask = EVENTER_ASYNCH;
1590   ingest->callback = stratcon_datastore_asynch_execute;
1591   ingest->closure = ij;
1592   eventer_add_asynch(ij->cpool->jobq, ingest);
1593 }
1594 static void
1595 stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) {
1596   char path[PATH_MAX];
1597   DIR *root;
1598   struct dirent *de, *entry;
1599   int i = 0, cnt = 0;
1600   char **entries;
1601   int size = 0;
1602
1603   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
1604            first ? "/" : "", first ? first : "",
1605            second ? "/" : "", second ? second : "",
1606            third ? "/" : "", third ? third : "");
1607 #ifdef _PC_NAME_MAX
1608   size = pathconf(path, _PC_NAME_MAX);
1609 #endif
1610   size = MIN(size, PATH_MAX + 128);
1611   de = alloca(size);
1612   root = opendir(path);
1613   if(!root) return;
1614   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) cnt++;
1615   closedir(root);
1616   root = opendir(path);
1617   if(!root) return;
1618   entries = malloc(sizeof(*entries) * cnt);
1619   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) {
1620     if(i < cnt) {
1621       entries[i++] = strdup(entry->d_name);
1622     }
1623   }
1624   closedir(root);
1625   cnt = i; /* could have changed, directories are fickle */
1626   qsort(entries, i, sizeof(*entries),
1627         (int (*)(const void *, const void *))strcasecmp);
1628   for(i=0; i<cnt; i++) {
1629     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
1630     noitL(ds_deb, "Processing L%d entry '%s'\n",
1631           third ? 4 : second ? 3 : first ? 2 : 1, entries[i]);
1632     if(!first)
1633       stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL);
1634     else if(!second)
1635       stratcon_datastore_sweep_journals_int(first, entries[i], NULL);
1636     else if(!third)
1637       stratcon_datastore_sweep_journals_int(first, second, entries[i]);
1638     else if(strlen(entries[i]) == 16)
1639       stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]);
1640   }
1641 }
1642 static void
1643 stratcon_datastore_sweep_journals() {
1644   stratcon_datastore_sweep_journals_int(NULL,NULL,NULL);
1645 }
1646
1647 int
1648 stratcon_datastore_ingest_all_storagenode_info() {
1649   int i, cnt = 0;
1650   ds_single_detail _d = { 0 }, *d = &_d;
1651   conn_q *cq;
1652   cq = get_conn_q_for_metanode();
1653
1654   while(stratcon_database_connect(cq)) {
1655     noitL(noit_error, "Error connecting to database\n");
1656     sleep(1);
1657   }
1658
1659   GET_QUERY(all_storage);
1660   PG_EXEC(all_storage);
1661   cnt = PQntuples(d->res);
1662   for(i=0; i<cnt; i++) {
1663     void *vinfo;
1664     char *tmpint, *fqdn, *dsn;
1665     int storagenode_id;
1666     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1667     storagenode_id = atoi(tmpint);
1668     PG_GET_STR_COL(fqdn, i, "fqdn");
1669     PG_GET_STR_COL(dsn, i, "dsn");
1670     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1671     storagenode_id = tmpint ? atoi(tmpint) : 0;
1672
1673     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1674                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1675       storagenode_info *info;
1676       info = calloc(1, sizeof(*info));
1677       info->storagenode_id = storagenode_id;
1678       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1679       info->dsn = dsn ? strdup(dsn) : NULL;
1680       noit_hash_store(&storagenode_to_info_cache,
1681                       (void *)&info->storagenode_id, sizeof(int), info);
1682       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1683             info->storagenode_id,
1684             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1685     }
1686   }
1687   PQclear(d->res);
1688  bad_row:
1689   free_params(d);
1690
1691   release_conn_q(cq);
1692   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1693   return cnt;
1694 }
1695 int
1696 stratcon_datastore_ingest_all_check_info() {
1697   int i, cnt, loaded = 0;
1698   ds_single_detail _d = { 0 }, *d = &_d;
1699   conn_q *cq;
1700   cq = get_conn_q_for_metanode();
1701
1702   while(stratcon_database_connect(cq)) {
1703     noitL(noit_error, "Error connecting to database\n");
1704     sleep(1);
1705   }
1706
1707   GET_QUERY(check_mapall);
1708   PG_EXEC(check_mapall);
1709   cnt = PQntuples(d->res);
1710   for(i=0; i<cnt; i++) {
1711     void *vinfo;
1712     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1713     int sid, storagenode_id;
1714     uuid_info *uuidinfo;
1715     PG_GET_STR_COL(uuid_str, i, "id");
1716     if(!uuid_str) continue;
1717     PG_GET_STR_COL(tmpint, i, "sid");
1718     if(!tmpint) continue;
1719     sid = atoi(tmpint);
1720     PG_GET_STR_COL(fqdn, i, "fqdn");
1721     PG_GET_STR_COL(dsn, i, "dsn");
1722     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1723     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1724     storagenode_id = tmpint ? atoi(tmpint) : 0;
1725
1726     uuidinfo = calloc(1, sizeof(*uuidinfo));
1727     uuidinfo->uuid_str = strdup(uuid_str);
1728     uuidinfo->remote_cn = strdup(remote_cn);
1729     uuidinfo->storagenode_id = storagenode_id;
1730     uuidinfo->sid = sid;
1731     noit_hash_store(&uuid_to_info_cache,
1732                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1733     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1734           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1735     loaded++;
1736     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1737                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1738       storagenode_info *info;
1739       info = calloc(1, sizeof(*info));
1740       info->storagenode_id = storagenode_id;
1741       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1742       info->dsn = dsn ? strdup(dsn) : NULL;
1743       noit_hash_store(&storagenode_to_info_cache,
1744                       (void *)&info->storagenode_id, sizeof(int), info);
1745       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1746             info->storagenode_id,
1747             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1748     }
1749   }
1750   PQclear(d->res);
1751  bad_row:
1752   free_params(d);
1753
1754   release_conn_q(cq);
1755   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1756   return loaded;
1757 }
1758
1759 static int
1760 rest_get_noit_config(noit_http_rest_closure_t *restc,
1761                      int npats, char **pats) {
1762   noit_http_session_ctx *ctx = restc->http_ctx;
1763   ds_single_detail *d;
1764   int row_count = 0;
1765   const char *xml = NULL;
1766   conn_q *cq = NULL;
1767
1768   if(npats != 0) {
1769     noit_http_response_server_error(ctx, "text/xml");
1770     noit_http_response_end(ctx);
1771     return 0;
1772   }
1773   d = calloc(1, sizeof(*d));
1774   GET_QUERY(config_get);
1775   cq = get_conn_q_for_metanode();
1776   if(!cq) {
1777     noit_http_response_server_error(ctx, "text/xml");
1778     goto bad_row;
1779   }
1780
1781   DECLARE_PARAM_STR(restc->remote_cn,
1782                     restc->remote_cn ? strlen(restc->remote_cn) : 0);
1783   PG_EXEC(config_get);
1784   row_count = PQntuples(d->res);
1785   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1786
1787   if(xml == NULL) {
1788     char buff[1024];
1789     snprintf(buff, sizeof(buff), "<error><remote_cn>%s</remote_cn>"
1790                                  "<row_count>%d</row_count></error>\n",
1791              restc->remote_cn, row_count);
1792     noit_http_response_append(ctx, buff, strlen(buff));
1793     noit_http_response_not_found(ctx, "text/xml");
1794   }
1795   else {
1796     noit_http_response_append(ctx, xml, strlen(xml));
1797     noit_http_response_ok(ctx, "text/xml");
1798   }
1799  bad_row:
1800   free_params((ds_single_detail *)d);
1801   d->nparams = 0;
1802   if(cq) release_conn_q(cq);
1803
1804   noit_http_response_end(ctx);
1805   return 0;
1806 }
1807
1808 void
1809 stratcon_datastore_init() {
1810   pthread_mutex_init(&ds_conns_lock, NULL);
1811   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1812   ds_err = noit_log_stream_find("error/datastore");
1813   ds_deb = noit_log_stream_find("debug/datastore");
1814   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
1815   ingest_err = noit_log_stream_find("error/ingest");
1816   if(!ds_err) ds_err = noit_error;
1817   if(!ingest_err) ingest_err = noit_error;
1818   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1819                            &basejpath)) {
1820     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1821     exit(-1);
1822   }
1823   stratcon_datastore_ingest_all_check_info();
1824   stratcon_datastore_ingest_all_storagenode_info();
1825   stratcon_datastore_sweep_journals();
1826
1827   assert(noit_http_rest_register_auth(
1828     "GET", "/noits/", "^config$", rest_get_noit_config,
1829              noit_http_rest_client_cert_auth
1830   ) == 0);
1831 }
1832
Note: See TracBrowser for help on using the browser.