root/src/stratcon_datastore.c

Revision 571cfa4515ce4e86f586ced20ed6607d6ebf43c1, 57.9 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 8 years ago)

protect against null completion, refs #322

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "utils/noit_str.h"
38 #include "utils/noit_mkdir.h"
39 #include "utils/noit_getip.h"
40 #include "stratcon_datastore.h"
41 #include "stratcon_realtime_http.h"
42 #include "stratcon_iep.h"
43 #include "noit_conf.h"
44 #include "noit_check.h"
45 #include "noit_rest.h"
46 #include <unistd.h>
47 #include <fcntl.h>
48 #include <netinet/in.h>
49 #include <sys/un.h>
50 #include <dirent.h>
51 #include <arpa/inet.h>
52 #include <sys/mman.h>
53 #include <libpq-fe.h>
54 #include <zlib.h>
55 #include <assert.h>
56 #include <errno.h>
57
/*
 * DECL_STMT(codename, confname) emits two file-scope variables:
 *   codename      - the statement text; starts out NULL
 *   codename_conf - the configuration path for that statement, formed by
 *                   appending confname to "/stratcon/database/statements/"
 * GET_QUERY() fills in codename from codename_conf the first time it is
 * needed.
 */
#define DECL_STMT(codename,confname) \
  static char *codename = NULL; \
  static const char *codename##_conf = \
    "/stratcon/database/statements/" #confname

/* Statements run right after a connection is established */
DECL_STMT(storage_post_connect, storagepostconnect);
DECL_STMT(metanode_post_connect, metanodepostconnect);

/* Storage node lookups */
DECL_STMT(find_storage, findstoragenode);
DECL_STMT(all_storage, allstoragenodes);

/* Check lookups and mappings */
DECL_STMT(check_map, mapchecktostoragenode);
DECL_STMT(check_mapall, mapallchecks);
DECL_STMT(check_loadall, allchecks);
DECL_STMT(check_find, findcheck);

/* Inserts, one per journal line type */
DECL_STMT(check_insert, check);
DECL_STMT(status_insert, status);
DECL_STMT(metric_insert_numeric, metric_numeric);
DECL_STMT(metric_insert_text, metric_text);
DECL_STMT(config_insert, config);

/* Config retrieval */
DECL_STMT(config_get, findconfig);
76
77 static noit_log_stream_t ds_err = NULL;
78 static noit_log_stream_t ds_deb = NULL;
79 static noit_log_stream_t ds_pool_deb = NULL;
80 static noit_log_stream_t ingest_err = NULL;
81
/* Flag exposed through the accessor pair below; starts out as 1. */
static int ds_system_enabled = 1;

/* Returns the current value of the enabled flag. */
int
stratcon_datastore_get_enabled() {
  return ds_system_enabled;
}

/* Stores n as the new value of the enabled flag. */
void
stratcon_datastore_set_enabled(int n) {
  ds_system_enabled = n;
}
85
86 static struct datastore_onlooker_list {
87   void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *,
88                    const char *, void *);
89   struct datastore_onlooker_list *next;
90 } *onlookers = NULL;
91
/*
 * GET_QUERY(a): make sure statement `a` (declared with DECL_STMT) is loaded.
 * When `a` is still NULL it is read from the configuration path a_conf;
 * if that read fails, control jumps to the caller's bad_row label.
 * The enclosing function must therefore provide a bad_row label.
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && \
     !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
97
98 struct conn_q;
99
100 typedef struct {
101   char            *queue_name; /* the key fqdn+remote_sn */
102   eventer_jobq_t  *jobq;
103   struct conn_q   *head;
104   pthread_mutex_t  lock;
105   pthread_cond_t   cv;
106   int              ttl;
107   int              in_pool;
108   int              outstanding;
109   int              max_allocated;
110   int              max_in_pool;
111 } conn_pool;
112 typedef struct conn_q {
113   time_t           last_use;
114   char            *dsn;        /* Pg connect string */
115   char            *remote_str; /* the IP of the noit*/
116   char            *remote_cn;  /* the Cert CN of the noit */
117   char            *fqdn;       /* the fqdn of the storage node */
118   conn_pool       *pool;
119   struct conn_q   *next;
120   /* Postgres specific stuff */
121   PGconn          *dbh;
122 } conn_q;
123
124
/* Capacity of the per-statement parameter arrays below. */
#define MAX_PARAMS 8

/*
 * Members shared by every *_detail structure.  They are placed first in
 * each structure, and code in this file casts the other detail types to
 * ds_single_detail to reach them.
 *   res          - result of the most recent statement
 *   rv           - PQresultStatus() of that result
 *   whence       - timestamp handed to PG_TM_EXEC
 *   nparams      - number of parameter slots in use
 *   metric_type  - metric type character of an 'M' line
 *   param*       - parallel arrays describing each bound parameter;
 *                  paramAllocd[i] is non-zero when paramValues[i] must be
 *                  freed
 */
#define POSTGRES_PARTS \
  PGresult *res; \
  int rv; \
  time_t whence; \
  int nparams; \
  int metric_type; \
  char *paramValues[MAX_PARAMS]; \
  int paramLengths[MAX_PARAMS]; \
  int paramFormats[MAX_PARAMS]; \
  int paramAllocd[MAX_PARAMS];
