root/src/stratcon_datastore.c

Revision 58fc542e232a765112f5752a98676f3e5fb39eb0, 57.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 years ago)

closes #238

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "utils/noit_str.h"
38 #include "utils/noit_mkdir.h"
39 #include "utils/noit_getip.h"
40 #include "stratcon_datastore.h"
41 #include "stratcon_realtime_http.h"
42 #include "stratcon_iep.h"
43 #include "noit_conf.h"
44 #include "noit_check.h"
45 #include "noit_rest.h"
46 #include <unistd.h>
47 #include <fcntl.h>
48 #include <netinet/in.h>
49 #include <sys/un.h>
50 #include <dirent.h>
51 #include <arpa/inet.h>
52 #include <sys/mman.h>
53 #include <libpq-fe.h>
54 #include <zlib.h>
55 #include <assert.h>
56 #include <errno.h>
57
/*
 * DECL_STMT(codename, confname) declares two file-scope statics:
 *   codename        - the SQL text; NULL until GET_QUERY loads it
 *   codename##_conf - the configuration path it is loaded from,
 *                     "/stratcon/database/statements/<confname>"
 */
#define DECL_STMT(codename, confname)                \
  static char *codename = NULL;                      \
  static const char *codename##_conf =               \
    "/stratcon/database/statements/" #confname

DECL_STMT(storage_post_connect,  storagepostconnect);
DECL_STMT(metanode_post_connect, metanodepostconnect);
DECL_STMT(find_storage,          findstoragenode);
DECL_STMT(all_storage,           allstoragenodes);
DECL_STMT(check_map,             mapchecktostoragenode);
DECL_STMT(check_mapall,          mapallchecks);
DECL_STMT(check_loadall,         allchecks);
DECL_STMT(check_find,            findcheck);
DECL_STMT(check_insert,          check);
DECL_STMT(status_insert,         status);
DECL_STMT(metric_insert_numeric, metric_numeric);
DECL_STMT(metric_insert_text,    metric_text);
DECL_STMT(config_insert,         config);
DECL_STMT(config_get,            findconfig);
76
77 static noit_log_stream_t ds_err = NULL;
78 static noit_log_stream_t ds_deb = NULL;
79 static noit_log_stream_t ds_pool_deb = NULL;
80 static noit_log_stream_t ingest_err = NULL;
81
82 static struct datastore_onlooker_list {
83   void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *,
84                    const char *, void *);
85   struct datastore_onlooker_list *next;
86 } *onlookers = NULL;
87
/*
 * Make sure the statement text `a` (declared with DECL_STMT) is loaded,
 * reading it from the config path a##_conf on first use.  Jumps to the
 * caller's `bad_row` label when the statement is not configured.
 * NOTE(review): the lazy load writes a file-scope static without locking.
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
93
94 struct conn_q;
95
96 typedef struct {
97   char            *queue_name; /* the key fqdn+remote_sn */
98   eventer_jobq_t  *jobq;
99   struct conn_q   *head;
100   pthread_mutex_t  lock;
101   pthread_cond_t   cv;
102   int              ttl;
103   int              in_pool;
104   int              outstanding;
105   int              max_allocated;
106   int              max_in_pool;
107 } conn_pool;
108 typedef struct conn_q {
109   time_t           last_use;
110   char            *dsn;        /* Pg connect string */
111   char            *remote_str; /* the IP of the noit*/
112   char            *remote_cn;  /* the Cert CN of the noit */
113   char            *fqdn;       /* the fqdn of the storage node */
114   conn_pool       *pool;
115   struct conn_q   *next;
116   /* Postgres specific stuff */
117   PGconn          *dbh;
118 } conn_q;
119
120
121 #define MAX_PARAMS 8
122 #define POSTGRES_PARTS \
123   PGresult *res; \
124   int rv; \
125   time_t whence; \
126   int nparams; \
127   int metric_type; \
128   char *paramValues[MAX_PARAMS]; \
129   int paramLengths[MAX_PARAMS]; \
130   int paramFormats[MAX_PARAMS]; \
131   int paramAllocd[MAX_PARAMS];
132
133 typedef struct ds_single_detail {
134   POSTGRES_PARTS
135 } ds_single_detail;
136 typedef struct {
137   /* Postgres specific stuff */
138   POSTGRES_PARTS
139   struct realtime_tracker *rt;
140   conn_q *cq; /* connection on which to perform this job */
141   eventer_t completion_event; /* This event should be registered if non NULL */
142 } ds_rt_detail;
143 typedef struct ds_line_detail {
144   /* Postgres specific stuff */
145   POSTGRES_PARTS
146   char *data;
147   int problematic;
148   struct ds_line_detail *next;
149 } ds_line_detail;
150
151 typedef struct {
152   noit_hash_table *ws;
153   eventer_t completion;
154 } syncset_t;
155
156 typedef struct {
157   char *remote_str;
158   char *remote_cn;
159   char *fqdn;
160   int storagenode_id;
161   int fd;
162   char *filename;
163   conn_pool *cpool;
164 } interim_journal_t;
165
166 static int stratcon_database_connect(conn_q *cq);
167 static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
168 static int storage_node_quick_lookup(const char *uuid_str,
169                                      const char *remote_cn,
170                                      int *sid_out, int *storagenode_id_out,
171                                      const char **remote_cn_out,
172                                      const char **fqdn_out,
173                                      const char **dsn_out);
174
175 static void
176 free_params(ds_single_detail *d) {
177   int i;
178   for(i=0; i<d->nparams; i++)
179     if(d->paramAllocd[i] && d->paramValues[i])
180       free(d->paramValues[i]);
181 }
182
183 char *basejpath = NULL;
184 pthread_mutex_t ds_conns_lock;
185 noit_hash_table ds_conns;
186 noit_hash_table working_sets;
187
188 /* the fqdn cache needs to be thread safe */
189 typedef struct {
190   char *uuid_str;
191   char *remote_cn;
192   int storagenode_id;
193   int sid;
194 } uuid_info;
195 typedef struct {
196   int storagenode_id;
197   char *fqdn;
198   char *dsn;
199 } storagenode_info;
200 noit_hash_table uuid_to_info_cache;
201 pthread_mutex_t storagenode_to_info_cache_lock;
202 noit_hash_table storagenode_to_info_cache;
203
/*
 * Render `remote` as text into `buff` (capacity `blen` bytes; always
 * NUL-terminated when blen > 0, truncating if necessary):
 *   AF_INET / AF_INET6 -> numeric address (no port)
 *   AF_UNIX            -> socket path
 * Returns strlen(buff).  A NULL `remote`, an unsupported address family, or
 * blen <= 0 yields 0 (buff is "" whenever it has room).
 */