136
137 typedef struct ds_single_detail {
138   POSTGRES_PARTS
139 } ds_single_detail;
140 typedef struct {
141   /* Postgres specific stuff */
142   POSTGRES_PARTS
143   struct realtime_tracker *rt;
144   conn_q *cq; /* connection on which to perform this job */
145   eventer_t completion_event; /* This event should be registered if non NULL */
146 } ds_rt_detail;
147 typedef struct ds_line_detail {
148   /* Postgres specific stuff */
149   POSTGRES_PARTS
150   char *data;
151   int problematic;
152   struct ds_line_detail *next;
153 } ds_line_detail;
154
155 typedef struct {
156   noit_hash_table *ws;
157   eventer_t completion;
158 } syncset_t;
159
160 typedef struct {
161   char *remote_str;
162   char *remote_cn;
163   char *fqdn;
164   int storagenode_id;
165   int fd;
166   char *filename;
167   conn_pool *cpool;
168 } interim_journal_t;
169
170 static int stratcon_database_connect(conn_q *cq);
171 static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
172 static int storage_node_quick_lookup(const char *uuid_str,
173                                      const char *remote_cn,
174                                      int *sid_out, int *storagenode_id_out,
175                                      const char **remote_cn_out,
176                                      const char **fqdn_out,
177                                      const char **dsn_out);
178
179 static void
180 free_params(ds_single_detail *d) {
181   int i;
182   for(i=0; i<d->nparams; i++)
183     if(d->paramAllocd[i] && d->paramValues[i])
184       free(d->paramValues[i]);
185 }
186
187 char *basejpath = NULL;
188 pthread_mutex_t ds_conns_lock;
189 noit_hash_table ds_conns;
190 noit_hash_table working_sets;
191
192 /* the fqdn cache needs to be thread safe */
193 typedef struct {
194   char *uuid_str;
195   char *remote_cn;
196   int storagenode_id;
197   int sid;
198 } uuid_info;
199 typedef struct {
200   int storagenode_id;
201   char *fqdn;
202   char *dsn;
203 } storagenode_info;
204 noit_hash_table uuid_to_info_cache;
205 pthread_mutex_t storagenode_to_info_cache_lock;
206 noit_hash_table storagenode_to_info_cache;
207
208 /* Thread-safe connection pools */
209
210 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
211 static void
212 release_conn_q_forceable(conn_q *cq, int forcefree) {
213   int putback = 0;
214   cq->last_use = time(NULL);
215   pthread_mutex_lock(&cq->pool->lock);
216   cq->pool->outstanding--;
217   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
218     putback = 1;
219     cq->next = cq->pool->head;
220     cq->pool->head = cq;
221     cq->pool->in_pool++;
222   }
223   pthread_mutex_unlock(&cq->pool->lock);
224   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(),
225         putback ? "to pool" : "and destroy", cq->pool->queue_name);
226   pthread_cond_signal(&cq->pool->cv);
227   if(putback) return;
228
229   /* Not put back, release it */
230   if(cq->dbh) PQfinish(cq->dbh);
231   if(cq->remote_str) free(cq->remote_str);
232   if(cq->remote_cn) free(cq->remote_cn);
233   if(cq->fqdn) free(cq->fqdn);
234   if(cq->dsn) free(cq->dsn);
235   free(cq);
236 }
237 static void
238 ttl_purge_conn_pool(conn_pool *pool) {
239   int old_cnt, new_cnt;
240   time_t now = time(NULL);
241   conn_q *cq, *prev = NULL, *iter;
242   /* because we always replace on the head and update the last_use time when
243      doing so, we know they are ordered LRU on the end.  So, once we hit an
244      old one, we know all the others are old too.
245    */
246   if(!pool->head) return; /* hack short circuit for no locks */
247   pthread_mutex_lock(&pool->lock);
248   old_cnt = pool->in_pool;
249   cq = pool->head;
250   while(cq) {
251     if(cq->last_use + cq->pool->ttl < now) {
252       if(prev) prev->next = NULL;
253       else pool->head = NULL;
254       break;
255     }
256     prev = cq;
257     cq = cq->next;
258   }
259   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
260   /* Fix accounting */
261   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
262   new_cnt = pool->in_pool;
263   pthread_mutex_unlock(&pool->lock);
264
265   /* Force release these without holding the lock */
266   while(cq) {
267     cq = cq->next;
268     release_conn_q_forceable(cq, 1);
269   }
270   if(old_cnt != new_cnt)
271     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
272           pool->queue_name);
273 }
274 static void
275 release_conn_q(conn_q *cq) {
276   ttl_purge_conn_pool(cq->pool);
277   release_conn_q_forceable(cq, 0);
278 }
279 static conn_pool *
280 get_conn_pool_for_remote(const char *remote_str,
281                          const char *remote_cn, const char *fqdn) {
282   void *vcpool;
283   conn_pool *cpool = NULL;
284   char queue_name[256] = "datastore_";
285   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
286            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
287            fqdn ? fqdn : "default",
288            remote_cn ? remote_cn : "default");
289   pthread_mutex_lock(&ds_conns_lock);
290   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
291                         strlen(queue_name), &vcpool))
292     cpool = vcpool;
293   pthread_mutex_unlock(&ds_conns_lock);
294   if(!cpool) {
295     vcpool = cpool = calloc(1, sizeof(*cpool));
296     cpool->queue_name = strdup(queue_name);
297     pthread_mutex_init(&cpool->lock, NULL);
298     pthread_cond_init(&cpool->cv, NULL);
299     cpool->in_pool = 0;
300     cpool->outstanding = 0;
301     cpool->max_in_pool = 1;
302     cpool->max_allocated = 1;
303     pthread_mutex_lock(&ds_conns_lock);
304     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
305                         cpool)) {
306       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
307                          strlen(queue_name), &vcpool);
308     }
309     pthread_mutex_unlock(&ds_conns_lock);
310     if(vcpool != cpool) {
311       /* someone beat us to it */
312       free(cpool->queue_name);
313       pthread_mutex_destroy(&cpool->lock);
314       pthread_cond_destroy(&cpool->cv);
315       free(cpool);
316     }
317     else {
318       int i;
319       /* Our job to setup the pool */
320       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
321       eventer_jobq_init(cpool->jobq, queue_name);
322       cpool->jobq->backq = eventer_default_backq();
323       /* Add one thread */
324       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
325         eventer_jobq_increase_concurrency(cpool->jobq);
326     }
327     cpool = vcpool;
328   }
329   return cpool;
330 }
331 static conn_q *
332 get_conn_q_for_remote(const char *remote_str,
333                       const char *remote_cn, const char *fqdn,
334                       const char *dsn) {
335   conn_pool *cpool;
336   conn_q *cq;
337   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
338   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(),
339         cpool->queue_name);
340   pthread_mutex_lock(&cpool->lock);
341  again:
342   if(cpool->head) {
343     assert(cpool->in_pool > 0);
344     cq = cpool->head;
345     cpool->head = cq->next;
346     cpool->in_pool--;
347     cpool->outstanding++;
348     cq->next = NULL;
349     pthread_mutex_unlock(&cpool->lock);
350     return cq;
351   }
352   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
353     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
354           (void *)pthread_self(), cpool->queue_name);
355     pthread_cond_wait(&cpool->cv, &cpool->lock);
356     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
357           (void *)pthread_self(), cpool->queue_name);
358     goto again;
359   }
360   else {
361     cpool->outstanding++;
362     pthread_mutex_unlock(&cpool->lock);
363   }
364  
365   cq = calloc(1, sizeof(*cq));
366   cq->pool = cpool;
367   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
368   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
369   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
370   cq->dsn = dsn ? strdup(dsn) : NULL;
371   return cq;
372 }
373 static conn_q *
374 get_conn_q_for_metanode() {
375   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
376 }
377
/* Result codes of the statement execution functions. */
typedef enum {
  DS_EXEC_SUCCESS = 0,     /* the statement ran */
  DS_EXEC_ROW_FAILED = 1,  /* this row could not be parsed or executed */
  DS_EXEC_TXN_FAILED = 2
} execute_outcome_t;
383
/*
 * DECLARE_PARAM_STR(str, len): bind a copy of the first len bytes of str
 * as the next text parameter of `d`.
 *
 * The literal value "[[null]]" is turned into an SQL NULL (no value, zero
 * length).  Control jumps to the caller's bad_row label when every one of
 * the MAX_PARAMS slots is already in use or when the copy cannot be
 * allocated; the enclosing function must provide that label and a variable
 * named d.  len is evaluated exactly once.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  int param_len_ = (int)(len); \
  if(d->nparams >= MAX_PARAMS) goto bad_row; \
  d->paramValues[d->nparams] = noit__strndup(str, param_len_); \
  if(d->paramValues[d->nparams] == NULL) goto bad_row; \
  d->paramLengths[d->nparams] = param_len_; \
  d->paramFormats[d->nparams] = 0; \
  d->paramAllocd[d->nparams] = 1; \
  if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \
    free(d->paramValues[d->nparams]); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  d->nparams++; \
} while(0)

/*
 * DECLARE_PARAM_INT(i): bind the decimal text form of the int i as the
 * next parameter, by way of DECLARE_PARAM_STR.
 */
#define DECLARE_PARAM_INT(i) do { \
  int buffer__len; \
  char buffer__[32]; \
  snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
  buffer__len = strlen(buffer__); \
  DECLARE_PARAM_STR(buffer__, buffer__len); \
} while(0)
404
/*
 * PG_GET_STR_COL(dest, row, name): point dest at the value of column
 * `name` in row `row` of d->res.  dest becomes NULL when the column does
 * not exist in the result or when the value is an SQL NULL.
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int colnum = PQfnumber(d->res, name); \
  if(colnum < 0 || PQgetisnull(d->res, row, colnum)) { \
    dest = NULL; \
  } else { \
    dest = PQgetvalue(d->res, row, colnum); \
  } \
} while(0)
412
/*
 * PG_EXEC(cmd): run cmd on cq->dbh with the parameters bound in d.
 *
 * The result is left in d->res and its status in d->rv.  When the status
 * is neither PGRES_COMMAND_OK nor PGRES_TUPLES_OK, the first line of the
 * server's error message is logged, the result is cleared and control
 * jumps to the caller's bad_row label.  On success the caller owns d->res
 * and is expected to PQclear() it.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(!(d->rv == PGRES_COMMAND_OK || d->rv == PGRES_TUPLES_OK)) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(pgerr_end == NULL) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "[%s] stratcon_datasource.c:%d bad (%d): %.*s\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
          (int)(pgerr_end - pgerr), pgerr); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
430
/*
 * PG_TM_EXEC(cmd, whence): like PG_EXEC, but cmd is first expanded with
 * strftime() using the UTC broken-down form of whence, so the statement
 * text may contain strftime conversions.
 *
 * Control jumps to the caller's bad_row label when the time cannot be
 * converted, when the expanded statement is empty or does not fit in the
 * 4096 byte buffer (strftime() returns 0 and leaves the buffer contents
 * unspecified in that case), or when the statement fails.  whence is
 * evaluated exactly once.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t tm_exec_when = (whence); \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&tm_exec_when, &tbuf); \
  if(!tm || strftime(cmdbuf, sizeof(cmdbuf), cmd, tm) == 0) { \
    noitL(ds_err, "stratcon_datasource.c:%d cannot expand statement, time: %llu\n", \
          __LINE__, (long long unsigned)tm_exec_when); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "stratcon_datasource.c:%d bad (%d): %.*s time: %llu\n", \
          __LINE__, d->rv, (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)tm_exec_when); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
453
454 static void *
455 stratcon_datastore_check_loadall(void *vsn) {
456   storagenode_info *sn = vsn;
457   ds_single_detail *d;
458   int i, row_count = 0, good = 0;
459   char buff[1024];
460   conn_q *cq = NULL;
461
462   d = calloc(1, sizeof(*d));
463   GET_QUERY(check_loadall);
464   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
465   i = 0;
466   while(stratcon_database_connect(cq)) {
467     if(i++ > 4) {
468       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
469       release_conn_q(cq);
470       return (void *)(vpsized_int)good;
471     }
472     sleep(1);
473   }
474   PG_EXEC(check_loadall);
475   row_count = PQntuples(d->res);
476  
477   for(i=0; i<row_count; i++) {
478     int rv;
479     int8_t family;
480     struct sockaddr *sin;
481     struct sockaddr_in sin4 = { .sin_family = AF_INET };
482     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
483     char *remote, *id, *target, *module, *name;
484     PG_GET_STR_COL(remote, i, "remote_address");
485     PG_GET_STR_COL(id, i, "id");
486     PG_GET_STR_COL(target, i, "target");
487     PG_GET_STR_COL(module, i, "module");
488     PG_GET_STR_COL(name, i, "name");
489     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
490
491     family = AF_INET;
492     sin = (struct sockaddr *)&sin4;
493     rv = inet_pton(family, remote, &sin4.sin_addr);
494     if(rv != 1) {
495       family = AF_INET6;
496       sin = (struct sockaddr *)&sin6;
497       rv = inet_pton(family, remote, &sin6.sin6_addr);
498       if(rv != 1) {
499         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
500         sin = NULL;
501       }
502     }
503
504     /* stratcon_iep_line_processor takes an allocated operand and frees it */
505     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
506     good++;
507   }
508   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
509         good, row_count, sn->fqdn);
510  bad_row:
511   free_params((ds_single_detail *)d);
512   free(d);
513   if(cq) release_conn_q(cq);
514   return (void *)(vpsized_int)good;
515 }
516 static int
517 stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure,
518                                     struct timeval *now) {
519   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
520   pthread_t *jobs = NULL;
521   int nodes, i = 0, tcnt = 0;
522   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
523   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
524
525   pthread_mutex_lock(&storagenode_to_info_cache_lock);
526   nodes = storagenode_to_info_cache.size;
527   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
528   sns = calloc(MAX(1,nodes), sizeof(*sns));
529   if(nodes == 0) sns[nodes++] = &self;
530   else {
531     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
532     const char *k;
533     void *v;
534     int klen;
535     while(noit_hash_next(&storagenode_to_info_cache,
536                          &iter, &k, &klen, &v)) {
537       sns[i++] = (storagenode_info *)v;
538     }
539   }
540   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
541
542   for(i=0; i<nodes; i++) {
543     if(pthread_create(&jobs[i], NULL,
544                       stratcon_datastore_check_loadall, sns[i]) != 0) {
545       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
546     }
547   }
548   for(i=0; i<nodes; i++) {
549     void *good;
550     pthread_join(jobs[i], &good);
551     tcnt += (int)(vpsized_int)good;
552   }
553   noitL(noit_error, "Loaded all %d check states.\n", tcnt);
554   return 0;
555 }
556 void
557 stratcon_datastore_iep_check_preload() {
558   eventer_t e;
559   conn_pool *cpool;
560
561   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
562   e = eventer_alloc();
563   e->mask = EVENTER_ASYNCH;
564   e->callback = stratcon_datastore_asynch_drive_iep;
565   e->closure = NULL;
566   eventer_add_asynch(cpool->jobq, e);
567 }
568 execute_outcome_t
569 stratcon_datastore_find(ds_rt_detail *d) {
570   conn_q *cq;
571   char *val;
572   int row_count;
573   struct realtime_tracker *node;
574
575   for(node = d->rt; node; node = node->next) {
576     char uuid_str[UUID_STR_LEN+1];
577     const char *fqdn, *dsn, *remote_cn;
578     char remote_ip[32];
579     int storagenode_id;
580
581     uuid_unparse_lower(node->checkid, uuid_str);
582     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
583                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
584       continue;
585
586     noitL(noit_debug, "stratcon_datastore_find <- (%d, %s) @ %s\n",
587           node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)");
588
589     /* We might be able to find the IP from our config if someone has
590      * specified the expected cn in the noit definition.
591      */
592     if(stratcon_find_noit_ip_by_cn(remote_cn,
593                                    remote_ip, sizeof(remote_ip)) == 0) {
594       node->noit = strdup(remote_ip);
595       noitL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit);
596       continue;
597     }
598
599     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
600     stratcon_database_connect(cq);
601
602     GET_QUERY(check_find);
603     DECLARE_PARAM_INT(node->sid);
604     PG_EXEC(check_find);
605     row_count = PQntuples(d->res);
606     if(row_count != 1) {
607       noitL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid);
608       PQclear(d->res);
609       goto bad_row;
610     }
611
612     /* Get the remote_address (which noit owns this) */
613     PG_GET_STR_COL(val, 0, "remote_address");
614     if(!val) {
615       noitL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn);
616       PQclear(d->res);
617       goto bad_row;
618     }
619     node->noit = strdup(val);
620     noitL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit);
621    bad_row:
622     free_params((ds_single_detail *)d);
623     d->nparams = 0;
624     release_conn_q(cq);
625   }
626   return DS_EXEC_SUCCESS;
627 }
628 execute_outcome_t
629 stratcon_datastore_execute(conn_q *cq, const char *r, const char *remote_cn,
630                            ds_line_detail *d) {
631   int type, len, sid;
632   char *final_buff;
633   uLong final_len, actual_final_len;
634   char *token;
635   char raddr_blank[1] = "";
636   const char *raddr;
637
638   type = d->data[0];
639   raddr = r ? r : raddr_blank;
640
641   /* Parse the log line, but only if we haven't already */
642   if(!d->nparams) {
643     char *scp, *ecp;
644
645     scp = d->data;
646 #define PROCESS_NEXT_FIELD(t,l) do { \
647   if(!*scp) goto bad_row; \
648   ecp = strchr(scp, '\t'); \
649   if(!ecp) goto bad_row; \
650   token = scp; \
651   len = (ecp-scp); \
652   scp = ecp + 1; \
653 } while(0)
654 #define PROCESS_LAST_FIELD(t,l) do { \
655   if(!*scp) ecp = scp; \
656   else { \
657     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
658     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
659   } \
660   t = scp; \
661   l = (ecp-scp); \
662 } while(0)
663
664     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
665     switch(type) {
666       /* See noit_check_log.c for log description */
667       case 'n':
668         DECLARE_PARAM_STR(raddr, strlen(raddr));
669         DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
670         DECLARE_PARAM_STR("noitd",5); /* node_type */
671         PROCESS_NEXT_FIELD(token,len);
672         d->whence = (time_t)strtoul(token, NULL, 10);
673         DECLARE_PARAM_STR(token,len); /* timestamp */
674
675         /* This is the expected uncompressed len */
676         PROCESS_NEXT_FIELD(token,len);
677         final_len = atoi(token);
678         final_buff = malloc(final_len);
679         if(!final_buff) goto bad_row;
680  
681         /* The last token is b64 endoded and compressed.
682          * we need to decode it, declare it and then free it.
683          */
684         PROCESS_LAST_FIELD(token, len);
685         /* We can in-place decode this */
686         len = noit_b64_decode((char *)token, len,
687                               (unsigned char *)token, len);
688         if(len <= 0) {
689           noitL(noit_error, "noitd config base64 decoding error.\n");
690           free(final_buff);
691           goto bad_row;
692         }
693         actual_final_len = final_len;
694         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
695                               (unsigned char *)token, len)) {
696           noitL(noit_error, "noitd config decompression failure.\n");
697           free(final_buff);
698           goto bad_row;
699         }
700         if(final_len != actual_final_len) {
701           noitL(noit_error, "noitd config decompression error.\n");
702           free(final_buff);
703           goto bad_row;
704         }
705         DECLARE_PARAM_STR(final_buff, final_len);
706         free(final_buff);
707         break;
708       case 'C':
709         DECLARE_PARAM_STR(raddr, strlen(raddr));
710         PROCESS_NEXT_FIELD(token,len);
711         DECLARE_PARAM_STR(token,len); /* timestamp */
712         d->whence = (time_t)strtoul(token, NULL, 10);
713         PROCESS_NEXT_FIELD(token, len);
714         sid = uuid_to_sid(token, remote_cn);
715         if(sid == 0) goto bad_row;
716         DECLARE_PARAM_INT(sid); /* sid */
717         DECLARE_PARAM_STR(token,len); /* uuid */
718         PROCESS_NEXT_FIELD(token, len);
719         DECLARE_PARAM_STR(token,len); /* target */
720         PROCESS_NEXT_FIELD(token, len);
721         DECLARE_PARAM_STR(token,len); /* module */
722         PROCESS_LAST_FIELD(token, len);
723         DECLARE_PARAM_STR(token,len); /* name */
724         break;
725       case 'M':
726         PROCESS_NEXT_FIELD(token,len);
727         DECLARE_PARAM_STR(token,len); /* timestamp */
728         d->whence = (time_t)strtoul(token, NULL, 10);
729         PROCESS_NEXT_FIELD(token, len);
730         sid = uuid_to_sid(token, remote_cn);
731         if(sid == 0) goto bad_row;
732         DECLARE_PARAM_INT(sid); /* sid */
733         PROCESS_NEXT_FIELD(token, len);
734         DECLARE_PARAM_STR(token,len); /* name */
735         PROCESS_NEXT_FIELD(token,len);
736         d->metric_type = *token;
737         PROCESS_LAST_FIELD(token,len);
738         DECLARE_PARAM_STR(token,len); /* value */
739         break;
740       case 'S':
741         PROCESS_NEXT_FIELD(token,len);
742         DECLARE_PARAM_STR(token,len); /* timestamp */
743         d->whence = (time_t)strtoul(token, NULL, 10);
744         PROCESS_NEXT_FIELD(token, len);
745         sid = uuid_to_sid(token, remote_cn);
746         if(sid == 0) goto bad_row;
747         DECLARE_PARAM_INT(sid); /* sid */
748         PROCESS_NEXT_FIELD(token, len);
749         DECLARE_PARAM_STR(token,len); /* state */
750         PROCESS_NEXT_FIELD(token, len);
751         DECLARE_PARAM_STR(token,len); /* availability */
752         PROCESS_NEXT_FIELD(token, len);
753         DECLARE_PARAM_STR(token,len); /* duration */
754         PROCESS_LAST_FIELD(token,len);
755         DECLARE_PARAM_STR(token,len); /* status */
756         break;
757       default:
758         goto bad_row;
759     }
760
761   }
762
763   /* Now execute the query */
764   switch(type) {
765     case 'n':
766       GET_QUERY(config_insert);
767       PG_EXEC(config_insert);
768       PQclear(d->res);
769       break;
770     case 'C':
771       GET_QUERY(check_insert);
772       PG_TM_EXEC(check_insert, d->whence);
773       PQclear(d->res);
774       break;
775     case 'S':
776       GET_QUERY(status_insert);
777       PG_TM_EXEC(status_insert, d->whence);
778       PQclear(d->res);
779       break;
780     case 'M':
781       switch(d->metric_type) {
782         case METRIC_INT32:
783         case METRIC_UINT32:
784         case METRIC_INT64:
785         case METRIC_UINT64:
786         case METRIC_DOUBLE:
787           GET_QUERY(metric_insert_numeric);
788           PG_TM_EXEC(metric_insert_numeric, d->whence);
789           PQclear(d->res);
790           break;
791         case METRIC_STRING:
792           GET_QUERY(metric_insert_text);
793           PG_TM_EXEC(metric_insert_text, d->whence);
794           PQclear(d->res);
795           break;
796         default:
797           goto bad_row;
798       }
799       break;
800     default:
801       /* should never get here */
802       goto bad_row;
803   }
804   return DS_EXEC_SUCCESS;
805  bad_row:
806   return DS_EXEC_ROW_FAILED;
807 }
808 static int
809 stratcon_database_post_connect(conn_q *cq) {
810   int rv = 0;
811   ds_single_detail _d = { 0 }, *d = &_d;
812   if(cq->fqdn) {
813     char *remote_str, *remote_cn;
814     /* This is the silly way we get null's in through our declare_param_str */
815     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
816     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
817     /* This is a storage node, it gets the storage node post_connect */
818     GET_QUERY(storage_post_connect);
819     rv = -1; /* now we're serious */
820     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
821     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
822     PG_EXEC(storage_post_connect);
823     PQclear(d->res);
824     rv = 0;
825   }
826   else {
827     /* Metanode post_connect */
828     GET_QUERY(metanode_post_connect);
829     rv = -1; /* now we're serious */
830     PG_EXEC(metanode_post_connect);
831     PQclear(d->res);
832     rv = 0;
833   }
834  bad_row:
835   free_params(d);
836   if(rv == -1) {
837     /* Post-connect intentions are serious and fatal */
838     PQfinish(cq->dbh);
839     cq->dbh = NULL;
840   }
841   return rv;
842 }
843 static int
844 stratcon_database_connect(conn_q *cq) {
845   char *dsn, dsn_meta[512];
846   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
847   const char *k, *v;
848   int klen;
849   noit_hash_table *t;
850
851   dsn_meta[0] = '\0';
852   if(!cq->dsn) {
853     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
854     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
855       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
856       strlcat(dsn_meta, k, sizeof(dsn_meta));
857       strlcat(dsn_meta, "=", sizeof(dsn_meta));
858       strlcat(dsn_meta, v, sizeof(dsn_meta));
859     }
860     noit_hash_destroy(t, free, free);
861     free(t);
862     dsn = dsn_meta;
863   }
864   else {
865     char options[32];
866     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta));
867     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user",
868                                options, sizeof(options))) {
869       strlcat(dsn_meta, " ", sizeof(dsn_meta));
870       strlcat(dsn_meta, "user", sizeof(dsn_meta));
871       strlcat(dsn_meta, "=", sizeof(dsn_meta));
872       strlcat(dsn_meta, options, sizeof(dsn_meta));
873     }
874     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password",
875                                options, sizeof(options))) {
876       strlcat(dsn_meta, " ", sizeof(dsn_meta));
877       strlcat(dsn_meta, "password", sizeof(dsn_meta));
878       strlcat(dsn_meta, "=", sizeof(dsn_meta));
879       strlcat(dsn_meta, options, sizeof(dsn_meta));
880     }
881     dsn = dsn_meta;
882   }
883
884   if(cq->dbh) {
885     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
886     PQreset(cq->dbh);
887     if(PQstatus(cq->dbh) != CONNECTION_OK) {
888       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
889             dsn, PQerrorMessage(cq->dbh));
890       return -1;
891     }
892     if(stratcon_database_post_connect(cq)) return -1;
893     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
894     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
895           dsn, PQerrorMessage(cq->dbh));
896     return -1;
897   }
898
899   cq->dbh = PQconnectdb(dsn);
900   if(!cq->dbh) return -1;
901   if(PQstatus(cq->dbh) != CONNECTION_OK) {
902     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
903           dsn, PQerrorMessage(cq->dbh));
904     return -1;
905   }
906   if(stratcon_database_post_connect(cq)) return -1;
907   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
908   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
909         dsn, PQerrorMessage(cq->dbh));
910   return -1;
911 }
912 static int
913 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
914                                 const char *name) {
915   int rv = -1;
916   PGresult *res;
917   char cmd[128];
918   strlcpy(cmd, p, sizeof(cmd));
919   strlcat(cmd, name, sizeof(cmd));
920   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
921   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
922   PQclear(res);
923   return rv;
924 }
925 static int
926 stratcon_datastore_do(conn_q *cq, const char *cmd) {
927   PGresult *res;
928   int rv = -1;
929   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
930   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
931   PQclear(res);
932   return rv;
933 }
/*
 * Transaction helper macros.  They are only usable inside a function
 * that defines a "full_monty" label and has locals named cq, current
 * and last_sp in scope, because the expansions reference those names
 * directly.
 */