int
convert_sockaddr_to_buff(char *buff, int blen, struct sockaddr *remote) {
  char name[128] = "";

  if(blen <= 0) return 0;   /* nowhere to put even the terminator */
  buff[0] = '\0';
  if(remote) {
    switch(remote->sa_family) {
      case AF_INET:
        /* inet_ntop's size argument is the capacity of the destination
           buffer, not the size of the sockaddr. */
        if(!inet_ntop(AF_INET, &((struct sockaddr_in *)remote)->sin_addr,
                      name, sizeof(name)))
          name[0] = '\0';
        break;
      case AF_INET6:
        /* Passing sizeof(struct sockaddr_in6) (28) here used to make
           inet_ntop fail for addresses longer than 27 characters. */
        if(!inet_ntop(AF_INET6, &((struct sockaddr_in6 *)remote)->sin6_addr,
                      name, sizeof(name)))
          name[0] = '\0';
        break;
      case AF_UNIX:
        snprintf(name, sizeof(name), "%s",
                 ((struct sockaddr_un *)remote)->sun_path);
        break;
      default:
        return 0;
    }
  }
  snprintf(buff, (size_t)blen, "%s", name);
  return (int)strlen(buff);
}
231
232 /* Thread-safe connection pools */
233
234 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
235 static void
236 release_conn_q_forceable(conn_q *cq, int forcefree) {
237   int putback = 0;
238   cq->last_use = time(NULL);
239   pthread_mutex_lock(&cq->pool->lock);
240   cq->pool->outstanding--;
241   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
242     putback = 1;
243     cq->next = cq->pool->head;
244     cq->pool->head = cq;
245     cq->pool->in_pool++;
246   }
247   pthread_mutex_unlock(&cq->pool->lock);
248   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(),
249         putback ? "to pool" : "and destroy", cq->pool->queue_name);
250   pthread_cond_signal(&cq->pool->cv);
251   if(putback) return;
252
253   /* Not put back, release it */
254   if(cq->dbh) PQfinish(cq->dbh);
255   if(cq->remote_str) free(cq->remote_str);
256   if(cq->remote_cn) free(cq->remote_cn);
257   if(cq->fqdn) free(cq->fqdn);
258   if(cq->dsn) free(cq->dsn);
259   free(cq);
260 }
261 static void
262 ttl_purge_conn_pool(conn_pool *pool) {
263   int old_cnt, new_cnt;
264   time_t now = time(NULL);
265   conn_q *cq, *prev = NULL, *iter;
266   /* because we always replace on the head and update the last_use time when
267      doing so, we know they are ordered LRU on the end.  So, once we hit an
268      old one, we know all the others are old too.
269    */
270   if(!pool->head) return; /* hack short circuit for no locks */
271   pthread_mutex_lock(&pool->lock);
272   old_cnt = pool->in_pool;
273   cq = pool->head;
274   while(cq) {
275     if(cq->last_use + cq->pool->ttl < now) {
276       if(prev) prev->next = NULL;
277       else pool->head = NULL;
278       break;
279     }
280     prev = cq;
281     cq = cq->next;
282   }
283   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
284   /* Fix accounting */
285   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
286   new_cnt = pool->in_pool;
287   pthread_mutex_unlock(&pool->lock);
288
289   /* Force release these without holding the lock */
290   while(cq) {
291     prev = cq;
292     cq = cq->next;
293     release_conn_q_forceable(cq, 1);
294   }
295   if(old_cnt != new_cnt)
296     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
297           pool->queue_name);
298 }
299 static void
300 release_conn_q(conn_q *cq) {
301   ttl_purge_conn_pool(cq->pool);
302   release_conn_q_forceable(cq, 0);
303 }
304 static conn_pool *
305 get_conn_pool_for_remote(const char *remote_str,
306                          const char *remote_cn, const char *fqdn) {
307   void *vcpool;
308   conn_pool *cpool = NULL;
309   char queue_name[256] = "datastore_";
310   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
311            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
312            fqdn ? fqdn : "default",
313            remote_cn ? remote_cn : "default");
314   pthread_mutex_lock(&ds_conns_lock);
315   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
316                         strlen(queue_name), &vcpool))
317     cpool = vcpool;
318   pthread_mutex_unlock(&ds_conns_lock);
319   if(!cpool) {
320     vcpool = cpool = calloc(1, sizeof(*cpool));
321     cpool->queue_name = strdup(queue_name);
322     pthread_mutex_init(&cpool->lock, NULL);
323     pthread_cond_init(&cpool->cv, NULL);
324     cpool->in_pool = 0;
325     cpool->outstanding = 0;
326     cpool->max_in_pool = 1;
327     cpool->max_allocated = 1;
328     pthread_mutex_lock(&ds_conns_lock);
329     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
330                         cpool)) {
331       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
332                          strlen(queue_name), &vcpool);
333     }
334     pthread_mutex_unlock(&ds_conns_lock);
335     if(vcpool != cpool) {
336       /* someone beat us to it */
337       free(cpool->queue_name);
338       pthread_mutex_destroy(&cpool->lock);
339       pthread_cond_destroy(&cpool->cv);
340       free(cpool);
341     }
342     else {
343       int i;
344       /* Our job to setup the pool */
345       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
346       eventer_jobq_init(cpool->jobq, queue_name);
347       cpool->jobq->backq = eventer_default_backq();
348       /* Add one thread */
349       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
350         eventer_jobq_increase_concurrency(cpool->jobq);
351     }
352     cpool = vcpool;
353   }
354   return cpool;
355 }
356 static conn_q *
357 get_conn_q_for_remote(const char *remote_str,
358                       const char *remote_cn, const char *fqdn,
359                       const char *dsn) {
360   conn_pool *cpool;
361   conn_q *cq;
362   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
363   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(),
364         cpool->queue_name);
365   pthread_mutex_lock(&cpool->lock);
366  again:
367   if(cpool->head) {
368     assert(cpool->in_pool > 0);
369     cq = cpool->head;
370     cpool->head = cq->next;
371     cpool->in_pool--;
372     cpool->outstanding++;
373     cq->next = NULL;
374     pthread_mutex_unlock(&cpool->lock);
375     return cq;
376   }
377   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
378     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
379           (void *)pthread_self(), cpool->queue_name);
380     pthread_cond_wait(&cpool->cv, &cpool->lock);
381     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
382           (void *)pthread_self(), cpool->queue_name);
383     goto again;
384   }
385   else {
386     cpool->outstanding++;
387     pthread_mutex_unlock(&cpool->lock);
388   }
389  
390   cq = calloc(1, sizeof(*cq));
391   cq->pool = cpool;
392   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
393   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
394   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
395   cq->dsn = dsn ? strdup(dsn) : NULL;
396   return cq;
397 }
398 static conn_q *
399 get_conn_q_for_metanode() {
400   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
401 }
402
/*
 * Outcome of executing one datastore operation.  The functions visible in
 * this part of the file return DS_EXEC_SUCCESS or DS_EXEC_ROW_FAILED.
 */