/* Close the connection on cq, clear the handle and jump to full_monty. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)
/* Issue "SAVEPOINT <name>"; on failure bail out via BUSTED, otherwise
 * remember the current row in last_sp. */
#define SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
  last_sp = current; \
} while(0)
/* Issue "ROLLBACK TO SAVEPOINT <name>"; on failure bail out via BUSTED,
 * otherwise clear last_sp. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
/* Issue "RELEASE SAVEPOINT <name>"; on failure bail out via BUSTED,
 * otherwise clear last_sp. */
#define RELEASE_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
953 int
954 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
955                                  struct timeval *now) {
956   ds_rt_detail *dsjd = closure;
957   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
958   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
959
960   assert(dsjd->rt);
961   stratcon_datastore_find(dsjd);
962   if(dsjd->completion_event)
963     eventer_add(dsjd->completion_event);
964
965   free_params((ds_single_detail *)dsjd);
966   free(dsjd);
967   return 0;
968 }
969 static const char *
970 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
971   void *vinfo;
972   char *dsn = NULL, *fqdn = NULL;
973   int found = 0;
974   storagenode_info *info = NULL;
975   pthread_mutex_lock(&storagenode_to_info_cache_lock);
976   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
977                         &vinfo)) {
978     found = 1;
979     info = vinfo;
980   }
981   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
982   if(found) {
983     if(fqdn_out) *fqdn_out = info->fqdn;
984     return info->dsn;
985   }
986
987   if(!found && can_use_db) {
988     ds_single_detail *d;
989     conn_q *cq;
990     int row_count;
991     /* Look it up and store it */
992     d = calloc(1, sizeof(*d));
993     cq = get_conn_q_for_metanode();
994     GET_QUERY(find_storage);
995     DECLARE_PARAM_INT(id);
996     PG_EXEC(find_storage);
997     row_count = PQntuples(d->res);
998     if(row_count) {
999       PG_GET_STR_COL(dsn, 0, "dsn");
1000       PG_GET_STR_COL(fqdn, 0, "fqdn");
1001       fqdn = fqdn ? strdup(fqdn) : NULL;
1002       dsn = dsn ? strdup(dsn) : NULL;
1003     }
1004     PQclear(d->res);
1005    bad_row:
1006     free_params(d);
1007     free(d);
1008     release_conn_q(cq);
1009   }
1010   if(fqdn) {
1011     info = calloc(1, sizeof(*info));
1012     info->fqdn = fqdn;
1013     if(fqdn_out) *fqdn_out = info->fqdn;
1014     info->dsn = dsn;
1015     info->storagenode_id = id;
1016     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1017     noit_hash_store(&storagenode_to_info_cache,
1018                     (void *)&info->storagenode_id, sizeof(int), info);
1019     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1020   }
1021   return info ? info->dsn : NULL;
1022 }
1023 static ds_line_detail *
1024 build_insert_batch(interim_journal_t *ij) {
1025   int rv;
1026   off_t len;
1027   const char *buff, *cp, *lcp;
1028   struct stat st;
1029   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1030
1031   if(ij->fd < 0) {
1032     ij->fd = open(ij->filename, O_RDONLY);
1033     if(ij->fd < 0) {
1034       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
1035             ij->filename, strerror(errno));
1036       assert(ij->fd >= 0);
1037     }
1038   }
1039   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1040   if(rv == -1) {
1041       noitL(noit_error, "Cannot stat interim journal '%s': %s\n",
1042             ij->filename, strerror(errno));
1043     assert(rv != -1);
1044   }
1045   len = st.st_size;
1046   if(len > 0) {
1047     buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1048     if(buff == (void *)-1) {
1049       noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1050             ij->filename, strerror(errno));
1051       assert(buff != (void *)-1);
1052     }
1053     lcp = buff;
1054     while(lcp < (buff + len) &&
1055           NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1056       next = calloc(1, sizeof(*next));
1057       next->data = malloc(cp - lcp + 1);
1058       memcpy(next->data, lcp, cp - lcp);
1059       next->data[cp - lcp] = '\0';
1060       if(!head) head = next;
1061       if(last) last->next = next;
1062       last = next;
1063       lcp = cp + 1;
1064     }
1065     munmap((void *)buff, len);
1066   }
1067   close(ij->fd);
1068   return head;
1069 }
1070 static void
1071 interim_journal_remove(interim_journal_t *ij) {
1072   unlink(ij->filename);
1073   if(ij->filename) free(ij->filename);
1074   if(ij->remote_str) free(ij->remote_str);
1075   if(ij->remote_cn) free(ij->remote_cn);
1076   if(ij->fqdn) free(ij->fqdn);
1077   free(ij);
1078 }
1079 int
1080 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
1081                                   struct timeval *now) {
1082   int i, total, success, sp_total, sp_success;
1083   interim_journal_t *ij;
1084   ds_line_detail *head = NULL, *current, *last_sp;
1085   const char *dsn;
1086   conn_q *cq;
1087   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1088   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1089
1090   ij = closure;
1091   if(ij->fqdn == NULL) {
1092     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1093     if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1094   }
1095   else {
1096     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, NULL);
1097   }
1098   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1099                              ij->fqdn, dsn);
1100   noitL(ds_deb, "stratcon_datastore_asynch_execute[%s,%s,%s]\n",
1101         ij->remote_str, ij->remote_cn, ij->fqdn);
1102  full_monty:
1103   /* Make sure we have a connection */
1104   i = 1;
1105   while(stratcon_database_connect(cq)) {
1106     noitL(noit_error, "Error connecting to database: %s\n",
1107           ij->fqdn ? ij->fqdn : "(null)");
1108     sleep(i);
1109     i *= 2;
1110     i = MIN(i, 16);
1111   }
1112
1113   if(head == NULL) head = build_insert_batch(ij);
1114   noitL(ds_deb, "Starting batch from %s/%s to %s\n",
1115         ij->remote_str ? ij->remote_str : "(null)",
1116         ij->remote_cn ? ij->remote_cn : "(null)",
1117         ij->fqdn ? ij->fqdn : "(null)");
1118   current = head;
1119   last_sp = NULL;
1120   total = success = sp_total = sp_success = 0;
1121   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
1122   while(current) {
1123     execute_outcome_t rv;
1124     if(current->data) {
1125       if(!last_sp) {
1126         SAVEPOINT("batch");
1127         sp_success = success;
1128         sp_total = total;
1129       }
1130  
1131       if(current->problematic) {
1132         RELEASE_SAVEPOINT("batch");
1133         current = current->next;
1134         total++;
1135         continue;
1136       }
1137       rv = stratcon_datastore_execute(cq, cq->remote_str, cq->remote_cn,
1138                                       current);
1139       switch(rv) {
1140         case DS_EXEC_SUCCESS:
1141           total++;
1142           success++;
1143           current = current->next;
1144           break;
1145         case DS_EXEC_ROW_FAILED:
1146           /* rollback to savepoint, mark this record as bad and start again */
1147           if(current->data[0] != 'n')
1148             noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1149           current->problematic = 1;
1150           current = last_sp;
1151           success = sp_success;
1152           total = sp_total;
1153           ROLLBACK_TO_SAVEPOINT("batch");
1154           break;
1155         case DS_EXEC_TXN_FAILED:
1156           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1157           BUSTED(cq);
1158       }
1159     }
1160   }
1161   if(last_sp) RELEASE_SAVEPOINT("batch");
1162   if(stratcon_datastore_do(cq, "COMMIT")) {
1163     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1164     BUSTED(cq);
1165   }
1166   /* Cleanup the mess */
1167   while(head) {
1168     ds_line_detail *tofree;
1169     tofree = head;
1170     head = head->next;
1171     if(tofree->data) free(tofree->data);
1172     free_params((ds_single_detail *)tofree);
1173     free(tofree);
1174   }
1175   noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1176         ij->remote_str ? ij->remote_str : "(null)",
1177         ij->remote_cn ? ij->remote_cn : "(null)",
1178         ij->fqdn ? ij->fqdn : "(null)", success, total);
1179   interim_journal_remove(ij);
1180   release_conn_q(cq);
1181   return 0;
1182 }
1183 static int
1184 stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure,
1185                                 struct timeval *now) {
1186   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1187   const char *k;
1188   int klen;
1189   void *vij;
1190   interim_journal_t *ij;
1191   syncset_t *syncset = closure;
1192
1193   if((mask & EVENTER_ASYNCH) == EVENTER_ASYNCH) {
1194     if(syncset->completion) {
1195       eventer_add(syncset->completion);
1196       eventer_trigger(syncset->completion, EVENTER_READ | EVENTER_WRITE);
1197     }
1198     free(syncset);
1199     return 0;
1200   }
1201   if(!((mask & EVENTER_ASYNCH_WORK) == EVENTER_ASYNCH_WORK)) return 0;
1202
1203   noitL(ds_deb, "Syncing journal sets...\n");
1204   while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) {
1205     eventer_t ingest;
1206     ij = vij;
1207     noitL(ds_deb, "Syncing journal set [%s,%s,%s]\n",
1208           ij->remote_str, ij->remote_cn, ij->fqdn);
1209     fsync(ij->fd);
1210     close(ij->fd);
1211     ij->fd = -1;
1212     ingest = eventer_alloc();
1213     ingest->mask = EVENTER_ASYNCH;
1214     ingest->callback = stratcon_datastore_asynch_execute;
1215     ingest->closure = ij;
1216     eventer_add_asynch(ij->cpool->jobq, ingest);
1217   }
1218   noit_hash_destroy(syncset->ws, free, NULL);
1219   free(syncset->ws);
1220   return 0;
1221 }
1222 static interim_journal_t *
1223 interim_journal_get(struct sockaddr *remote, const char *remote_cn_in,
1224                     int storagenode_id, const char *fqdn_in) {
1225   void *vhash, *vij;
1226   noit_hash_table *working_set;
1227   interim_journal_t *ij;
1228   struct timeval now;
1229   char jpath[PATH_MAX];
1230   char remote_str[128];
1231   const char *remote_cn = remote_cn_in ? remote_cn_in : "default";
1232   const char *fqdn = fqdn_in ? fqdn_in : "default";
1233
1234   noit_convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote);
1235   if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str));
1236
1237   /* Lookup the working set */
1238   if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1239     working_set = calloc(1, sizeof(*working_set));
1240     noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn),
1241                     working_set);
1242   }
1243   else
1244     working_set = vhash;
1245
1246   /* Lookup the interim journal within the working set */
1247   if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) {
1248     ij = calloc(1, sizeof(*ij));
1249     gettimeofday(&now, NULL);
1250     snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x",
1251              basejpath, remote_str, remote_cn, storagenode_id,
1252              (unsigned int)now.tv_sec, (unsigned int)now.tv_usec);
1253     ij->remote_str = strdup(remote_str);
1254     ij->remote_cn = strdup(remote_cn);
1255     ij->fqdn = fqdn_in ? strdup(fqdn_in) : NULL;
1256     ij->storagenode_id = storagenode_id;
1257     ij->filename = strdup(jpath);
1258     ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1259                                          ij->fqdn);
1260     ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1261     if(ij->fd < 0 && errno == ENOENT) {
1262       if(mkdir_for_file(ij->filename, 0750)) {
1263         noitL(noit_error, "Failed to create dir for '%s': %s\n",
1264               ij->filename, strerror(errno));
1265         exit(-1);
1266       }
1267       ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1268     }
1269     if(ij->fd < 0) {
1270       noitL(noit_error, "Failed to open interim journal '%s': %s\n",
1271             ij->filename, strerror(errno));
1272       exit(-1);
1273     }
1274     noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij);
1275   }
1276   else
1277     ij = vij;
1278
1279           return ij;
1280 }
1281 static int
1282 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1283                           int *sid_out, int *storagenode_id_out,
1284                           const char **remote_cn_out,
1285                           const char **fqdn_out, const char **dsn_out) {
1286   /* only called from the main thread -- no safety issues */
1287   void *vuuidinfo, *vinfo;
1288   uuid_info *uuidinfo;
1289   storagenode_info *info = NULL;
1290   char *fqdn = NULL;
1291   char *dsn = NULL;
1292   char *new_remote_cn = NULL;
1293   int storagenode_id = 0, sid = 0;
1294   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1295                          &vuuidinfo)) {
1296     int row_count = 0;
1297     char *tmpint;
1298     ds_single_detail *d;
1299     conn_q *cq;
1300
1301     /* We can't do a database lookup without the remote_cn */
1302     if(!remote_cn) {
1303       if(stratcon_datastore_get_enabled()) {
1304         /* We have an authoritatively maintained cache, we don't do lookups */
1305         return -1;
1306       }
1307       else
1308         remote_cn = "[[null]]";
1309     }
1310
1311     d = calloc(1, sizeof(*d));
1312     cq = get_conn_q_for_metanode();
1313     if(stratcon_database_connect(cq) == 0) {
1314       /* Blocking call to service the cache miss */
1315       GET_QUERY(check_map);
1316       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1317       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1318       PG_EXEC(check_map);
1319       row_count = PQntuples(d->res);
1320       if(row_count != 1) {
1321         PQclear(d->res);
1322         goto bad_row;
1323       }
1324       PG_GET_STR_COL(tmpint, 0, "sid");
1325       if(!tmpint) {
1326         row_count = 0;
1327         PQclear(d->res);
1328         goto bad_row;
1329       }
1330       sid = atoi(tmpint);
1331       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1332       if(tmpint) storagenode_id = atoi(tmpint);
1333       PG_GET_STR_COL(fqdn, 0, "fqdn");
1334       PG_GET_STR_COL(dsn, 0, "dsn");
1335       PG_GET_STR_COL(new_remote_cn, 0, "remote_cn");
1336       fqdn = fqdn ? strdup(fqdn) : NULL;
1337       dsn = dsn ? strdup(dsn) : NULL;
1338       new_remote_cn = new_remote_cn ? strdup(new_remote_cn) : NULL;
1339       PQclear(d->res);
1340     }
1341    bad_row:
1342     free_params((ds_single_detail *)d);
1343     free(d);
1344     release_conn_q(cq);
1345     if(row_count != 1) {
1346       return -1;
1347     }
1348     /* Place in cache */
1349     uuidinfo = calloc(1, sizeof(*uuidinfo));
1350     uuidinfo->sid = sid;
1351     uuidinfo->uuid_str = strdup(uuid_str);
1352     uuidinfo->storagenode_id = storagenode_id;
1353     uuidinfo->remote_cn = new_remote_cn ? strdup(new_remote_cn) : strdup(remote_cn);
1354     noit_hash_store(&uuid_to_info_cache,
1355                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1356     /* Also, we may have just witnessed a new storage node, store it */
1357     if(storagenode_id) {
1358       int needs_free = 0;
1359       info = calloc(1, sizeof(*info));
1360       info->storagenode_id = storagenode_id;
1361       info->dsn = dsn ? strdup(dsn) : NULL;
1362       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1363       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1364       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1365                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1366         /* hack to save memory -- we *never* remove from these caches,
1367            so we can use the same fqdn value in the above cache for the key
1368            in the cache below -- (no strdup) */
1369         noit_hash_store(&storagenode_to_info_cache,
1370                         (void *)&info->storagenode_id, sizeof(int), info);
1371       }
1372       else needs_free = 1;
1373       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1374       if(needs_free) {
1375         if(info->dsn) free(info->dsn);
1376         if(info->fqdn) free(info->fqdn);
1377         free(info);
1378       }
1379     }
1380   }
1381   else
1382     uuidinfo = vuuidinfo;
1383
1384   if(uuidinfo && uuidinfo->storagenode_id) {
1385     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1386       /* we don't have dsn and we actually want it */
1387       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1388       if(noit_hash_retrieve(&storagenode_to_info_cache,
1389                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1390                             &vinfo))
1391         info = vinfo;
1392       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1393     }
1394   }
1395
1396   if(fqdn_out) *fqdn_out = info ? info->fqdn : NULL;
1397   if(dsn_out) *dsn_out = info ? info->dsn : NULL;
1398   assert(uuidinfo);
1399   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1400   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1401   if(sid_out) *sid_out = uuidinfo->sid;
1402   if(fqdn) free(fqdn);
1403   if(dsn) free(dsn);
1404   if(new_remote_cn) free(new_remote_cn);
1405   return 0;
1406 }
1407 static int
1408 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1409   char uuid_str[UUID_STR_LEN+1];
1410   int sid = 0;
1411   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1412   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1413   return sid;
1414 }
1415 static void
1416 stratcon_datastore_journal(struct sockaddr *remote,
1417                            const char *remote_cn, char *line) {
1418   interim_journal_t *ij = NULL;
1419   char uuid_str[UUID_STR_LEN+1], *cp;
1420   const char *fqdn = NULL, *dsn = NULL;
1421   int storagenode_id = 0;
1422   uuid_t checkid;
1423   if(!line) return;
1424   /* if it is a UUID based thing, find the storage node */
1425   switch(*line) {
1426     case 'C':
1427     case 'S':
1428     case 'M':
1429       if(line[1] == '\t' && (cp = strchr(line+2, '\t')) != NULL) {
1430         strlcpy(uuid_str, cp + 1, sizeof(uuid_str));
1431         if(!uuid_parse(uuid_str, checkid)) {
1432           storage_node_quick_lookup(uuid_str, remote_cn, NULL,
1433                                     &storagenode_id, NULL, &fqdn, &dsn);
1434           ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn);
1435         }
1436       }
1437       break;
1438     case 'n':
1439       ij = interim_journal_get(remote,remote_cn,0,NULL);
1440       break;
1441     default:
1442       break;
1443   }
1444   if(!ij) {
1445     noitL(ingest_err, "%d\t%s\n", storagenode_id, line);
1446   }
1447   else {
1448     int len;
1449     len = write(ij->fd, line, strlen(line));
1450     if(len < 0) {
1451       noitL(noit_error, "write to %s failed: %s\n",
1452             ij->filename, strerror(errno));
1453     }
1454   }
1455   free(line);
1456   return;
1457 }
1458 static noit_hash_table *
1459 stratcon_datastore_journal_remove(struct sockaddr *remote,
1460                                   const char *remote_cn) {
1461   void *vhash = NULL;
1462   if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1463     /* pluck it out */
1464     noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL);
1465   }
1466   else {
1467     noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n",
1468           remote_cn);
1469     abort();
1470   }
1471   return vhash;
1472 }
1473 void
1474 stratcon_datastore_push(stratcon_datastore_op_t op,
1475                         struct sockaddr *remote,
1476                         const char *remote_cn, void *operand,
1477                         eventer_t completion) {
1478   conn_pool *cpool;
1479   syncset_t *syncset;
1480   eventer_t e;
1481   ds_rt_detail *rtdetail;
1482   struct datastore_onlooker_list *nnode;
1483
1484   for(nnode = onlookers; nnode; nnode = nnode->next)
1485     nnode->dispatch(op,remote,remote_cn,operand);
1486
1487   switch(op) {
1488     case DS_OP_INSERT:
1489       stratcon_datastore_journal(remote, remote_cn, (char *)operand);
1490       break;
1491     case DS_OP_CHKPT:
1492       e = eventer_alloc();
1493       syncset = calloc(1, sizeof(*syncset));
1494       e->mask = EVENTER_ASYNCH;
1495       e->callback = stratcon_datastore_journal_sync;
1496       syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn);
1497       syncset->completion = completion;
1498       e->closure = syncset;
1499       eventer_add(e);
1500       break;
1501     case DS_OP_FIND_COMPLETE:
1502       cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
1503       rtdetail = calloc(1, sizeof(*rtdetail));
1504       rtdetail->rt = operand;
1505       rtdetail->completion_event = completion;
1506       e = eventer_alloc();
1507       e->mask = EVENTER_ASYNCH;
1508       e->callback = stratcon_datastore_asynch_lookup;
1509       e->closure = rtdetail;
1510       eventer_add_asynch(cpool->jobq, e);
1511       break;
1512   }
1513 }
1514
1515 int
1516 stratcon_datastore_saveconfig(void *unused) {
1517   int rv = -1;
1518   char *buff;
1519   ds_single_detail _d = { 0 }, *d = &_d;
1520   conn_q *cq;
1521   char ipv4_str[32];
1522   struct in_addr r, l;
1523
1524   r.s_addr = htonl((4 << 24) || (2 << 16) || (2 << 8) || 1);
1525   memset(&l, 0, sizeof(l));
1526   noit_getip_ipv4(r, &l);
1527   /* Ignore the error.. what are we going to do anyway */
1528   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
1529     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));
1530
1531   cq = get_conn_q_for_metanode();
1532
1533   if(stratcon_database_connect(cq) == 0) {
1534     char time_as_str[20];
1535     size_t len;
1536     buff = noit_conf_xml_in_mem(&len);
1537     if(!buff) goto bad_row;
1538
1539     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1540     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str));
1541     DECLARE_PARAM_STR("", 0);
1542     DECLARE_PARAM_STR("stratcond", 9);
1543     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1544     DECLARE_PARAM_STR(buff, len);
1545     free(buff);
1546
1547     GET_QUERY(config_insert);
1548     PG_EXEC(config_insert);
1549     PQclear(d->res);
1550     rv = 0;
1551
1552     bad_row:
1553       free_params(d);
1554   }
1555   release_conn_q(cq);
1556   return rv;
1557 }
1558
1559 void
1560 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
1561                                                struct sockaddr *,
1562                                                const char *, void *)) {
1563   struct datastore_onlooker_list *nnode;
1564   nnode = calloc(1, sizeof(*nnode));
1565   nnode->dispatch = f;
1566   nnode->next = onlookers;
1567   while(noit_atomic_casptr((volatile void **)&onlookers,
1568                            nnode, nnode->next) != (void *)nnode->next)
1569     nnode->next = onlookers;
1570 }
1571 static void
1572 stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn,
1573                                          char *id_str, char *file) {
1574   char path[PATH_MAX];
1575   interim_journal_t *ij;
1576   eventer_t ingest;
1577
1578   snprintf(path, sizeof(path), "%s/%s/%s/%s/%s",
1579            basejpath, remote_str, remote_cn, id_str, file);
1580   ij = calloc(1, sizeof(*ij));
1581   ij->fd = open(path, O_RDONLY);
1582   if(ij->fd < 0) {
1583     noitL(noit_error, "cannot open journal '%s': %s\n",
1584           path, strerror(errno));
1585     free(ij);
1586     return;
1587   }
1588   close(ij->fd);
1589   ij->fd = -1;
1590   ij->filename = strdup(path);
1591   ij->remote_str = strdup(remote_str);
1592   ij->remote_cn = strdup(remote_cn);
1593   ij->storagenode_id = atoi(id_str);
1594   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1595                                        ij->fqdn);
1596   noitL(noit_error, "ingesting old payload: %s\n", ij->filename);
1597   ingest = eventer_alloc();
1598   ingest->mask = EVENTER_ASYNCH;
1599   ingest->callback = stratcon_datastore_asynch_execute;
1600   ingest->closure = ij;
1601   eventer_add_asynch(ij->cpool->jobq, ingest);
1602 }
1603 static void
1604 stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) {
1605   char path[PATH_MAX];
1606   DIR *root;
1607   struct dirent *de, *entry;
1608   int i = 0, cnt = 0;
1609   char **entries;
1610   int size = 0;
1611
1612   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
1613            first ? "/" : "", first ? first : "",
1614            second ? "/" : "", second ? second : "",
1615            third ? "/" : "", third ? third : "");
1616 #ifdef _PC_NAME_MAX
1617   size = pathconf(path, _PC_NAME_MAX);
1618 #endif
1619   size = MIN(size, PATH_MAX + 128);
1620   de = alloca(size);
1621   root = opendir(path);
1622   if(!root) return;
1623   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) cnt++;
1624   closedir(root);
1625   root = opendir(path);
1626   if(!root) return;
1627   entries = malloc(sizeof(*entries) * cnt);
1628   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) {
1629     if(i < cnt) {
1630       entries[i++] = strdup(entry->d_name);
1631     }
1632   }
1633   closedir(root);
1634   cnt = i; /* could have changed, directories are fickle */
1635   qsort(entries, i, sizeof(*entries),
1636         (int (*)(const void *, const void *))strcasecmp);
1637   for(i=0; i<cnt; i++) {
1638     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
1639     noitL(ds_deb, "Processing L%d entry '%s'\n",
1640           third ? 4 : second ? 3 : first ? 2 : 1, entries[i]);
1641     if(!first)
1642       stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL);
1643     else if(!second)
1644       stratcon_datastore_sweep_journals_int(first, entries[i], NULL);
1645     else if(!third)
1646       stratcon_datastore_sweep_journals_int(first, second, entries[i]);
1647     else if(strlen(entries[i]) == 16)
1648       stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]);
1649   }
1650   for(i=0; i<cnt; i++)
1651     free(entries[i]);
1652   free(entries);
1653 }
/*
 * Sweep the whole journal tree: start the recursive walk at the top
 * level, with no path components selected yet.
 */