typedef enum {
  DS_EXEC_SUCCESS    = 0, /* the operation completed */
  DS_EXEC_ROW_FAILED = 1, /* this row/line could not be processed */
  DS_EXEC_TXN_FAILED = 2  /* NOTE(review): presumably "whole transaction failed"; confirm with callers */
} execute_outcome_t;
408
/*
 * Append one text-format bound parameter to `d` (which must be in scope).
 *
 * The value is copied with noit__strndup, so the caller keeps ownership of
 * `str`; free_params() releases the copy.  The literal "[[null]]" is the
 * in-band marker for a SQL NULL.  `str` and `len` are each evaluated
 * exactly once, and the parameter arrays (MAX_PARAMS slots) are
 * bounds-checked instead of being silently overrun.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  const char *dps_str__ = (str); \
  int dps_len__ = (len); \
  assert(d->nparams < MAX_PARAMS); \
  d->paramValues[d->nparams] = noit__strndup(dps_str__, dps_len__); \
  d->paramLengths[d->nparams] = dps_len__; \
  d->paramFormats[d->nparams] = 0; \
  d->paramAllocd[d->nparams] = 1; \
  if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \
    free(d->paramValues[d->nparams]); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  d->nparams++; \
} while(0)

/* Append an int parameter, rendered as decimal text via DECLARE_PARAM_STR. */
#define DECLARE_PARAM_INT(i) do { \
  int buffer__len; \
  char buffer__[32]; \
  snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
  buffer__len = strlen(buffer__); \
  DECLARE_PARAM_STR(buffer__, buffer__len); \
} while(0)
429
/*
 * Fetch column `name` of row `row` in d->res into `dest` as a C string.
 * `dest` is NULL when the column does not exist or the value is SQL NULL;
 * otherwise it points into d->res (storage owned by libpq, see PQgetvalue).
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int pg_col__ = PQfnumber(d->res, name); \
  dest = (pg_col__ < 0 || PQgetisnull(d->res, row, pg_col__)) \
           ? NULL : PQgetvalue(d->res, row, pg_col__); \
} while(0)
437
/*
 * Execute `cmd` with the parameters collected in `d` on cq's connection,
 * storing the result in d->res and its status in d->rv.  For any status
 * other than PGRES_COMMAND_OK / PGRES_TUPLES_OK, log the first line of the
 * server error, clear the result, and jump to the caller's `bad_row` label.
 * Requires `d` and `cq` in scope.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(!(d->rv == PGRES_COMMAND_OK || d->rv == PGRES_TUPLES_OK)) { \
    const char *pg_msg__ = PQresultErrorMessage(d->res); \
    int pg_msg_len__ = (int)strcspn(pg_msg__, "\n"); \
    noitL(ds_err, "[%s] stratcon datasource bad (%d): %.*s\n", \
          cq->fqdn ? cq->fqdn : "metanode", d->rv, \
          pg_msg_len__, pg_msg__); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
455
/*
 * Like PG_EXEC, but `cmd` is first expanded as a strftime(3) template using
 * the UTC broken-down time of `whence` (NOTE(review): presumably so that
 * statements can name time-partitioned tables — confirm).
 *
 * `whence` is evaluated exactly once.  If the timestamp cannot be converted
 * (gmtime_r fails for out-of-range values) or the expansion does not fit in
 * the 4096-byte buffer, the row is rejected.  Previously a NULL struct tm or
 * an indeterminate buffer would have been used.
 * Requires `d` and `cq` in scope; jumps to `bad_row` on failure.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t tm_whence__ = (whence); \
  char cmdbuf[4096]; \
  struct tm tm_buf__, *tm_ptr__; \
  tm_ptr__ = gmtime_r(&tm_whence__, &tm_buf__); \
  if(tm_ptr__ == NULL || \
     strftime(cmdbuf, sizeof(cmdbuf), cmd, tm_ptr__) == 0) { \
    noitL(ds_err, "stratcon datasource bad time expansion: %llu\n", \
          (long long unsigned)tm_whence__); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "stratcon datasource bad (%d): %.*s time: %llu\n", \
          d->rv, (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)tm_whence__); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
478
479 static void *
480 stratcon_datastore_check_loadall(void *vsn) {
481   storagenode_info *sn = vsn;
482   ds_single_detail *d;
483   int i, row_count = 0, good = 0;
484   char buff[1024];
485   conn_q *cq = NULL;
486
487   d = calloc(1, sizeof(*d));
488   GET_QUERY(check_loadall);
489   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
490   i = 0;
491   while(stratcon_database_connect(cq)) {
492     if(i++ > 4) {
493       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
494       release_conn_q(cq);
495       return (void *)(vpsized_int)good;
496     }
497     sleep(1);
498   }
499   PG_EXEC(check_loadall);
500   row_count = PQntuples(d->res);
501  
502   for(i=0; i<row_count; i++) {
503     int rv;
504     int8_t family;
505     struct sockaddr *sin;
506     struct sockaddr_in sin4 = { .sin_family = AF_INET };
507     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
508     char *remote, *id, *target, *module, *name;
509     PG_GET_STR_COL(remote, i, "remote_address");
510     PG_GET_STR_COL(id, i, "id");
511     PG_GET_STR_COL(target, i, "target");
512     PG_GET_STR_COL(module, i, "module");
513     PG_GET_STR_COL(name, i, "name");
514     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
515
516     family = AF_INET;
517     sin = (struct sockaddr *)&sin4;
518     rv = inet_pton(family, remote, &sin4.sin_addr);
519     if(rv != 1) {
520       family = AF_INET6;
521       sin = (struct sockaddr *)&sin6;
522       rv = inet_pton(family, remote, &sin6.sin6_addr);
523       if(rv != 1) {
524         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
525         sin = NULL;
526       }
527     }
528
529     /* stratcon_iep_line_processor takes an allocated operand and frees it */
530     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
531     good++;
532   }
533   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
534         good, row_count, sn->fqdn);
535  bad_row:
536   free_params((ds_single_detail *)d);
537   free(d);
538   if(cq) release_conn_q(cq);
539   return (void *)(vpsized_int)good;
540 }
541 static int
542 stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure,
543                                     struct timeval *now) {
544   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
545   pthread_t *jobs = NULL;
546   int nodes, i = 0, tcnt = 0;
547   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
548   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
549
550   pthread_mutex_lock(&storagenode_to_info_cache_lock);
551   nodes = storagenode_to_info_cache.size;
552   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
553   sns = calloc(MAX(1,nodes), sizeof(*sns));
554   if(nodes == 0) sns[nodes++] = &self;
555   else {
556     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
557     const char *k;
558     void *v;
559     int klen;
560     while(noit_hash_next(&storagenode_to_info_cache,
561                          &iter, &k, &klen, &v)) {
562       sns[i++] = (storagenode_info *)v;
563     }
564   }
565   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
566
567   for(i=0; i<nodes; i++) {
568     if(pthread_create(&jobs[i], NULL,
569                       stratcon_datastore_check_loadall, sns[i]) != 0) {
570       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
571     }
572   }
573   for(i=0; i<nodes; i++) {
574     void *good;
575     pthread_join(jobs[i], &good);
576     tcnt += (int)(vpsized_int)good;
577   }
578   noitL(noit_error, "Loaded all %d check states.\n", tcnt);
579   return 0;
580 }
581 void
582 stratcon_datastore_iep_check_preload() {
583   eventer_t e;
584   conn_pool *cpool;
585
586   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
587   e = eventer_alloc();
588   e->mask = EVENTER_ASYNCH;
589   e->callback = stratcon_datastore_asynch_drive_iep;
590   e->closure = NULL;
591   eventer_add_asynch(cpool->jobq, e);
592 }
593 execute_outcome_t
594 stratcon_datastore_find(ds_rt_detail *d) {
595   conn_q *cq;
596   char *val;
597   int row_count;
598   struct realtime_tracker *node;
599
600   for(node = d->rt; node; node = node->next) {
601     char uuid_str[UUID_STR_LEN+1];
602     const char *fqdn, *dsn, *remote_cn;
603     int storagenode_id;
604
605     uuid_unparse_lower(node->checkid, uuid_str);
606     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
607                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
608       continue;
609
610     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
611     stratcon_database_connect(cq);
612
613     GET_QUERY(check_find);
614     DECLARE_PARAM_INT(node->sid);
615     PG_EXEC(check_find);
616     row_count = PQntuples(d->res);
617     if(row_count != 1) {
618       PQclear(d->res);
619       goto bad_row;
620     }
621
622     /* Get the check uuid */
623     PG_GET_STR_COL(val, 0, "id");
624     if(!val) {
625       PQclear(d->res);
626       goto bad_row;
627     }
628     if(uuid_parse(val, node->checkid)) {
629       PQclear(d->res);
630       goto bad_row;
631     }
632  
633     /* Get the remote_address (which noit owns this) */
634     PG_GET_STR_COL(val, 0, "remote_address");
635     if(!val) {
636       PQclear(d->res);
637       goto bad_row;
638     }
639     node->noit = strdup(val);
640  
641    bad_row:
642     free_params((ds_single_detail *)d);
643     d->nparams = 0;
644     release_conn_q(cq);
645   }
646   return DS_EXEC_SUCCESS;
647 }
648 execute_outcome_t
649 stratcon_datastore_execute(conn_q *cq, const char *r, const char *remote_cn,
650                            ds_line_detail *d) {
651   int type, len, sid;
652   char *final_buff;
653   uLong final_len, actual_final_len;
654   char *token;
655   char raddr_blank[1] = "";
656   const char *raddr;
657
658   type = d->data[0];
659   raddr = r ? r : raddr_blank;
660
661   /* Parse the log line, but only if we haven't already */
662   if(!d->nparams) {
663     char *scp, *ecp;
664
665     scp = d->data;
666 #define PROCESS_NEXT_FIELD(t,l) do { \
667   if(!*scp) goto bad_row; \
668   ecp = strchr(scp, '\t'); \
669   if(!ecp) goto bad_row; \
670   token = scp; \
671   len = (ecp-scp); \
672   scp = ecp + 1; \
673 } while(0)
674 #define PROCESS_LAST_FIELD(t,l) do { \
675   if(!*scp) ecp = scp; \
676   else { \
677     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
678     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
679   } \
680   t = scp; \
681   l = (ecp-scp); \
682 } while(0)
683
684     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
685     switch(type) {
686       /* See noit_check_log.c for log description */
687       case 'n':
688         DECLARE_PARAM_STR(raddr, strlen(raddr));
689         DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
690         DECLARE_PARAM_STR("noitd",5); /* node_type */
691         PROCESS_NEXT_FIELD(token,len);
692         d->whence = (time_t)strtoul(token, NULL, 10);
693         DECLARE_PARAM_STR(token,len); /* timestamp */
694
695         /* This is the expected uncompressed len */
696         PROCESS_NEXT_FIELD(token,len);
697         final_len = atoi(token);
698         final_buff = malloc(final_len);
699         if(!final_buff) goto bad_row;
700  
701         /* The last token is b64 endoded and compressed.
702          * we need to decode it, declare it and then free it.
703          */
704         PROCESS_LAST_FIELD(token, len);
705         /* We can in-place decode this */
706         len = noit_b64_decode((char *)token, len,
707                               (unsigned char *)token, len);
708         if(len <= 0) {
709           noitL(noit_error, "noitd config base64 decoding error.\n");
710           free(final_buff);
711           goto bad_row;
712         }
713         actual_final_len = final_len;
714         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
715                               (unsigned char *)token, len)) {
716           noitL(noit_error, "noitd config decompression failure.\n");
717           free(final_buff);
718           goto bad_row;
719         }
720         if(final_len != actual_final_len) {
721           noitL(noit_error, "noitd config decompression error.\n");
722           free(final_buff);
723           goto bad_row;
724         }
725         DECLARE_PARAM_STR(final_buff, final_len);
726         free(final_buff);
727         break;
728       case 'C':
729         DECLARE_PARAM_STR(raddr, strlen(raddr));
730         PROCESS_NEXT_FIELD(token,len);
731         DECLARE_PARAM_STR(token,len); /* timestamp */
732         d->whence = (time_t)strtoul(token, NULL, 10);
733         PROCESS_NEXT_FIELD(token, len);
734         sid = uuid_to_sid(token, remote_cn);
735         if(sid == 0) goto bad_row;
736         DECLARE_PARAM_INT(sid); /* sid */
737         DECLARE_PARAM_STR(token,len); /* uuid */
738         PROCESS_NEXT_FIELD(token, len);
739         DECLARE_PARAM_STR(token,len); /* target */
740         PROCESS_NEXT_FIELD(token, len);
741         DECLARE_PARAM_STR(token,len); /* module */
742         PROCESS_LAST_FIELD(token, len);
743         DECLARE_PARAM_STR(token,len); /* name */
744         break;
745       case 'M':
746         PROCESS_NEXT_FIELD(token,len);
747         DECLARE_PARAM_STR(token,len); /* timestamp */
748         d->whence = (time_t)strtoul(token, NULL, 10);
749         PROCESS_NEXT_FIELD(token, len);
750         sid = uuid_to_sid(token, remote_cn);
751         if(sid == 0) goto bad_row;
752         DECLARE_PARAM_INT(sid); /* sid */
753         PROCESS_NEXT_FIELD(token, len);
754         DECLARE_PARAM_STR(token,len); /* name */
755         PROCESS_NEXT_FIELD(token,len);
756         d->metric_type = *token;
757         PROCESS_LAST_FIELD(token,len);
758         DECLARE_PARAM_STR(token,len); /* value */
759         break;
760       case 'S':
761         PROCESS_NEXT_FIELD(token,len);
762         DECLARE_PARAM_STR(token,len); /* timestamp */
763         d->whence = (time_t)strtoul(token, NULL, 10);
764         PROCESS_NEXT_FIELD(token, len);
765         sid = uuid_to_sid(token, remote_cn);
766         if(sid == 0) goto bad_row;
767         DECLARE_PARAM_INT(sid); /* sid */
768         PROCESS_NEXT_FIELD(token, len);
769         DECLARE_PARAM_STR(token,len); /* state */
770         PROCESS_NEXT_FIELD(token, len);
771         DECLARE_PARAM_STR(token,len); /* availability */
772         PROCESS_NEXT_FIELD(token, len);
773         DECLARE_PARAM_STR(token,len); /* duration */
774         PROCESS_LAST_FIELD(token,len);
775         DECLARE_PARAM_STR(token,len); /* status */
776         break;
777       default:
778         goto bad_row;
779     }
780
781   }
782
783   /* Now execute the query */
784   switch(type) {
785     case 'n':
786       GET_QUERY(config_insert);
787       PG_EXEC(config_insert);
788       PQclear(d->res);
789       break;
790     case 'C':
791       GET_QUERY(check_insert);
792       PG_TM_EXEC(check_insert, d->whence);
793       PQclear(d->res);
794       break;
795     case 'S':
796       GET_QUERY(status_insert);
797       PG_TM_EXEC(status_insert, d->whence);
798       PQclear(d->res);
799       break;
800     case 'M':
801       switch(d->metric_type) {
802         case METRIC_INT32:
803         case METRIC_UINT32:
804         case METRIC_INT64:
805         case METRIC_UINT64:
806         case METRIC_DOUBLE:
807           GET_QUERY(metric_insert_numeric);
808           PG_TM_EXEC(metric_insert_numeric, d->whence);
809           PQclear(d->res);
810           break;
811         case METRIC_STRING:
812           GET_QUERY(metric_insert_text);
813           PG_TM_EXEC(metric_insert_text, d->whence);
814           PQclear(d->res);
815           break;
816         default:
817           goto bad_row;
818       }
819       break;
820     default:
821       /* should never get here */
822       goto bad_row;
823   }
824   return DS_EXEC_SUCCESS;
825  bad_row:
826   return DS_EXEC_ROW_FAILED;
827 }
828 static int
829 stratcon_database_post_connect(conn_q *cq) {
830   int rv = 0;
831   ds_single_detail _d = { 0 }, *d = &_d;
832   if(cq->fqdn) {
833     char *remote_str, *remote_cn;
834     /* This is the silly way we get null's in through our declare_param_str */
835     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
836     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
837     /* This is a storage node, it gets the storage node post_connect */
838     GET_QUERY(storage_post_connect);
839     rv = -1; /* now we're serious */
840     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
841     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
842     PG_EXEC(storage_post_connect);
843     PQclear(d->res);
844     rv = 0;
845   }
846   else {
847     /* Metanode post_connect */
848     GET_QUERY(metanode_post_connect);
849     rv = -1; /* now we're serious */
850     PG_EXEC(metanode_post_connect);
851     PQclear(d->res);
852     rv = 0;
853   }
854  bad_row:
855   free_params(d);
856   if(rv == -1) {
857     /* Post-connect intentions are serious and fatal */
858     PQfinish(cq->dbh);
859     cq->dbh = NULL;
860   }
861   return rv;
862 }
863 static int
864 stratcon_database_connect(conn_q *cq) {
865   char *dsn, dsn_meta[512];
866   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
867   const char *k, *v;
868   int klen;
869   noit_hash_table *t;
870
871   dsn_meta[0] = '\0';
872   if(!cq->dsn) {
873     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
874     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
875       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
876       strlcat(dsn_meta, k, sizeof(dsn_meta));
877       strlcat(dsn_meta, "=", sizeof(dsn_meta));
878       strlcat(dsn_meta, v, sizeof(dsn_meta));
879     }
880     noit_hash_destroy(t, free, free);
881     free(t);
882     dsn = dsn_meta;
883   }
884   else {
885     char options[32];
886     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta));
887     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user",
888                                options, sizeof(options))) {
889       strlcat(dsn_meta, " ", sizeof(dsn_meta));
890       strlcat(dsn_meta, "user", sizeof(dsn_meta));
891       strlcat(dsn_meta, "=", sizeof(dsn_meta));
892       strlcat(dsn_meta, options, sizeof(dsn_meta));
893     }
894     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password",
895                                options, sizeof(options))) {
896       strlcat(dsn_meta, " ", sizeof(dsn_meta));
897       strlcat(dsn_meta, "password", sizeof(dsn_meta));
898       strlcat(dsn_meta, "=", sizeof(dsn_meta));
899       strlcat(dsn_meta, options, sizeof(dsn_meta));
900     }
901     dsn = dsn_meta;
902   }
903
904   if(cq->dbh) {
905     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
906     PQreset(cq->dbh);
907     if(PQstatus(cq->dbh) != CONNECTION_OK) {
908       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
909             dsn, PQerrorMessage(cq->dbh));
910       return -1;
911     }
912     if(stratcon_database_post_connect(cq)) return -1;
913     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
914     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
915           dsn, PQerrorMessage(cq->dbh));
916     return -1;
917   }
918
919   cq->dbh = PQconnectdb(dsn);
920   if(!cq->dbh) return -1;
921   if(PQstatus(cq->dbh) != CONNECTION_OK) {
922     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
923           dsn, PQerrorMessage(cq->dbh));
924     return -1;
925   }
926   if(stratcon_database_post_connect(cq)) return -1;
927   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
928   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
929         dsn, PQerrorMessage(cq->dbh));
930   return -1;
931 }
932 static int
933 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
934                                 const char *name) {
935   int rv = -1;
936   PGresult *res;
937   char cmd[128];
938   strlcpy(cmd, p, sizeof(cmd));
939   strlcat(cmd, name, sizeof(cmd));
940   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
941   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
942   PQclear(res);
943   return rv;
944 }
945 static int
946 stratcon_datastore_do(conn_q *cq, const char *cmd) {
947   PGresult *res;
948   int rv = -1;
949   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
950   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
951   PQclear(res);
952   return rv;
953 }
/* Abandon a broken connection and restart the batch.  Expands to a goto,
 * so it only works inside a function that defines the full_monty label. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)
/* Shared body of the savepoint macros below.  It runs "<verb><name>" on the
 * caller's cq, bails out through BUSTED on failure, and then records the new
 * savepoint position in the caller's last_sp (NULL when none is outstanding). */