static void
stratcon_datastore_sweep_journals() {
  stratcon_datastore_sweep_journals_int(NULL, NULL, NULL);
}
1658
1659 int
1660 stratcon_datastore_ingest_all_storagenode_info() {
1661   int i, cnt = 0;
1662   ds_single_detail _d = { 0 }, *d = &_d;
1663   conn_q *cq;
1664   cq = get_conn_q_for_metanode();
1665
1666   while(stratcon_database_connect(cq)) {
1667     noitL(noit_error, "Error connecting to database\n");
1668     sleep(1);
1669   }
1670
1671   GET_QUERY(all_storage);
1672   PG_EXEC(all_storage);
1673   cnt = PQntuples(d->res);
1674   for(i=0; i<cnt; i++) {
1675     void *vinfo;
1676     char *tmpint, *fqdn, *dsn;
1677     int storagenode_id;
1678     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1679     storagenode_id = atoi(tmpint);
1680     PG_GET_STR_COL(fqdn, i, "fqdn");
1681     PG_GET_STR_COL(dsn, i, "dsn");
1682     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1683     storagenode_id = tmpint ? atoi(tmpint) : 0;
1684
1685     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1686                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1687       storagenode_info *info;
1688       info = calloc(1, sizeof(*info));
1689       info->storagenode_id = storagenode_id;
1690       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1691       info->dsn = dsn ? strdup(dsn) : NULL;
1692       noit_hash_store(&storagenode_to_info_cache,
1693                       (void *)&info->storagenode_id, sizeof(int), info);
1694       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1695             info->storagenode_id,
1696             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1697     }
1698   }
1699   PQclear(d->res);
1700  bad_row:
1701   free_params(d);
1702
1703   release_conn_q(cq);
1704   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1705   return cnt;
1706 }
1707 int
1708 stratcon_datastore_ingest_all_check_info() {
1709   int i, cnt, loaded = 0;
1710   ds_single_detail _d = { 0 }, *d = &_d;
1711   conn_q *cq;
1712   cq = get_conn_q_for_metanode();
1713
1714   while(stratcon_database_connect(cq)) {
1715     noitL(noit_error, "Error connecting to database\n");
1716     sleep(1);
1717   }
1718
1719   GET_QUERY(check_mapall);
1720   PG_EXEC(check_mapall);
1721   cnt = PQntuples(d->res);
1722   for(i=0; i<cnt; i++) {
1723     void *vinfo;
1724     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1725     int sid, storagenode_id;
1726     uuid_info *uuidinfo;
1727     PG_GET_STR_COL(uuid_str, i, "id");
1728     if(!uuid_str) continue;
1729     PG_GET_STR_COL(tmpint, i, "sid");
1730     if(!tmpint) continue;
1731     sid = atoi(tmpint);
1732     PG_GET_STR_COL(fqdn, i, "fqdn");
1733     PG_GET_STR_COL(dsn, i, "dsn");
1734     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1735     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1736     storagenode_id = tmpint ? atoi(tmpint) : 0;
1737
1738     uuidinfo = calloc(1, sizeof(*uuidinfo));
1739     uuidinfo->uuid_str = strdup(uuid_str);
1740     uuidinfo->remote_cn = strdup(remote_cn);
1741     uuidinfo->storagenode_id = storagenode_id;
1742     uuidinfo->sid = sid;
1743     noit_hash_store(&uuid_to_info_cache,
1744                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1745     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1746           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1747     loaded++;
1748     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1749                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1750       storagenode_info *info;
1751       info = calloc(1, sizeof(*info));
1752       info->storagenode_id = storagenode_id;
1753       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1754       info->dsn = dsn ? strdup(dsn) : NULL;
1755       noit_hash_store(&storagenode_to_info_cache,
1756                       (void *)&info->storagenode_id, sizeof(int), info);
1757       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1758             info->storagenode_id,
1759             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1760     }
1761   }
1762   PQclear(d->res);
1763  bad_row:
1764   free_params(d);
1765
1766   release_conn_q(cq);
1767   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1768   return loaded;
1769 }
1770
1771 static int
1772 rest_get_noit_config(noit_http_rest_closure_t *restc,
1773                      int npats, char **pats) {
1774   noit_http_session_ctx *ctx = restc->http_ctx;
1775   ds_single_detail *d;
1776   int row_count = 0;
1777   const char *xml = NULL;
1778   conn_q *cq = NULL;
1779
1780   if(npats != 0) {
1781     noit_http_response_server_error(ctx, "text/xml");
1782     noit_http_response_end(ctx);
1783     return 0;
1784   }
1785   d = calloc(1, sizeof(*d));
1786   GET_QUERY(config_get);
1787   cq = get_conn_q_for_metanode();
1788   if(!cq) {
1789     noit_http_response_server_error(ctx, "text/xml");
1790     goto bad_row;
1791   }
1792
1793   DECLARE_PARAM_STR(restc->remote_cn,
1794                     restc->remote_cn ? strlen(restc->remote_cn) : 0);
1795   PG_EXEC(config_get);
1796   row_count = PQntuples(d->res);
1797   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1798
1799   if(xml == NULL) {
1800     char buff[1024];
1801     snprintf(buff, sizeof(buff), "<error><remote_cn>%s</remote_cn>"
1802                                  "<row_count>%d</row_count></error>\n",
1803              restc->remote_cn, row_count);
1804     noit_http_response_append(ctx, buff, strlen(buff));
1805     noit_http_response_not_found(ctx, "text/xml");
1806   }
1807   else {
1808     noit_http_response_append(ctx, xml, strlen(xml));
1809     noit_http_response_ok(ctx, "text/xml");
1810   }
1811  bad_row:
1812   free_params((ds_single_detail *)d);
1813   d->nparams = 0;
1814   if(cq) release_conn_q(cq);
1815
1816   noit_http_response_end(ctx);
1817   return 0;
1818 }
1819
1820 void
1821 stratcon_datastore_init() {
1822   pthread_mutex_init(&ds_conns_lock, NULL);
1823   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1824   ds_err = noit_log_stream_find("error/datastore");
1825   ds_deb = noit_log_stream_find("debug/datastore");
1826   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
1827   ingest_err = noit_log_stream_find("error/ingest");
1828   if(!ds_err) ds_err = noit_error;
1829   if(!ingest_err) ingest_err = noit_error;
1830   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1831                            &basejpath)) {
1832     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1833     exit(-1);
1834   }
1835   stratcon_datastore_ingest_all_check_info();
1836   stratcon_datastore_ingest_all_storagenode_info();
1837   stratcon_datastore_sweep_journals();
1838
1839   assert(noit_http_rest_register_auth(
1840     "GET", "/noits/", "^config$", rest_get_noit_config,
1841              noit_http_rest_client_cert_auth
1842   ) == 0);
1843 }
1844
Note: See TracBrowser for help on using the browser.