#define DS_SAVEPOINT_STEP(verb, name, new_sp) do { \
  if(stratcon_datastore_savepoint_op(cq, verb, name)) BUSTED(cq); \
  last_sp = (new_sp); \
} while(0)
#define SAVEPOINT(name) DS_SAVEPOINT_STEP("SAVEPOINT ", name, current)
#define ROLLBACK_TO_SAVEPOINT(name) \
  DS_SAVEPOINT_STEP("ROLLBACK TO SAVEPOINT ", name, NULL)
#define RELEASE_SAVEPOINT(name) \
  DS_SAVEPOINT_STEP("RELEASE SAVEPOINT ", name, NULL)
973 int
974 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
975                                  struct timeval *now) {
976   ds_rt_detail *dsjd = closure;
977   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
978   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
979
980   assert(dsjd->rt);
981   stratcon_datastore_find(dsjd);
982   if(dsjd->completion_event)
983     eventer_add(dsjd->completion_event);
984
985   free_params((ds_single_detail *)dsjd);
986   free(dsjd);
987   return 0;
988 }
989 static const char *
990 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
991   void *vinfo;
992   const char *dsn = NULL, *fqdn = NULL;
993   int found = 0;
994   storagenode_info *info = NULL;
995   pthread_mutex_lock(&storagenode_to_info_cache_lock);
996   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
997                         &vinfo)) {
998     found = 1;
999     info = vinfo;
1000   }
1001   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1002   if(found) {
1003     if(fqdn_out) *fqdn_out = info->fqdn;
1004     return info->dsn;
1005   }
1006
1007   if(!found && can_use_db) {
1008     ds_single_detail *d;
1009     conn_q *cq;
1010     int row_count;
1011     /* Look it up and store it */
1012     d = calloc(1, sizeof(*d));
1013     cq = get_conn_q_for_metanode();
1014     GET_QUERY(find_storage);
1015     DECLARE_PARAM_INT(id);
1016     PG_EXEC(find_storage);
1017     row_count = PQntuples(d->res);
1018     if(row_count) {
1019       PG_GET_STR_COL(dsn, 0, "dsn");
1020       PG_GET_STR_COL(fqdn, 0, "fqdn");
1021     }
1022     PQclear(d->res);
1023    bad_row:
1024     free_params(d);
1025     free(d);
1026     release_conn_q(cq);
1027   }
1028   if(fqdn) {
1029     info = calloc(1, sizeof(*info));
1030     info->fqdn = strdup(fqdn);
1031     if(fqdn_out) *fqdn_out = info->fqdn;
1032     info->dsn = dsn ? strdup(dsn) : NULL;
1033     info->storagenode_id = id;
1034     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1035     noit_hash_store(&storagenode_to_info_cache,
1036                     (void *)&info->storagenode_id, sizeof(int), info);
1037     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1038   }
1039   return info ? info->dsn : NULL;
1040 }
1041 static ds_line_detail *
1042 build_insert_batch(interim_journal_t *ij) {
1043   int rv;
1044   off_t len;
1045   const char *buff, *cp, *lcp;
1046   struct stat st;
1047   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1048
1049   if(ij->fd < 0) {
1050     ij->fd = open(ij->filename, O_RDONLY);
1051     if(ij->fd < 0) {
1052       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
1053             ij->filename, strerror(errno));
1054       assert(ij->fd >= 0);
1055     }
1056   }
1057   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1058   if(rv == -1) {
1059       noitL(noit_error, "Cannot stat interim journal '%s': %s\n",
1060             ij->filename, strerror(errno));
1061     assert(rv != -1);
1062   }
1063   len = st.st_size;
1064   buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1065   if(buff == (void *)-1) {
1066     noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1067           ij->filename, strerror(errno));
1068     assert(buff != (void *)-1);
1069   }
1070   lcp = buff;
1071   while(lcp < (buff + len) &&
1072         NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1073     next = calloc(1, sizeof(*next));
1074     next->data = malloc(cp - lcp + 1);
1075     memcpy(next->data, lcp, cp - lcp);
1076     next->data[cp - lcp] = '\0';
1077     if(!head) head = next;
1078     if(last) last->next = next;
1079     last = next;
1080     lcp = cp + 1;
1081   }
1082   munmap((void *)buff, len);
1083   close(ij->fd);
1084   return head;
1085 }
1086 static void
1087 interim_journal_remove(interim_journal_t *ij) {
1088   unlink(ij->filename);
1089   if(ij->filename) free(ij->filename);
1090   if(ij->remote_str) free(ij->remote_str);
1091   if(ij->remote_cn) free(ij->remote_cn);
1092   if(ij->fqdn) free(ij->fqdn);
1093 }
1094 int
1095 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
1096                                   struct timeval *now) {
1097   int i, total, success, sp_total, sp_success;
1098   interim_journal_t *ij;
1099   ds_line_detail *head = NULL, *current, *last_sp;
1100   const char *dsn;
1101   conn_q *cq;
1102   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1103   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1104
1105   ij = closure;
1106   dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1107   if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1108   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1109                              ij->fqdn, dsn);
1110   noitL(ds_deb, "stratcon_datastore_asynch_execute[%s,%s,%s]\n",
1111         ij->remote_str, ij->remote_cn, ij->fqdn);
1112  full_monty:
1113   /* Make sure we have a connection */
1114   i = 1;
1115   while(stratcon_database_connect(cq)) {
1116     noitL(noit_error, "Error connecting to database: %s\n",
1117           ij->fqdn ? ij->fqdn : "(null)");
1118     sleep(i);
1119     i *= 2;
1120     i = MIN(i, 16);
1121   }
1122
1123   if(head == NULL) head = build_insert_batch(ij);
1124   noitL(ds_deb, "Starting batch from %s/%s to %s\n",
1125         ij->remote_str ? ij->remote_str : "(null)",
1126         ij->remote_cn ? ij->remote_cn : "(null)",
1127         ij->fqdn ? ij->fqdn : "(null)");
1128   current = head;
1129   last_sp = NULL;
1130   total = success = sp_total = sp_success = 0;
1131   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
1132   while(current) {
1133     execute_outcome_t rv;
1134     if(current->data) {
1135       if(!last_sp) {
1136         SAVEPOINT("batch");
1137         sp_success = success;
1138         sp_total = total;
1139       }
1140  
1141       if(current->problematic) {
1142         RELEASE_SAVEPOINT("batch");
1143         current = current->next;
1144         total++;
1145         continue;
1146       }
1147       rv = stratcon_datastore_execute(cq, cq->remote_str, cq->remote_cn,
1148                                       current);
1149       switch(rv) {
1150         case DS_EXEC_SUCCESS:
1151           total++;
1152           success++;
1153           current = current->next;
1154           break;
1155         case DS_EXEC_ROW_FAILED:
1156           /* rollback to savepoint, mark this record as bad and start again */
1157           noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1158           current->problematic = 1;
1159           current = last_sp;
1160           success = sp_success;
1161           total = sp_total;
1162           ROLLBACK_TO_SAVEPOINT("batch");
1163           break;
1164         case DS_EXEC_TXN_FAILED:
1165           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1166           BUSTED(cq);
1167       }
1168     }
1169   }
1170   if(last_sp) RELEASE_SAVEPOINT("batch");
1171   if(stratcon_datastore_do(cq, "COMMIT")) {
1172     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1173     BUSTED(cq);
1174   }
1175   /* Cleanup the mess */
1176   while(head) {
1177     ds_line_detail *tofree;
1178     tofree = head;
1179     head = head->next;
1180     if(tofree->data) free(tofree->data);
1181     free_params((ds_single_detail *)tofree);
1182     free(tofree);
1183   }
1184   noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1185         ij->remote_str ? ij->remote_str : "(null)",
1186         ij->remote_cn ? ij->remote_cn : "(null)",
1187         ij->fqdn ? ij->fqdn : "(null)", success, total);
1188   interim_journal_remove(ij);
1189   release_conn_q(cq);
1190   return 0;
1191 }
1192 static int
1193 stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure,
1194                                 struct timeval *now) {
1195   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1196   const char *k;
1197   int klen;
1198   void *vij;
1199   interim_journal_t *ij;
1200   syncset_t *syncset = closure;
1201
1202   if((mask & EVENTER_ASYNCH) == EVENTER_ASYNCH) {
1203     eventer_add(syncset->completion);
1204     eventer_trigger(syncset->completion, EVENTER_READ | EVENTER_WRITE);
1205     free(syncset);
1206     return 0;
1207   }
1208   if(!((mask & EVENTER_ASYNCH_WORK) == EVENTER_ASYNCH_WORK)) return 0;
1209
1210   noitL(ds_deb, "Syncing journal sets...\n");
1211   while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) {
1212     eventer_t ingest;
1213     ij = vij;
1214     noitL(ds_deb, "Syncing journal set [%s,%s,%s]\n",
1215           ij->remote_str, ij->remote_cn, ij->fqdn);
1216     fsync(ij->fd);
1217     close(ij->fd);
1218     ij->fd = -1;
1219     ingest = eventer_alloc();
1220     ingest->mask = EVENTER_ASYNCH;
1221     ingest->callback = stratcon_datastore_asynch_execute;
1222     ingest->closure = ij;
1223     eventer_add_asynch(ij->cpool->jobq, ingest);
1224   }
1225   noit_hash_destroy(syncset->ws, free, NULL);
1226   free(syncset->ws);
1227   return 0;
1228 }
1229 static interim_journal_t *
1230 interim_journal_get(struct sockaddr *remote, const char *remote_cn_in,
1231                     int storagenode_id, const char *fqdn_in) {
1232   void *vhash, *vij;
1233   noit_hash_table *working_set;
1234   interim_journal_t *ij;
1235   struct timeval now;
1236   char jpath[PATH_MAX];
1237   char remote_str[128];
1238   const char *remote_cn = remote_cn_in ? remote_cn_in : "default";
1239   const char *fqdn = fqdn_in ? fqdn_in : "default";
1240
1241   convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote);
1242   if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str));
1243
1244   /* Lookup the working set */
1245   if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1246     working_set = calloc(1, sizeof(*working_set));
1247     noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn),
1248                     working_set);
1249   }
1250   else
1251     working_set = vhash;
1252
1253   /* Lookup the interim journal within the working set */
1254   if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) {
1255     ij = calloc(1, sizeof(*ij));
1256     gettimeofday(&now, NULL);
1257     snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x",
1258              basejpath, remote_str, remote_cn, storagenode_id,
1259              (unsigned int)now.tv_sec, (unsigned int)now.tv_usec);
1260     ij->remote_str = strdup(remote_str);
1261     ij->remote_cn = strdup(remote_cn);
1262     ij->fqdn = fqdn_in ? strdup(fqdn_in) : NULL;
1263     ij->storagenode_id = storagenode_id;
1264     ij->filename = strdup(jpath);
1265     ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1266                                          ij->fqdn);
1267     ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1268     if(ij->fd < 0 && errno == ENOENT) {
1269       if(mkdir_for_file(ij->filename, 0750)) {
1270         noitL(noit_error, "Failed to create dir for '%s': %s\n",
1271               ij->filename, strerror(errno));
1272         exit(-1);
1273       }
1274       ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1275     }
1276     if(ij->fd < 0) {
1277       noitL(noit_error, "Failed to open interim journal '%s': %s\n",
1278             ij->filename, strerror(errno));
1279       exit(-1);
1280     }
1281     noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij);
1282   }
1283   else
1284     ij = vij;
1285
1286   return ij;
1287 }
1288 static int
1289 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1290                           int *sid_out, int *storagenode_id_out,
1291                           const char **remote_cn_out,
1292                           const char **fqdn_out, const char **dsn_out) {
1293   /* only called from the main thread -- no safety issues */
1294   void *vuuidinfo, *vinfo;
1295   uuid_info *uuidinfo;
1296   storagenode_info *info = NULL;
1297   char *fqdn = NULL;
1298   char *dsn = NULL;
1299   int storagenode_id = 0, sid = 0;
1300   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1301                          &vuuidinfo)) {
1302     int row_count = 0;
1303     char *tmpint;
1304     ds_single_detail *d;
1305     conn_q *cq;
1306
1307     /* We can't do a database lookup without the remote_cn */
1308     if(!remote_cn) return -1;
1309
1310     d = calloc(1, sizeof(*d));
1311     cq = get_conn_q_for_metanode();
1312     if(stratcon_database_connect(cq) == 0) {
1313       /* Blocking call to service the cache miss */
1314       GET_QUERY(check_map);
1315       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1316       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1317       PG_EXEC(check_map);
1318       row_count = PQntuples(d->res);
1319       if(row_count != 1) {
1320         PQclear(d->res);
1321         goto bad_row;
1322       }
1323       PG_GET_STR_COL(tmpint, 0, "sid");
1324       sid = atoi(tmpint);
1325       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1326       if(tmpint) storagenode_id = atoi(tmpint);
1327       PG_GET_STR_COL(fqdn, 0, "fqdn");
1328       PG_GET_STR_COL(dsn, 0, "dsn");
1329       PQclear(d->res);
1330     }
1331    bad_row:
1332     free_params((ds_single_detail *)d);
1333     free(d);
1334     release_conn_q(cq);
1335     if(row_count != 1) return -1;
1336     /* Place in cache */
1337     if(fqdn) fqdn = strdup(fqdn);
1338     uuidinfo = calloc(1, sizeof(*uuidinfo));
1339     uuidinfo->sid = sid;
1340     uuidinfo->uuid_str = strdup(uuid_str);
1341     uuidinfo->storagenode_id = storagenode_id;
1342     uuidinfo->remote_cn = strdup(remote_cn);
1343     noit_hash_store(&uuid_to_info_cache,
1344                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1345     /* Also, we may have just witnessed a new storage node, store it */
1346     if(storagenode_id) {
1347       int needs_free = 0;
1348       info = calloc(1, sizeof(*info));
1349       info->storagenode_id = storagenode_id;
1350       info->dsn = dsn ? strdup(dsn) : NULL;
1351       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1352       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1353       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1354                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1355         /* hack to save memory -- we *never* remove from these caches,
1356            so we can use the same fqdn value in the above cache for the key
1357            in the cache below -- (no strdup) */
1358         noit_hash_store(&storagenode_to_info_cache,
1359                         (void *)&info->storagenode_id, sizeof(int), info);
1360       }
1361       else needs_free = 1;
1362       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1363       if(needs_free) {
1364         if(info->dsn) free(info->dsn);
1365         if(info->fqdn) free(info->fqdn);
1366         free(info);
1367       }
1368     }
1369   }
1370   else
1371     uuidinfo = vuuidinfo;
1372
1373   if(uuidinfo && uuidinfo->storagenode_id) {
1374     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1375       /* we don't have dsn and we actually want it */
1376       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1377       if(noit_hash_retrieve(&storagenode_to_info_cache,
1378                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1379                             &vinfo))
1380         info = vinfo;
1381       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1382     }
1383   }
1384
1385   if(fqdn_out) *fqdn_out = fqdn ? fqdn : (info ? info->fqdn : NULL);
1386   if(dsn_out) *dsn_out = dsn ? dsn : (info ? info->dsn : NULL);
1387   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1388   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1389   if(sid_out) *sid_out = uuidinfo->sid;
1390   return 0;
1391 }
1392 static int
1393 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1394   char uuid_str[UUID_STR_LEN+1];
1395   int sid = 0;
1396   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1397   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1398   return sid;
1399 }
1400 static void
1401 stratcon_datastore_journal(struct sockaddr *remote,
1402                            const char *remote_cn, const char *line) {
1403   interim_journal_t *ij = NULL;
1404   char uuid_str[UUID_STR_LEN+1], *cp;
1405   const char *fqdn = NULL, *dsn = NULL;
1406   int storagenode_id = 0;
1407   uuid_t checkid;
1408   if(!line) return;
1409   /* if it is a UUID based thing, find the storage node */
1410   switch(*line) {
1411     case 'C':
1412     case 'S':
1413     case 'M':
1414       if(line[1] == '\t' && (cp = strchr(line+2, '\t')) != NULL) {
1415         strlcpy(uuid_str, cp + 1, sizeof(uuid_str));
1416         if(!uuid_parse(uuid_str, checkid)) {
1417           storage_node_quick_lookup(uuid_str, remote_cn, NULL,
1418                                     &storagenode_id, NULL, &fqdn, &dsn);
1419           ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn);
1420         }
1421       }
1422       break;
1423     case 'n':
1424       ij = interim_journal_get(remote,remote_cn,0,NULL);
1425       break;
1426     default:
1427       break;
1428   }
1429   if(!ij) {
1430     noitL(ingest_err, "%d\t%s\n", storagenode_id, line);
1431   }
1432   else {
1433     int len;
1434     len = write(ij->fd, line, strlen(line));
1435     if(len < 0) {
1436       noitL(noit_error, "write to %s failed: %s\n",
1437             ij->filename, strerror(errno));
1438     }
1439   }
1440   return;
1441 }
1442 static noit_hash_table *
1443 stratcon_datastore_journal_remove(struct sockaddr *remote,
1444                                   const char *remote_cn) {
1445   void *vhash = NULL;
1446   if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1447     /* pluck it out */
1448     noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL);
1449   }
1450   else {
1451     noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n",
1452           remote_cn);
1453     abort();
1454   }
1455   return vhash;
1456 }
1457 void
1458 stratcon_datastore_push(stratcon_datastore_op_t op,
1459                         struct sockaddr *remote,
1460                         const char *remote_cn, void *operand,
1461                         eventer_t completion) {
1462   conn_pool *cpool;
1463   syncset_t *syncset;
1464   eventer_t e;
1465   ds_rt_detail *rtdetail;
1466   struct datastore_onlooker_list *nnode;
1467
1468   for(nnode = onlookers; nnode; nnode = nnode->next)
1469     nnode->dispatch(op,remote,remote_cn,operand);
1470
1471   switch(op) {
1472     case DS_OP_INSERT:
1473       stratcon_datastore_journal(remote, remote_cn, (const char *)operand);
1474       break;
1475     case DS_OP_CHKPT:
1476       e = eventer_alloc();
1477       syncset = calloc(1, sizeof(*syncset));
1478       e->mask = EVENTER_ASYNCH;
1479       e->callback = stratcon_datastore_journal_sync;
1480       syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn);
1481       syncset->completion = completion;
1482       e->closure = syncset;
1483       eventer_add(e);
1484       break;
1485     case DS_OP_FIND_COMPLETE:
1486       cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
1487       rtdetail = calloc(1, sizeof(*rtdetail));
1488       rtdetail->rt = operand;
1489       rtdetail->completion_event = completion;
1490       e = eventer_alloc();
1491       e->mask = EVENTER_ASYNCH;
1492       e->callback = stratcon_datastore_asynch_lookup;
1493       e->closure = rtdetail;
1494       eventer_add_asynch(cpool->jobq, e);
1495       break;
1496   }
1497 }
1498
1499 int
1500 stratcon_datastore_saveconfig(void *unused) {
1501   int rv = -1;
1502   char *buff;
1503   ds_single_detail _d = { 0 }, *d = &_d;
1504   conn_q *cq;
1505   char ipv4_str[32];
1506   struct in_addr r, l;
1507
1508   r.s_addr = htonl((4 << 24) || (2 << 16) || (2 << 8) || 1);
1509   memset(&l, 0, sizeof(l));
1510   noit_getip_ipv4(r, &l);
1511   /* Ignore the error.. what are we going to do anyway */
1512   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
1513     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));
1514
1515   cq = get_conn_q_for_metanode();
1516
1517   if(stratcon_database_connect(cq) == 0) {
1518     char time_as_str[20];
1519     size_t len;
1520     buff = noit_conf_xml_in_mem(&len);
1521     if(!buff) goto bad_row;
1522
1523     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1524     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str));
1525     DECLARE_PARAM_STR("", 0);
1526     DECLARE_PARAM_STR("stratcond", 9);
1527     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1528     DECLARE_PARAM_STR(buff, len);
1529     free(buff);
1530
1531     GET_QUERY(config_insert);
1532     PG_EXEC(config_insert);
1533     PQclear(d->res);
1534     rv = 0;
1535
1536     bad_row:
1537       free_params(d);
1538   }
1539   release_conn_q(cq);
1540   return rv;
1541 }
1542
1543 void
1544 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
1545                                                struct sockaddr *,
1546                                                const char *, void *)) {
1547   struct datastore_onlooker_list *nnode;
1548   nnode = calloc(1, sizeof(*nnode));
1549   nnode->dispatch = f;
1550   nnode->next = onlookers;
1551   while(noit_atomic_casptr((volatile void **)&onlookers,
1552                            nnode, nnode->next) != (void *)nnode->next)
1553     nnode->next = onlookers;
1554 }
1555 static void
1556 stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn,
1557                                          char *id_str, char *file) {
1558   char path[PATH_MAX];
1559   interim_journal_t *ij;
1560   eventer_t ingest;
1561
1562   snprintf(path, sizeof(path), "%s/%s/%s/%s/%s",
1563            basejpath, remote_str, remote_cn, id_str, file);
1564   ij = calloc(1, sizeof(*ij));
1565   ij->fd = open(path, O_RDONLY);
1566   if(ij->fd < 0) {
1567     noitL(noit_error, "cannot open journal '%s': %s\n",
1568           path, strerror(errno));
1569     free(ij);
1570     return;
1571   }
1572   close(ij->fd);
1573   ij->fd = -1;
1574   ij->filename = strdup(path);
1575   ij->remote_str = strdup(remote_str);
1576   ij->remote_cn = strdup(remote_cn);
1577   ij->storagenode_id = atoi(id_str);
1578   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1579                                        ij->fqdn);
1580   noitL(noit_error, "ingesting old payload: %s\n", ij->filename);
1581   ingest = eventer_alloc();
1582   ingest->mask = EVENTER_ASYNCH;
1583   ingest->callback = stratcon_datastore_asynch_execute;
1584   ingest->closure = ij;
1585   eventer_add_asynch(ij->cpool->jobq, ingest);
1586 }
1587 static void
1588 stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) {
1589   char path[PATH_MAX];
1590   DIR *root;
1591   struct dirent *de, *entry;
1592   int i = 0, cnt = 0;
1593   char **entries;
1594   int size = 0;
1595
1596   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
1597            first ? "/" : "", first ? first : "",
1598            second ? "/" : "", second ? second : "",
1599            third ? "/" : "", third ? third : "");
1600 #ifdef _PC_NAME_MAX
1601   size = pathconf(path, _PC_NAME_MAX);
1602 #endif
1603   size = MIN(size, PATH_MAX + 128);
1604   de = alloca(size);
1605   root = opendir(path);
1606   if(!root) return;
1607   while(portable_readdir_r(root, de, &entry) != 0 && entry != NULL) cnt++;
1608   closedir(root);
1609   root = opendir(path);
1610   if(!root) return;
1611   entries = malloc(sizeof(*entries) * cnt);
1612   while(portable_readdir_r(root, de, &entry) != 0 && entry != NULL) {
1613     if(i < cnt) {
1614       entries[i++] = strdup(entry->d_name);
1615     }
1616   }
1617   closedir(root);
1618   cnt = i; /* could have changed, directories are fickle */
1619   qsort(entries, i, sizeof(*entries),
1620         (int (*)(const void *, const void *))strcasecmp);
1621   for(i=0; i<cnt; i++) {
1622     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
1623     noitL(ds_deb, "Processing L%d entry '%s'\n",
1624           third ? 4 : second ? 3 : first ? 2 : 1, entries[i]);
1625     if(!first)
1626       stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL);
1627     else if(!second)
1628       stratcon_datastore_sweep_journals_int(first, entries[i], NULL);
1629     else if(!third)
1630       stratcon_datastore_sweep_journals_int(first, second, entries[i]);
1631     else if(strlen(entries[i]) == 16)
1632       stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]);
1633   }
1634 }
1635 static void
1636 stratcon_datastore_sweep_journals() {
1637   stratcon_datastore_sweep_journals_int(NULL,NULL,NULL);
1638 }
1639
1640 int
1641 stratcon_datastore_ingest_all_storagenode_info() {
1642   int i, cnt = 0;
1643   ds_single_detail _d = { 0 }, *d = &_d;
1644   conn_q *cq;
1645   cq = get_conn_q_for_metanode();
1646
1647   while(stratcon_database_connect(cq)) {
1648     noitL(noit_error, "Error connecting to database\n");
1649     sleep(1);
1650   }
1651
1652   GET_QUERY(all_storage);
1653   PG_EXEC(all_storage);
1654   cnt = PQntuples(d->res);
1655   for(i=0; i<cnt; i++) {
1656     void *vinfo;
1657     char *tmpint, *fqdn, *dsn;
1658     int storagenode_id;
1659     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1660     storagenode_id = atoi(tmpint);
1661     PG_GET_STR_COL(fqdn, i, "fqdn");
1662     PG_GET_STR_COL(dsn, i, "dsn");
1663     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1664     storagenode_id = tmpint ? atoi(tmpint) : 0;
1665
1666     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1667                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1668       storagenode_info *info;
1669       info = calloc(1, sizeof(*info));
1670       info->storagenode_id = storagenode_id;
1671       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1672       info->dsn = dsn ? strdup(dsn) : NULL;
1673       noit_hash_store(&storagenode_to_info_cache,
1674                       (void *)&info->storagenode_id, sizeof(int), info);
1675       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1676             info->storagenode_id,
1677             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1678     }
1679   }
1680   PQclear(d->res);
1681  bad_row:
1682   free_params(d);
1683
1684   release_conn_q(cq);
1685   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1686   return cnt;
1687 }
1688 int
1689 stratcon_datastore_ingest_all_check_info() {
1690   int i, cnt, loaded = 0;
1691   ds_single_detail _d = { 0 }, *d = &_d;
1692   conn_q *cq;
1693   cq = get_conn_q_for_metanode();
1694
1695   while(stratcon_database_connect(cq)) {
1696     noitL(noit_error, "Error connecting to database\n");
1697     sleep(1);
1698   }
1699
1700   GET_QUERY(check_mapall);
1701   PG_EXEC(check_mapall);
1702   cnt = PQntuples(d->res);
1703   for(i=0; i<cnt; i++) {
1704     void *vinfo;
1705     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1706     int sid, storagenode_id;
1707     uuid_info *uuidinfo;
1708     PG_GET_STR_COL(uuid_str, i, "id");
1709     if(!uuid_str) continue;
1710     PG_GET_STR_COL(tmpint, i, "sid");
1711     if(!tmpint) continue;
1712     sid = atoi(tmpint);
1713     PG_GET_STR_COL(fqdn, i, "fqdn");
1714     PG_GET_STR_COL(dsn, i, "dsn");
1715     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1716     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1717     storagenode_id = tmpint ? atoi(tmpint) : 0;
1718
1719     uuidinfo = calloc(1, sizeof(*uuidinfo));
1720     uuidinfo->uuid_str = strdup(uuid_str);
1721     uuidinfo->remote_cn = strdup(remote_cn);
1722     uuidinfo->storagenode_id = storagenode_id;
1723     uuidinfo->sid = sid;
1724     noit_hash_store(&uuid_to_info_cache,
1725                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1726     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1727           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1728     loaded++;
1729     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1730                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1731       storagenode_info *info;
1732       info = calloc(1, sizeof(*info));
1733       info->storagenode_id = storagenode_id;
1734       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1735       info->dsn = dsn ? strdup(dsn) : NULL;
1736       noit_hash_store(&storagenode_to_info_cache,
1737                       (void *)&info->storagenode_id, sizeof(int), info);
1738       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1739             info->storagenode_id,
1740             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1741     }
1742   }
1743   PQclear(d->res);
1744  bad_row:
1745   free_params(d);
1746
1747   release_conn_q(cq);
1748   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1749   return loaded;
1750 }
1751
1752 static int
1753 rest_get_noit_config(noit_http_rest_closure_t *restc,
1754                      int npats, char **pats) {
1755   noit_http_session_ctx *ctx = restc->http_ctx;
1756   ds_single_detail *d;
1757   int row_count = 0;
1758   const char *xml = NULL;
1759   conn_q *cq = NULL;
1760
1761   if(npats != 0) {
1762     noit_http_response_server_error(ctx, "text/xml");
1763     noit_http_response_end(ctx);
1764     return 0;
1765   }
1766   d = calloc(1, sizeof(*d));
1767   GET_QUERY(config_get);
1768   cq = get_conn_q_for_metanode();
1769   if(!cq) {
1770     noit_http_response_server_error(ctx, "text/xml");
1771     goto bad_row;
1772   }
1773
1774   DECLARE_PARAM_STR(restc->remote_cn,
1775                     restc->remote_cn ? strlen(restc->remote_cn) : 0);
1776   PG_EXEC(config_get);
1777   row_count = PQntuples(d->res);
1778   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1779
1780   if(xml == NULL) {
1781     char buff[1024];
1782     snprintf(buff, sizeof(buff), "<error><remote_cn>%s</remote_cn>"
1783                                  "<row_count>%d</row_count></error>\n",
1784              restc->remote_cn, row_count);
1785     noit_http_response_append(ctx, buff, strlen(buff));
1786     noit_http_response_not_found(ctx, "text/xml");
1787   }
1788   else {
1789     noit_http_response_append(ctx, xml, strlen(xml));
1790     noit_http_response_ok(ctx, "text/xml");
1791   }
1792  bad_row:
1793   free_params((ds_single_detail *)d);
1794   d->nparams = 0;
1795   if(cq) release_conn_q(cq);
1796
1797   noit_http_response_end(ctx);
1798   return 0;
1799 }
1800
1801 void
1802 stratcon_datastore_init() {
1803   pthread_mutex_init(&ds_conns_lock, NULL);
1804   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1805   ds_err = noit_log_stream_find("error/datastore");
1806   ds_deb = noit_log_stream_find("debug/datastore");
1807   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
1808   ingest_err = noit_log_stream_find("error/ingest");
1809   if(!ds_err) ds_err = noit_error;
1810   if(!ingest_err) ingest_err = noit_error;
1811   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1812                            &basejpath)) {
1813     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1814     exit(-1);
1815   }
1816   stratcon_datastore_ingest_all_check_info();
1817   stratcon_datastore_ingest_all_storagenode_info();
1818   stratcon_datastore_sweep_journals();
1819
1820   assert(noit_http_rest_register_auth(
1821     "GET", "/noits/", "^config$", rest_get_noit_config,
1822              noit_http_rest_client_cert_auth
1823   ) == 0);
1824 }
1825
Note: See TracBrowser for help on using the browser.