root/src/stratcon_datastore.c

Revision f64ae82bd480bcd2c18ef07b13d7ed71cf55bcbb, 58.3 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

strict-aliasing dance

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "utils/noit_str.h"
38 #include "utils/noit_mkdir.h"
39 #include "utils/noit_getip.h"
40 #include "stratcon_datastore.h"
41 #include "stratcon_realtime_http.h"
42 #include "stratcon_iep.h"
43 #include "noit_conf.h"
44 #include "noit_check.h"
45 #include "noit_rest.h"
46 #include <unistd.h>
47 #include <fcntl.h>
48 #include <netinet/in.h>
49 #include <sys/un.h>
50 #include <dirent.h>
51 #include <arpa/inet.h>
52 #include <sys/mman.h>
53 #include <libpq-fe.h>
54 #include <zlib.h>
55 #include <assert.h>
56 #include <errno.h>
57
/* DECL_STMT(codename, confname) declares two file-scope statics:
 *   char *codename              - the SQL text; NULL until GET_QUERY loads it
 *   const char *codename##_conf - the config path the SQL text is read from,
 *                                 "/stratcon/database/statements/<confname>"
 */
#define DECL_STMT(codename,confname)                                  \
  static char *codename = NULL;                                       \
  static const char *codename##_conf =                                \
    "/stratcon/database/statements/" #confname

DECL_STMT(storage_post_connect, storagepostconnect);
DECL_STMT(metanode_post_connect, metanodepostconnect);

DECL_STMT(find_storage, findstoragenode);
DECL_STMT(all_storage, allstoragenodes);

DECL_STMT(check_map, mapchecktostoragenode);
DECL_STMT(check_mapall, mapallchecks);
DECL_STMT(check_loadall, allchecks);
DECL_STMT(check_find, findcheck);

DECL_STMT(check_insert, check);
DECL_STMT(status_insert, status);
DECL_STMT(metric_insert_numeric, metric_numeric);
DECL_STMT(metric_insert_text, metric_text);

DECL_STMT(config_insert, config);
DECL_STMT(config_get, findconfig);
76
77 static noit_log_stream_t ds_err = NULL;
78 static noit_log_stream_t ds_deb = NULL;
79 static noit_log_stream_t ds_pool_deb = NULL;
80 static noit_log_stream_t ingest_err = NULL;
81
/* Datastore enable flag, exposed only through the two accessors below.
 * It starts out as 1. */
static int ds_system_enabled = 1;

/* Return the current value of the enable flag. */
int
stratcon_datastore_get_enabled() {
  return ds_system_enabled;
}

/* Store n as the new value of the enable flag. */
void
stratcon_datastore_set_enabled(int n) {
  ds_system_enabled = n;
}
85
86 static struct datastore_onlooker_list {
87   void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *,
88                    const char *, void *);
89   struct datastore_onlooker_list *next;
90 } *onlookers = NULL;
91
/* GET_QUERY(a): make sure statement `a` holds its SQL text.
 * If `a` is still NULL, the text is loaded from the config path in
 * `a##_conf` (declared by DECL_STMT).  If that lookup fails, control jumps
 * to the caller's `bad_row` label, which every user must provide.
 * NOTE(review): the lazy load is not synchronized.  Threads that get here
 * first at the same time may each perform the lookup. */
#define GET_QUERY(a) do {                                   \
  if((a) == NULL &&                                         \
     !noit_conf_get_string(NULL, a ## _conf, &(a)))         \
    goto bad_row;                                           \
} while(0)
97
98 struct conn_q;
99
100 typedef struct {
101   char            *queue_name; /* the key fqdn+remote_sn */
102   eventer_jobq_t  *jobq;
103   struct conn_q   *head;
104   pthread_mutex_t  lock;
105   pthread_cond_t   cv;
106   int              ttl;
107   int              in_pool;
108   int              outstanding;
109   int              max_allocated;
110   int              max_in_pool;
111 } conn_pool;
112 typedef struct conn_q {
113   time_t           last_use;
114   char            *dsn;        /* Pg connect string */
115   char            *remote_str; /* the IP of the noit*/
116   char            *remote_cn;  /* the Cert CN of the noit */
117   char            *fqdn;       /* the fqdn of the storage node */
118   conn_pool       *pool;
119   struct conn_q   *next;
120   /* Postgres specific stuff */
121   PGconn          *dbh;
122 } conn_q;
123
124
125 #define MAX_PARAMS 8
126 #define POSTGRES_PARTS \
127   PGresult *res; \
128   int rv; \
129   time_t whence; \
130   int nparams; \
131   int metric_type; \
132   char *paramValues[MAX_PARAMS]; \
133   int paramLengths[MAX_PARAMS]; \
134   int paramFormats[MAX_PARAMS]; \
135   int paramAllocd[MAX_PARAMS];
136
137 typedef struct ds_single_detail {
138   POSTGRES_PARTS
139 } ds_single_detail;
140 typedef struct {
141   /* Postgres specific stuff */
142   POSTGRES_PARTS
143   struct realtime_tracker *rt;
144   conn_q *cq; /* connection on which to perform this job */
145   eventer_t completion_event; /* This event should be registered if non NULL */
146 } ds_rt_detail;
147 typedef struct ds_line_detail {
148   /* Postgres specific stuff */
149   POSTGRES_PARTS
150   char *data;
151   int problematic;
152   struct ds_line_detail *next;
153 } ds_line_detail;
154
155 typedef struct {
156   noit_hash_table *ws;
157   eventer_t completion;
158 } syncset_t;
159
160 typedef struct {
161   char *remote_str;
162   char *remote_cn;
163   char *fqdn;
164   int storagenode_id;
165   int fd;
166   char *filename;
167   conn_pool *cpool;
168 } interim_journal_t;
169
170 static int stratcon_database_connect(conn_q *cq);
171 static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
172 static int storage_node_quick_lookup(const char *uuid_str,
173                                      const char *remote_cn,
174                                      int *sid_out, int *storagenode_id_out,
175                                      const char **remote_cn_out,
176                                      const char **fqdn_out,
177                                      const char **dsn_out);
178
179 static void
180 free_params(ds_single_detail *d) {
181   int i;
182   for(i=0; i<d->nparams; i++)
183     if(d->paramAllocd[i] && d->paramValues[i])
184       free(d->paramValues[i]);
185 }
186
187 char *basejpath = NULL;
188 pthread_mutex_t ds_conns_lock;
189 noit_hash_table ds_conns;
190 noit_hash_table working_sets;
191
192 /* the fqdn cache needs to be thread safe */
193 typedef struct {
194   char *uuid_str;
195   char *remote_cn;
196   int storagenode_id;
197   int sid;
198 } uuid_info;
199 typedef struct {
200   int storagenode_id;
201   char *fqdn;
202   char *dsn;
203 } storagenode_info;
204 noit_hash_table uuid_to_info_cache;
205 pthread_mutex_t storagenode_to_info_cache_lock;
206 noit_hash_table storagenode_to_info_cache;
207
208 /* Thread-safe connection pools */
209
210 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
211 static void
212 release_conn_q_forceable(conn_q *cq, int forcefree) {
213   int putback = 0;
214   cq->last_use = time(NULL);
215   pthread_mutex_lock(&cq->pool->lock);
216   cq->pool->outstanding--;
217   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
218     putback = 1;
219     cq->next = cq->pool->head;
220     cq->pool->head = cq;
221     cq->pool->in_pool++;
222   }
223   pthread_mutex_unlock(&cq->pool->lock);
224   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(),
225         putback ? "to pool" : "and destroy", cq->pool->queue_name);
226   pthread_cond_signal(&cq->pool->cv);
227   if(putback) return;
228
229   /* Not put back, release it */
230   if(cq->dbh) PQfinish(cq->dbh);
231   if(cq->remote_str) free(cq->remote_str);
232   if(cq->remote_cn) free(cq->remote_cn);
233   if(cq->fqdn) free(cq->fqdn);
234   if(cq->dsn) free(cq->dsn);
235   free(cq);
236 }
237 static void
238 ttl_purge_conn_pool(conn_pool *pool) {
239   int old_cnt, new_cnt;
240   time_t now = time(NULL);
241   conn_q *cq, *prev = NULL, *iter;
242   /* because we always replace on the head and update the last_use time when
243      doing so, we know they are ordered LRU on the end.  So, once we hit an
244      old one, we know all the others are old too.
245    */
246   if(!pool->head) return; /* hack short circuit for no locks */
247   pthread_mutex_lock(&pool->lock);
248   old_cnt = pool->in_pool;
249   cq = pool->head;
250   while(cq) {
251     if(cq->last_use + cq->pool->ttl < now) {
252       if(prev) prev->next = NULL;
253       else pool->head = NULL;
254       break;
255     }
256     prev = cq;
257     cq = cq->next;
258   }
259   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
260   /* Fix accounting */
261   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
262   new_cnt = pool->in_pool;
263   pthread_mutex_unlock(&pool->lock);
264
265   /* Force release these without holding the lock */
266   while(cq) {
267     cq = cq->next;
268     release_conn_q_forceable(cq, 1);
269   }
270   if(old_cnt != new_cnt)
271     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
272           pool->queue_name);
273 }
274 static void
275 release_conn_q(conn_q *cq) {
276   ttl_purge_conn_pool(cq->pool);
277   release_conn_q_forceable(cq, 0);
278 }
279 static conn_pool *
280 get_conn_pool_for_remote(const char *remote_str,
281                          const char *remote_cn, const char *fqdn) {
282   void *vcpool;
283   conn_pool *cpool = NULL;
284   char queue_name[256] = "datastore_";
285   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
286            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
287            fqdn ? fqdn : "default",
288            remote_cn ? remote_cn : "default");
289   pthread_mutex_lock(&ds_conns_lock);
290   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
291                         strlen(queue_name), &vcpool))
292     cpool = vcpool;
293   pthread_mutex_unlock(&ds_conns_lock);
294   if(!cpool) {
295     vcpool = cpool = calloc(1, sizeof(*cpool));
296     cpool->queue_name = strdup(queue_name);
297     pthread_mutex_init(&cpool->lock, NULL);
298     pthread_cond_init(&cpool->cv, NULL);
299     cpool->in_pool = 0;
300     cpool->outstanding = 0;
301     cpool->max_in_pool = 1;
302     cpool->max_allocated = 1;
303     pthread_mutex_lock(&ds_conns_lock);
304     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
305                         cpool)) {
306       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
307                          strlen(queue_name), &vcpool);
308     }
309     pthread_mutex_unlock(&ds_conns_lock);
310     if(vcpool != cpool) {
311       /* someone beat us to it */
312       free(cpool->queue_name);
313       pthread_mutex_destroy(&cpool->lock);
314       pthread_cond_destroy(&cpool->cv);
315       free(cpool);
316     }
317     else {
318       int i;
319       /* Our job to setup the pool */
320       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
321       eventer_jobq_init(cpool->jobq, queue_name);
322       cpool->jobq->backq = eventer_default_backq();
323       /* Add one thread */
324       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
325         eventer_jobq_increase_concurrency(cpool->jobq);
326     }
327     cpool = vcpool;
328   }
329   return cpool;
330 }
331 static conn_q *
332 get_conn_q_for_remote(const char *remote_str,
333                       const char *remote_cn, const char *fqdn,
334                       const char *dsn) {
335   conn_pool *cpool;
336   conn_q *cq;
337   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
338   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(),
339         cpool->queue_name);
340   pthread_mutex_lock(&cpool->lock);
341  again:
342   if(cpool->head) {
343     assert(cpool->in_pool > 0);
344     cq = cpool->head;
345     cpool->head = cq->next;
346     cpool->in_pool--;
347     cpool->outstanding++;
348     cq->next = NULL;
349     pthread_mutex_unlock(&cpool->lock);
350     return cq;
351   }
352   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
353     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
354           (void *)pthread_self(), cpool->queue_name);
355     pthread_cond_wait(&cpool->cv, &cpool->lock);
356     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
357           (void *)pthread_self(), cpool->queue_name);
358     goto again;
359   }
360   else {
361     cpool->outstanding++;
362     pthread_mutex_unlock(&cpool->lock);
363   }
364  
365   cq = calloc(1, sizeof(*cq));
366   cq->pool = cpool;
367   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
368   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
369   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
370   cq->dsn = dsn ? strdup(dsn) : NULL;
371   return cq;
372 }
373 static conn_q *
374 get_conn_q_for_metanode() {
375   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
376 }
377
/* Result of executing one row (or a whole transaction). */
typedef enum {
  DS_EXEC_SUCCESS = 0,
  DS_EXEC_ROW_FAILED = 1,
  DS_EXEC_TXN_FAILED = 2,
} execute_outcome_t;

/* The macros below operate on a detail struct named `d` (any struct that
 * begins with POSTGRES_PARTS).  The PG_* ones also use a conn_q named `cq`.
 * Macros that can fail jump to a `bad_row` label in the caller. */

/* Append a text parameter holding a private copy of the first `len` bytes
 * of `str`.  The literal "[[null]]" (or a failed copy) becomes an SQL NULL. */
#define DECLARE_PARAM_STR(str, len) do { \
  assert(d->nparams < MAX_PARAMS); \
  d->paramValues[d->nparams] = noit__strndup(str, len); \
  d->paramLengths[d->nparams] = len; \
  d->paramFormats[d->nparams] = 0; \
  d->paramAllocd[d->nparams] = 1; \
  if(d->paramValues[d->nparams] && \
     !strcmp(d->paramValues[d->nparams], "[[null]]")) { \
    free(d->paramValues[d->nparams]); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  d->nparams++; \
} while(0)

/* Append an int parameter, rendered in decimal. */
#define DECLARE_PARAM_INT(i) do { \
  int buffer__len; \
  char buffer__[32]; \
  snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
  buffer__len = strlen(buffer__); \
  DECLARE_PARAM_STR(buffer__, buffer__len); \
} while(0)

/* dest = column `name` of `row` in d->res, or NULL if the column is
 * missing or the value is SQL NULL.  dest points into d->res. */
#define PG_GET_STR_COL(dest, row, name) do { \
  int colnum = PQfnumber(d->res, name); \
  dest = NULL; \
  if (colnum >= 0) \
    dest = PQgetisnull(d->res, row, colnum) \
         ? NULL : PQgetvalue(d->res, row, colnum); \
} while(0)

/* Run `cmd` with d's parameters, storing the result in d->res.  On failure
 * the first line of the error is logged, d->res is cleared and control
 * jumps to bad_row. */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "[%s] stratcon_datastore.c:%d bad (%d): %.*s\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
          (int)(pgerr_end - pgerr), pgerr); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)

/* Like PG_EXEC, except `cmd` is first expanded as a strftime() format for
 * the UTC time `whence` (e.g. to choose a time-partitioned table).  If the
 * time cannot be converted, or the expansion does not fit in cmdbuf, the
 * failure is logged and control jumps to bad_row instead of running a
 * query from an indeterminate buffer. */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t pg_tm_when_ = (whence); \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&pg_tm_when_, &tbuf); \
  if(!tm || strftime(cmdbuf, sizeof(cmdbuf), cmd, tm) == 0) { \
    noitL(ds_err, "[%s] stratcon_datastore.c:%d cannot expand query time: %llu\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, \
          (long long unsigned)pg_tm_when_); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "[%s] stratcon_datastore.c:%d bad (%d): %.*s time: %llu\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
          (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)pg_tm_when_); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
453
454 static void *
455 stratcon_datastore_check_loadall(void *vsn) {
456   storagenode_info *sn = vsn;
457   ds_single_detail *d;
458   int i, row_count = 0, good = 0;
459   char buff[1024];
460   conn_q *cq = NULL;
461
462   d = calloc(1, sizeof(*d));
463   GET_QUERY(check_loadall);
464   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
465   i = 0;
466   while(stratcon_database_connect(cq)) {
467     if(i++ > 4) {
468       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
469       release_conn_q(cq);
470       return (void *)(vpsized_int)good;
471     }
472     sleep(1);
473   }
474   PG_EXEC(check_loadall);
475   row_count = PQntuples(d->res);
476  
477   for(i=0; i<row_count; i++) {
478     int rv;
479     int8_t family;
480     struct sockaddr *sin;
481     struct sockaddr_in sin4 = { .sin_family = AF_INET };
482     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
483     char *remote, *id, *target, *module, *name;
484     PG_GET_STR_COL(remote, i, "remote_address");
485     PG_GET_STR_COL(id, i, "id");
486     PG_GET_STR_COL(target, i, "target");
487     PG_GET_STR_COL(module, i, "module");
488     PG_GET_STR_COL(name, i, "name");
489     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
490
491     family = AF_INET;
492     sin = (struct sockaddr *)&sin4;
493     rv = inet_pton(family, remote, &sin4.sin_addr);
494     if(rv != 1) {
495       family = AF_INET6;
496       sin = (struct sockaddr *)&sin6;
497       rv = inet_pton(family, remote, &sin6.sin6_addr);
498       if(rv != 1) {
499         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
500         sin = NULL;
501       }
502     }
503
504     /* stratcon_iep_line_processor takes an allocated operand and frees it */
505     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
506     good++;
507   }
508   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
509         good, row_count, sn->fqdn);
510  bad_row:
511   free_params((ds_single_detail *)d);
512   free(d);
513   if(cq) release_conn_q(cq);
514   return (void *)(vpsized_int)good;
515 }
516 static int
517 stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure,
518                                     struct timeval *now) {
519   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
520   pthread_t *jobs = NULL;
521   int nodes, i = 0, tcnt = 0;
522   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
523   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
524
525   pthread_mutex_lock(&storagenode_to_info_cache_lock);
526   nodes = storagenode_to_info_cache.size;
527   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
528   sns = calloc(MAX(1,nodes), sizeof(*sns));
529   if(nodes == 0) sns[nodes++] = &self;
530   else {
531     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
532     const char *k;
533     void *v;
534     int klen;
535     while(noit_hash_next(&storagenode_to_info_cache,
536                          &iter, &k, &klen, &v)) {
537       sns[i++] = (storagenode_info *)v;
538     }
539   }
540   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
541
542   for(i=0; i<nodes; i++) {
543     if(pthread_create(&jobs[i], NULL,
544                       stratcon_datastore_check_loadall, sns[i]) != 0) {
545       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
546     }
547   }
548   for(i=0; i<nodes; i++) {
549     void *good;
550     pthread_join(jobs[i], &good);
551     tcnt += (int)(vpsized_int)good;
552   }
553   noitL(noit_error, "Loaded all %d check states.\n", tcnt);
554   return 0;
555 }
556 void
557 stratcon_datastore_iep_check_preload() {
558   eventer_t e;
559   conn_pool *cpool;
560
561   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
562   e = eventer_alloc();
563   e->mask = EVENTER_ASYNCH;
564   e->callback = stratcon_datastore_asynch_drive_iep;
565   e->closure = NULL;
566   eventer_add_asynch(cpool->jobq, e);
567 }
568 execute_outcome_t
569 stratcon_datastore_find(ds_rt_detail *d) {
570   conn_q *cq;
571   char *val;
572   int row_count;
573   struct realtime_tracker *node;
574
575   for(node = d->rt; node; node = node->next) {
576     char uuid_str[UUID_STR_LEN+1];
577     const char *fqdn, *dsn, *remote_cn;
578     char remote_ip[32];
579     int storagenode_id;
580
581     uuid_unparse_lower(node->checkid, uuid_str);
582     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
583                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
584       continue;
585
586     noitL(noit_debug, "stratcon_datastore_find <- (%d, %s) @ %s\n",
587           node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)");
588
589     /* We might be able to find the IP from our config if someone has
590      * specified the expected cn in the noit definition.
591      */
592     if(stratcon_find_noit_ip_by_cn(remote_cn,
593                                    remote_ip, sizeof(remote_ip)) == 0) {
594       node->noit = strdup(remote_ip);
595       noitL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit);
596       continue;
597     }
598
599     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
600     stratcon_database_connect(cq);
601
602     GET_QUERY(check_find);
603     DECLARE_PARAM_INT(node->sid);
604     PG_EXEC(check_find);
605     row_count = PQntuples(d->res);
606     if(row_count != 1) {
607       noitL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid);
608       PQclear(d->res);
609       goto bad_row;
610     }
611
612     /* Get the remote_address (which noit owns this) */
613     PG_GET_STR_COL(val, 0, "remote_address");
614     if(!val) {
615       noitL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn);
616       PQclear(d->res);
617       goto bad_row;
618     }
619     node->noit = strdup(val);
620     noitL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit);
621    bad_row:
622     free_params((ds_single_detail *)d);
623     d->nparams = 0;
624     release_conn_q(cq);
625   }
626   return DS_EXEC_SUCCESS;
627 }
628 execute_outcome_t
629 stratcon_datastore_execute(conn_q *cq, const char *r, const char *remote_cn,
630                            ds_line_detail *d) {
631   int type, len, sid;
632   char *final_buff;
633   uLong final_len, actual_final_len;
634   char *token;
635   char raddr_blank[1] = "";
636   const char *raddr;
637
638   type = d->data[0];
639   raddr = r ? r : raddr_blank;
640
641   /* Parse the log line, but only if we haven't already */
642   if(!d->nparams) {
643     char *scp, *ecp;
644
645     scp = d->data;
646 #define PROCESS_NEXT_FIELD(t,l) do { \
647   if(!*scp) goto bad_row; \
648   ecp = strchr(scp, '\t'); \
649   if(!ecp) goto bad_row; \
650   token = scp; \
651   len = (ecp-scp); \
652   scp = ecp + 1; \
653 } while(0)
654 #define PROCESS_LAST_FIELD(t,l) do { \
655   if(!*scp) ecp = scp; \
656   else { \
657     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
658     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
659   } \
660   t = scp; \
661   l = (ecp-scp); \
662 } while(0)
663
664     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
665     switch(type) {
666       /* See noit_check_log.c for log description */
667       case 'n':
668         DECLARE_PARAM_STR(raddr, strlen(raddr));
669         DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
670         DECLARE_PARAM_STR("noitd",5); /* node_type */
671         PROCESS_NEXT_FIELD(token,len);
672         d->whence = (time_t)strtoul(token, NULL, 10);
673         DECLARE_PARAM_STR(token,len); /* timestamp */
674
675         /* This is the expected uncompressed len */
676         PROCESS_NEXT_FIELD(token,len);
677         final_len = atoi(token);
678         final_buff = malloc(final_len);
679         if(!final_buff) goto bad_row;
680  
681         /* The last token is b64 endoded and compressed.
682          * we need to decode it, declare it and then free it.
683          */
684         PROCESS_LAST_FIELD(token, len);
685         /* We can in-place decode this */
686         len = noit_b64_decode((char *)token, len,
687                               (unsigned char *)token, len);
688         if(len <= 0) {
689           noitL(noit_error, "noitd config base64 decoding error.\n");
690           free(final_buff);
691           goto bad_row;
692         }
693         actual_final_len = final_len;
694         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
695                               (unsigned char *)token, len)) {
696           noitL(noit_error, "noitd config decompression failure.\n");
697           free(final_buff);
698           goto bad_row;
699         }
700         if(final_len != actual_final_len) {
701           noitL(noit_error, "noitd config decompression error.\n");
702           free(final_buff);
703           goto bad_row;
704         }
705         DECLARE_PARAM_STR(final_buff, final_len);
706         free(final_buff);
707         break;
708       case 'C':
709         DECLARE_PARAM_STR(raddr, strlen(raddr));
710         PROCESS_NEXT_FIELD(token,len);
711         DECLARE_PARAM_STR(token,len); /* timestamp */
712         d->whence = (time_t)strtoul(token, NULL, 10);
713         PROCESS_NEXT_FIELD(token, len);
714         /* uuid is last 36 bytes */
715         if(len > 36) { token += (len-36); len = 36; }
716         sid = uuid_to_sid(token, remote_cn);
717         if(sid == 0) goto bad_row;
718         DECLARE_PARAM_INT(sid); /* sid */
719         DECLARE_PARAM_STR(token,len); /* uuid */
720         PROCESS_NEXT_FIELD(token, len);
721         DECLARE_PARAM_STR(token,len); /* target */
722         PROCESS_NEXT_FIELD(token, len);
723         DECLARE_PARAM_STR(token,len); /* module */
724         PROCESS_LAST_FIELD(token, len);
725         DECLARE_PARAM_STR(token,len); /* name */
726         break;
727       case 'M':
728         PROCESS_NEXT_FIELD(token,len);
729         DECLARE_PARAM_STR(token,len); /* timestamp */
730         d->whence = (time_t)strtoul(token, NULL, 10);
731         PROCESS_NEXT_FIELD(token, len);
732         /* uuid is last 36 bytes */
733         if(len > 36) { token += (len-36); len = 36; }
734         sid = uuid_to_sid(token, remote_cn);
735         if(sid == 0) goto bad_row;
736         DECLARE_PARAM_INT(sid); /* sid */
737         PROCESS_NEXT_FIELD(token, len);
738         DECLARE_PARAM_STR(token,len); /* name */
739         PROCESS_NEXT_FIELD(token,len);
740         d->metric_type = *token;
741         PROCESS_LAST_FIELD(token,len);
742         DECLARE_PARAM_STR(token,len); /* value */
743         break;
744       case 'S':
745         PROCESS_NEXT_FIELD(token,len);
746         DECLARE_PARAM_STR(token,len); /* timestamp */
747         d->whence = (time_t)strtoul(token, NULL, 10);
748         PROCESS_NEXT_FIELD(token, len);
749         /* uuid is last 36 bytes */
750         if(len > 36) { token += (len-36); len = 36; }
751         sid = uuid_to_sid(token, remote_cn);
752         if(sid == 0) goto bad_row;
753         DECLARE_PARAM_INT(sid); /* sid */
754         PROCESS_NEXT_FIELD(token, len);
755         DECLARE_PARAM_STR(token,len); /* state */
756         PROCESS_NEXT_FIELD(token, len);
757         DECLARE_PARAM_STR(token,len); /* availability */
758         PROCESS_NEXT_FIELD(token, len);
759         DECLARE_PARAM_STR(token,len); /* duration */
760         PROCESS_LAST_FIELD(token,len);
761         DECLARE_PARAM_STR(token,len); /* status */
762         break;
763       default:
764         goto bad_row;
765     }
766
767   }
768
769   /* Now execute the query */
770   switch(type) {
771     case 'n':
772       GET_QUERY(config_insert);
773       PG_EXEC(config_insert);
774       PQclear(d->res);
775       break;
776     case 'C':
777       GET_QUERY(check_insert);
778       PG_TM_EXEC(check_insert, d->whence);
779       PQclear(d->res);
780       break;
781     case 'S':
782       GET_QUERY(status_insert);
783       PG_TM_EXEC(status_insert, d->whence);
784       PQclear(d->res);
785       break;
786     case 'M':
787       switch(d->metric_type) {
788         case METRIC_INT32:
789         case METRIC_UINT32:
790         case METRIC_INT64:
791         case METRIC_UINT64:
792         case METRIC_DOUBLE:
793           GET_QUERY(metric_insert_numeric);
794           PG_TM_EXEC(metric_insert_numeric, d->whence);
795           PQclear(d->res);
796           break;
797         case METRIC_STRING:
798           GET_QUERY(metric_insert_text);
799           PG_TM_EXEC(metric_insert_text, d->whence);
800           PQclear(d->res);
801           break;
802         default:
803           goto bad_row;
804       }
805       break;
806     default:
807       /* should never get here */
808       goto bad_row;
809   }
810   return DS_EXEC_SUCCESS;
811  bad_row:
812   return DS_EXEC_ROW_FAILED;
813 }
814 static int
815 stratcon_database_post_connect(conn_q *cq) {
816   int rv = 0;
817   ds_single_detail _d = { 0 }, *d = &_d;
818   if(cq->fqdn) {
819     char *remote_str, *remote_cn;
820     /* This is the silly way we get null's in through our declare_param_str */
821     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
822     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
823     /* This is a storage node, it gets the storage node post_connect */
824     GET_QUERY(storage_post_connect);
825     rv = -1; /* now we're serious */
826     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
827     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
828     PG_EXEC(storage_post_connect);
829     PQclear(d->res);
830     rv = 0;
831   }
832   else {
833     /* Metanode post_connect */
834     GET_QUERY(metanode_post_connect);
835     rv = -1; /* now we're serious */
836     PG_EXEC(metanode_post_connect);
837     PQclear(d->res);
838     rv = 0;
839   }
840  bad_row:
841   free_params(d);
842   if(rv == -1) {
843     /* Post-connect intentions are serious and fatal */
844     PQfinish(cq->dbh);
845     cq->dbh = NULL;
846   }
847   return rv;
848 }
849 static int
850 stratcon_database_connect(conn_q *cq) {
851   char *dsn, dsn_meta[512];
852   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
853   const char *k, *v;
854   int klen;
855   noit_hash_table *t;
856
857   dsn_meta[0] = '\0';
858   if(!cq->dsn) {
859     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
860     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
861       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
862       strlcat(dsn_meta, k, sizeof(dsn_meta));
863       strlcat(dsn_meta, "=", sizeof(dsn_meta));
864       strlcat(dsn_meta, v, sizeof(dsn_meta));
865     }
866     noit_hash_destroy(t, free, free);
867     free(t);
868     dsn = dsn_meta;
869   }
870   else {
871     char options[32];
872     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta));
873     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user",
874                                options, sizeof(options))) {
875       strlcat(dsn_meta, " ", sizeof(dsn_meta));
876       strlcat(dsn_meta, "user", sizeof(dsn_meta));
877       strlcat(dsn_meta, "=", sizeof(dsn_meta));
878       strlcat(dsn_meta, options, sizeof(dsn_meta));
879     }
880     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password",
881                                options, sizeof(options))) {
882       strlcat(dsn_meta, " ", sizeof(dsn_meta));
883       strlcat(dsn_meta, "password", sizeof(dsn_meta));
884       strlcat(dsn_meta, "=", sizeof(dsn_meta));
885       strlcat(dsn_meta, options, sizeof(dsn_meta));
886     }
887     dsn = dsn_meta;
888   }
889
890   if(cq->dbh) {
891     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
892     PQreset(cq->dbh);
893     if(PQstatus(cq->dbh) != CONNECTION_OK) {
894       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
895             dsn, PQerrorMessage(cq->dbh));
896       return -1;
897     }
898     if(stratcon_database_post_connect(cq)) return -1;
899     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
900     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
901           dsn, PQerrorMessage(cq->dbh));
902     return -1;
903   }
904
905   cq->dbh = PQconnectdb(dsn);
906   if(!cq->dbh) return -1;
907   if(PQstatus(cq->dbh) != CONNECTION_OK) {
908     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
909           dsn, PQerrorMessage(cq->dbh));
910     return -1;
911   }
912   if(stratcon_database_post_connect(cq)) return -1;
913   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
914   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
915         dsn, PQerrorMessage(cq->dbh));
916   return -1;
917 }
918 static int
919 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
920                                 const char *name) {
921   int rv = -1;
922   PGresult *res;
923   char cmd[128];
924   strlcpy(cmd, p, sizeof(cmd));
925   strlcat(cmd, name, sizeof(cmd));
926   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
927   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
928   PQclear(res);
929   return rv;
930 }
931 static int
932 stratcon_datastore_do(conn_q *cq, const char *cmd) {
933   PGresult *res;
934   int rv = -1;
935   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
936   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
937   PQclear(res);
938   return rv;
939 }
/*
 * Transaction-control helpers for stratcon_datastore_asynch_execute().
 * They are not self-contained: at the point of expansion they expect
 * locals named `cq`, `current` and `last_sp`, plus a `full_monty` label.
 */

/* Abandon the connection (PQfinish, dbh = NULL) and jump to full_monty,
   which reconnects and replays the batch from the start. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)
/* Issue "SAVEPOINT <name>".  On failure, BUSTED.  Otherwise record the
   line the savepoint was taken before (last_sp = current) as the replay
   point. */
#define SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
  last_sp = current; \
} while(0)
/* Issue "ROLLBACK TO SAVEPOINT <name>".  On failure, BUSTED.  Otherwise
   clear last_sp so the next processed line opens a fresh savepoint. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
/* Issue "RELEASE SAVEPOINT <name>".  On failure, BUSTED.  Otherwise clear
   last_sp. */
#define RELEASE_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
959 int
960 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
961                                  struct timeval *now) {
962   ds_rt_detail *dsjd = closure;
963   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
964   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
965
966   assert(dsjd->rt);
967   stratcon_datastore_find(dsjd);
968   if(dsjd->completion_event)
969     eventer_add(dsjd->completion_event);
970
971   free_params((ds_single_detail *)dsjd);
972   free(dsjd);
973   return 0;
974 }
975 static const char *
976 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
977   void *vinfo;
978   char *dsn = NULL, *fqdn = NULL;
979   int found = 0;
980   storagenode_info *info = NULL;
981   pthread_mutex_lock(&storagenode_to_info_cache_lock);
982   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
983                         &vinfo)) {
984     found = 1;
985     info = vinfo;
986   }
987   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
988   if(found) {
989     if(fqdn_out) *fqdn_out = info->fqdn;
990     return info->dsn;
991   }
992
993   if(!found && can_use_db) {
994     ds_single_detail *d;
995     conn_q *cq;
996     int row_count;
997     /* Look it up and store it */
998     d = calloc(1, sizeof(*d));
999     cq = get_conn_q_for_metanode();
1000     GET_QUERY(find_storage);
1001     DECLARE_PARAM_INT(id);
1002     PG_EXEC(find_storage);
1003     row_count = PQntuples(d->res);
1004     if(row_count) {
1005       PG_GET_STR_COL(dsn, 0, "dsn");
1006       PG_GET_STR_COL(fqdn, 0, "fqdn");
1007       fqdn = fqdn ? strdup(fqdn) : NULL;
1008       dsn = dsn ? strdup(dsn) : NULL;
1009     }
1010     PQclear(d->res);
1011    bad_row:
1012     free_params(d);
1013     free(d);
1014     release_conn_q(cq);
1015   }
1016   if(fqdn) {
1017     info = calloc(1, sizeof(*info));
1018     info->fqdn = fqdn;
1019     if(fqdn_out) *fqdn_out = info->fqdn;
1020     info->dsn = dsn;
1021     info->storagenode_id = id;
1022     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1023     noit_hash_store(&storagenode_to_info_cache,
1024                     (void *)&info->storagenode_id, sizeof(int), info);
1025     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1026   }
1027   return info ? info->dsn : NULL;
1028 }
1029 static ds_line_detail *
1030 build_insert_batch(interim_journal_t *ij) {
1031   int rv;
1032   off_t len;
1033   const char *buff, *cp, *lcp;
1034   struct stat st;
1035   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1036
1037   if(ij->fd < 0) {
1038     ij->fd = open(ij->filename, O_RDONLY);
1039     if(ij->fd < 0) {
1040       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
1041             ij->filename, strerror(errno));
1042       assert(ij->fd >= 0);
1043     }
1044   }
1045   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1046   if(rv == -1) {
1047       noitL(noit_error, "Cannot stat interim journal '%s': %s\n",
1048             ij->filename, strerror(errno));
1049     assert(rv != -1);
1050   }
1051   len = st.st_size;
1052   if(len > 0) {
1053     buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1054     if(buff == (void *)-1) {
1055       noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1056             ij->filename, strerror(errno));
1057       assert(buff != (void *)-1);
1058     }
1059     lcp = buff;
1060     while(lcp < (buff + len) &&
1061           NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1062       next = calloc(1, sizeof(*next));
1063       next->data = malloc(cp - lcp + 1);
1064       memcpy(next->data, lcp, cp - lcp);
1065       next->data[cp - lcp] = '\0';
1066       if(!head) head = next;
1067       if(last) last->next = next;
1068       last = next;
1069       lcp = cp + 1;
1070     }
1071     munmap((void *)buff, len);
1072   }
1073   close(ij->fd);
1074   return head;
1075 }
1076 static void
1077 interim_journal_remove(interim_journal_t *ij) {
1078   unlink(ij->filename);
1079   if(ij->filename) free(ij->filename);
1080   if(ij->remote_str) free(ij->remote_str);
1081   if(ij->remote_cn) free(ij->remote_cn);
1082   if(ij->fqdn) free(ij->fqdn);
1083   free(ij);
1084 }
/*
 * Asynch-job callback that loads one interim journal into its storage
 * node's database in a single transaction, then deletes the journal.
 *
 * Only the work phase does anything (a mask containing
 * EVENTER_ASYNCH_CLEANUP is ignored).  In that phase `closure` (an
 * interim_journal_t) is consumed: it is freed via interim_journal_remove()
 * before returning.
 *
 * Savepoints and row failures:
 *   - Lines run under a "batch" savepoint.
 *   - When a line fails (DS_EXEC_ROW_FAILED) it is logged to ingest_err,
 *     unless it starts with 'n', and flagged problematic.
 *   - The transaction is then rolled back to the savepoint and replayed
 *     from the line that opened it; flagged lines are skipped on replay.
 *
 * Transaction-level failures:
 *   - BEGIN, COMMIT, savepoint or DS_EXEC_TXN_FAILED failures (BUSTED) drop
 *     the connection and restart at full_monty.
 *   - full_monty reconnects, retrying forever with a backoff of 1s doubling
 *     up to 16s, and then replays the whole batch.
 *   - `head` and the problematic flags survive that restart.
 *
 * Always returns 0.
 */
int
stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
                                  struct timeval *now) {
  int i, total, success, sp_total, sp_success;
  interim_journal_t *ij;
  ds_line_detail *head = NULL, *current, *last_sp;
  const char *dsn;
  conn_q *cq;
  if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
  if(mask & EVENTER_ASYNCH_CLEANUP) return 0;

  ij = closure;
  /* Resolve the target storage node.  The fqdn handed back comes from the
     storage node cache, so it is copied before being kept on ij. */
  if(ij->fqdn == NULL) {
    dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
    if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
  }
  else {
    dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, NULL);
  }
  cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
                             ij->fqdn, dsn);
  noitL(ds_deb, "stratcon_datastore_asynch_execute[%s,%s,%s]\n",
        ij->remote_str, ij->remote_cn, ij->fqdn);
  /* BUSTED() jumps back here after dropping the connection. */
 full_monty:
  /* Make sure we have a connection */
  /* Retry forever: sleep 1, 2, 4, 8, then 16 seconds between attempts. */
  i = 1;
  while(stratcon_database_connect(cq)) {
    noitL(noit_error, "Error connecting to database: %s\n",
          ij->fqdn ? ij->fqdn : "(null)");
    sleep(i);
    i *= 2;
    i = MIN(i, 16);
  }

  /* Read the journal only once; a replay reuses the list (and its
     problematic flags).  An empty journal is simply re-read. */
  if(head == NULL) head = build_insert_batch(ij);
  noitL(ds_deb, "Starting batch from %s/%s to %s\n",
        ij->remote_str ? ij->remote_str : "(null)",
        ij->remote_cn ? ij->remote_cn : "(null)",
        ij->fqdn ? ij->fqdn : "(null)");
  current = head;
  last_sp = NULL;
  total = success = sp_total = sp_success = 0;
  if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
  while(current) {
    execute_outcome_t rv;
    if(current->data) {
      /* Open a savepoint before the first line after each release/rollback,
         and remember the counters so a rollback can restore them. */
      if(!last_sp) {
        SAVEPOINT("batch");
        sp_success = success;
        sp_total = total;
      }
 
      /* Line already known to fail: release the savepoint, count the line
         in total (not success), and move on. */
      if(current->problematic) {
        RELEASE_SAVEPOINT("batch");
        current = current->next;
        total++;
        continue;
      }
      rv = stratcon_datastore_execute(cq, cq->remote_str, cq->remote_cn,
                                      current);
      switch(rv) {
        case DS_EXEC_SUCCESS:
          total++;
          success++;
          current = current->next;
          break;
        case DS_EXEC_ROW_FAILED:
          /* rollback to savepoint, mark this record as bad and start again */
          if(current->data[0] != 'n')
            noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
          current->problematic = 1;
          current = last_sp;
          success = sp_success;
          total = sp_total;
          ROLLBACK_TO_SAVEPOINT("batch");
          break;
        case DS_EXEC_TXN_FAILED:
          noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
          BUSTED(cq);
      }
    }
  }
  if(last_sp) RELEASE_SAVEPOINT("batch");
  if(stratcon_datastore_do(cq, "COMMIT")) {
    noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
    BUSTED(cq);
  }
  /* Cleanup the mess */
  while(head) {
    ds_line_detail *tofree;
    tofree = head;
    head = head->next;
    if(tofree->data) free(tofree->data);
    free_params((ds_single_detail *)tofree);
    free(tofree);
  }
  noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
        ij->remote_str ? ij->remote_str : "(null)",
        ij->remote_cn ? ij->remote_cn : "(null)",
        ij->fqdn ? ij->fqdn : "(null)", success, total);
  interim_journal_remove(ij);
  release_conn_q(cq);
  return 0;
}
1189 static int
1190 stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure,
1191                                 struct timeval *now) {
1192   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1193   const char *k;
1194   int klen;
1195   void *vij;
1196   interim_journal_t *ij;
1197   syncset_t *syncset = closure;
1198
1199   if((mask & EVENTER_ASYNCH) == EVENTER_ASYNCH) {
1200     if(syncset->completion) {
1201       eventer_add(syncset->completion);
1202       eventer_trigger(syncset->completion, EVENTER_READ | EVENTER_WRITE);
1203     }
1204     free(syncset);
1205     return 0;
1206   }
1207   if(!((mask & EVENTER_ASYNCH_WORK) == EVENTER_ASYNCH_WORK)) return 0;
1208
1209   noitL(ds_deb, "Syncing journal sets...\n");
1210   while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) {
1211     eventer_t ingest;
1212     ij = vij;
1213     noitL(ds_deb, "Syncing journal set [%s,%s,%s]\n",
1214           ij->remote_str, ij->remote_cn, ij->fqdn);
1215     fsync(ij->fd);
1216     close(ij->fd);
1217     ij->fd = -1;
1218     ingest = eventer_alloc();
1219     ingest->mask = EVENTER_ASYNCH;
1220     ingest->callback = stratcon_datastore_asynch_execute;
1221     ingest->closure = ij;
1222     eventer_add_asynch(ij->cpool->jobq, ingest);
1223   }
1224   noit_hash_destroy(syncset->ws, free, NULL);
1225   free(syncset->ws);
1226   return 0;
1227 }
1228 static interim_journal_t *
1229 interim_journal_get(struct sockaddr *remote, const char *remote_cn_in,
1230                     int storagenode_id, const char *fqdn_in) {
1231   void *vhash, *vij;
1232   noit_hash_table *working_set;
1233   interim_journal_t *ij;
1234   struct timeval now;
1235   char jpath[PATH_MAX];
1236   char remote_str[128];
1237   const char *remote_cn = remote_cn_in ? remote_cn_in : "default";
1238   const char *fqdn = fqdn_in ? fqdn_in : "default";
1239
1240   noit_convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote);
1241   if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str));
1242
1243   /* Lookup the working set */
1244   if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1245     working_set = calloc(1, sizeof(*working_set));
1246     noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn),
1247                     working_set);
1248   }
1249   else
1250     working_set = vhash;
1251
1252   /* Lookup the interim journal within the working set */
1253   if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) {
1254     ij = calloc(1, sizeof(*ij));
1255     gettimeofday(&now, NULL);
1256     snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x",
1257              basejpath, remote_str, remote_cn, storagenode_id,
1258              (unsigned int)now.tv_sec, (unsigned int)now.tv_usec);
1259     ij->remote_str = strdup(remote_str);
1260     ij->remote_cn = strdup(remote_cn);
1261     ij->fqdn = fqdn_in ? strdup(fqdn_in) : NULL;
1262     ij->storagenode_id = storagenode_id;
1263     ij->filename = strdup(jpath);
1264     ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1265                                          ij->fqdn);
1266     ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1267     if(ij->fd < 0 && errno == ENOENT) {
1268       if(mkdir_for_file(ij->filename, 0750)) {
1269         noitL(noit_error, "Failed to create dir for '%s': %s\n",
1270               ij->filename, strerror(errno));
1271         exit(-1);
1272       }
1273       ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1274     }
1275     if(ij->fd < 0) {
1276       noitL(noit_error, "Failed to open interim journal '%s': %s\n",
1277             ij->filename, strerror(errno));
1278       exit(-1);
1279     }
1280     noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij);
1281   }
1282   else
1283     ij = vij;
1284
1285   return ij;
1286 }
1287 static int
1288 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1289                           int *sid_out, int *storagenode_id_out,
1290                           const char **remote_cn_out,
1291                           const char **fqdn_out, const char **dsn_out) {
1292   /* only called from the main thread -- no safety issues */
1293   void *vuuidinfo, *vinfo;
1294   uuid_info *uuidinfo;
1295   storagenode_info *info = NULL;
1296   char *fqdn = NULL;
1297   char *dsn = NULL;
1298   char *new_remote_cn = NULL;
1299   int storagenode_id = 0, sid = 0;
1300   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1301                          &vuuidinfo)) {
1302     int row_count = 0;
1303     char *tmpint;
1304     ds_single_detail *d;
1305     conn_q *cq;
1306
1307     /* We can't do a database lookup without the remote_cn */
1308     if(!remote_cn) {
1309       if(stratcon_datastore_get_enabled()) {
1310         /* We have an authoritatively maintained cache, we don't do lookups */
1311         return -1;
1312       }
1313       else
1314         remote_cn = "[[null]]";
1315     }
1316
1317     d = calloc(1, sizeof(*d));
1318     cq = get_conn_q_for_metanode();
1319     if(stratcon_database_connect(cq) == 0) {
1320       /* Blocking call to service the cache miss */
1321       GET_QUERY(check_map);
1322       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1323       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1324       PG_EXEC(check_map);
1325       row_count = PQntuples(d->res);
1326       if(row_count != 1) {
1327         PQclear(d->res);
1328         goto bad_row;
1329       }
1330       PG_GET_STR_COL(tmpint, 0, "sid");
1331       if(!tmpint) {
1332         row_count = 0;
1333         PQclear(d->res);
1334         goto bad_row;
1335       }
1336       sid = atoi(tmpint);
1337       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1338       if(tmpint) storagenode_id = atoi(tmpint);
1339       PG_GET_STR_COL(fqdn, 0, "fqdn");
1340       PG_GET_STR_COL(dsn, 0, "dsn");
1341       PG_GET_STR_COL(new_remote_cn, 0, "remote_cn");
1342       fqdn = fqdn ? strdup(fqdn) : NULL;
1343       dsn = dsn ? strdup(dsn) : NULL;
1344       new_remote_cn = new_remote_cn ? strdup(new_remote_cn) : NULL;
1345       PQclear(d->res);
1346     }
1347    bad_row:
1348     free_params((ds_single_detail *)d);
1349     free(d);
1350     release_conn_q(cq);
1351     if(row_count != 1) {
1352       return -1;
1353     }
1354     /* Place in cache */
1355     uuidinfo = calloc(1, sizeof(*uuidinfo));
1356     uuidinfo->sid = sid;
1357     uuidinfo->uuid_str = strdup(uuid_str);
1358     uuidinfo->storagenode_id = storagenode_id;
1359     uuidinfo->remote_cn = new_remote_cn ? strdup(new_remote_cn) : strdup(remote_cn);
1360     noit_hash_store(&uuid_to_info_cache,
1361                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1362     /* Also, we may have just witnessed a new storage node, store it */
1363     if(storagenode_id) {
1364       int needs_free = 0;
1365       info = calloc(1, sizeof(*info));
1366       info->storagenode_id = storagenode_id;
1367       info->dsn = dsn ? strdup(dsn) : NULL;
1368       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1369       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1370       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1371                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1372         /* hack to save memory -- we *never* remove from these caches,
1373            so we can use the same fqdn value in the above cache for the key
1374            in the cache below -- (no strdup) */
1375         noit_hash_store(&storagenode_to_info_cache,
1376                         (void *)&info->storagenode_id, sizeof(int), info);
1377       }
1378       else needs_free = 1;
1379       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1380       if(needs_free) {
1381         if(info->dsn) free(info->dsn);
1382         if(info->fqdn) free(info->fqdn);
1383         free(info);
1384       }
1385     }
1386   }
1387   else
1388     uuidinfo = vuuidinfo;
1389
1390   if(uuidinfo && uuidinfo->storagenode_id) {
1391     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1392       /* we don't have dsn and we actually want it */
1393       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1394       if(noit_hash_retrieve(&storagenode_to_info_cache,
1395                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1396                             &vinfo))
1397         info = vinfo;
1398       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1399     }
1400   }
1401
1402   if(fqdn_out) *fqdn_out = info ? info->fqdn : NULL;
1403   if(dsn_out) *dsn_out = info ? info->dsn : NULL;
1404   assert(uuidinfo);
1405   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1406   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1407   if(sid_out) *sid_out = uuidinfo->sid;
1408   if(fqdn) free(fqdn);
1409   if(dsn) free(dsn);
1410   if(new_remote_cn) free(new_remote_cn);
1411   return 0;
1412 }
1413 static int
1414 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1415   char uuid_str[UUID_STR_LEN+1];
1416   int sid = 0;
1417   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1418   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1419   return sid;
1420 }
1421 static void
1422 stratcon_datastore_journal(struct sockaddr *remote,
1423                            const char *remote_cn, char *line) {
1424   interim_journal_t *ij = NULL;
1425   char uuid_str[UUID_STR_LEN+1], *cp1, *cp2;
1426   const char *fqdn = NULL, *dsn = NULL;
1427   int storagenode_id = 0;
1428   uuid_t checkid;
1429   if(!line) return;
1430   /* if it is a UUID based thing, find the storage node */
1431   switch(*line) {
1432     case 'C':
1433     case 'S':
1434     case 'M':
1435       if(line[1] == '\t' && (cp1 = strchr(line+2, '\t')) != NULL &&
1436          (cp2 = strchr(cp1+1, '\t')) != NULL &&
1437          (cp2-cp1 >= UUID_STR_LEN)) {
1438         strlcpy(uuid_str, cp2 - UUID_STR_LEN, sizeof(uuid_str));
1439         if(!uuid_parse(uuid_str, checkid)) {
1440           storage_node_quick_lookup(uuid_str, remote_cn, NULL,
1441                                     &storagenode_id, NULL, &fqdn, &dsn);
1442           ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn);
1443         }
1444       }
1445       break;
1446     case 'n':
1447       ij = interim_journal_get(remote,remote_cn,0,NULL);
1448       break;
1449     default:
1450       break;
1451   }
1452   if(!ij) {
1453     noitL(ingest_err, "%d\t%s\n", storagenode_id, line);
1454   }
1455   else {
1456     int len;
1457     len = write(ij->fd, line, strlen(line));
1458     if(len < 0) {
1459       noitL(noit_error, "write to %s failed: %s\n",
1460             ij->filename, strerror(errno));
1461     }
1462   }
1463   free(line);
1464   return;
1465 }
1466 static noit_hash_table *
1467 stratcon_datastore_journal_remove(struct sockaddr *remote,
1468                                   const char *remote_cn) {
1469   void *vhash = NULL;
1470   if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1471     /* pluck it out */
1472     noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL);
1473   }
1474   else {
1475     noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n",
1476           remote_cn);
1477     abort();
1478   }
1479   return vhash;
1480 }
1481 void
1482 stratcon_datastore_push(stratcon_datastore_op_t op,
1483                         struct sockaddr *remote,
1484                         const char *remote_cn, void *operand,
1485                         eventer_t completion) {
1486   conn_pool *cpool;
1487   syncset_t *syncset;
1488   eventer_t e;
1489   ds_rt_detail *rtdetail;
1490   struct datastore_onlooker_list *nnode;
1491
1492   for(nnode = onlookers; nnode; nnode = nnode->next)
1493     nnode->dispatch(op,remote,remote_cn,operand);
1494
1495   switch(op) {
1496     case DS_OP_INSERT:
1497       stratcon_datastore_journal(remote, remote_cn, (char *)operand);
1498       break;
1499     case DS_OP_CHKPT:
1500       e = eventer_alloc();
1501       syncset = calloc(1, sizeof(*syncset));
1502       e->mask = EVENTER_ASYNCH;
1503       e->callback = stratcon_datastore_journal_sync;
1504       syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn);
1505       syncset->completion = completion;
1506       e->closure = syncset;
1507       eventer_add(e);
1508       break;
1509     case DS_OP_FIND_COMPLETE:
1510       cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
1511       rtdetail = calloc(1, sizeof(*rtdetail));
1512       rtdetail->rt = operand;
1513       rtdetail->completion_event = completion;
1514       e = eventer_alloc();
1515       e->mask = EVENTER_ASYNCH;
1516       e->callback = stratcon_datastore_asynch_lookup;
1517       e->closure = rtdetail;
1518       eventer_add_asynch(cpool->jobq, e);
1519       break;
1520   }
1521 }
1522
1523 int
1524 stratcon_datastore_saveconfig(void *unused) {
1525   int rv = -1;
1526   char *buff;
1527   ds_single_detail _d = { 0 }, *d = &_d;
1528   conn_q *cq;
1529   char ipv4_str[32];
1530   struct in_addr r, l;
1531
1532   r.s_addr = htonl((4 << 24) || (2 << 16) || (2 << 8) || 1);
1533   memset(&l, 0, sizeof(l));
1534   noit_getip_ipv4(r, &l);
1535   /* Ignore the error.. what are we going to do anyway */
1536   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
1537     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));
1538
1539   cq = get_conn_q_for_metanode();
1540
1541   if(stratcon_database_connect(cq) == 0) {
1542     char time_as_str[20];
1543     size_t len;
1544     buff = noit_conf_xml_in_mem(&len);
1545     if(!buff) goto bad_row;
1546
1547     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1548     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str));
1549     DECLARE_PARAM_STR("", 0);
1550     DECLARE_PARAM_STR("stratcond", 9);
1551     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1552     DECLARE_PARAM_STR(buff, len);
1553     free(buff);
1554
1555     GET_QUERY(config_insert);
1556     PG_EXEC(config_insert);
1557     PQclear(d->res);
1558     rv = 0;
1559
1560     bad_row:
1561       free_params(d);
1562   }
1563   release_conn_q(cq);
1564   return rv;
1565 }
1566
1567 void
1568 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
1569                                                struct sockaddr *,
1570                                                const char *, void *)) {
1571   struct datastore_onlooker_list *nnode;
1572   volatile void **vonlookers = (void *)&onlookers;
1573   nnode = calloc(1, sizeof(*nnode));
1574   nnode->dispatch = f;
1575   nnode->next = onlookers;
1576   while(noit_atomic_casptr(vonlookers,
1577                            nnode, nnode->next) != (void *)nnode->next)
1578     nnode->next = onlookers;
1579 }
1580 static void
1581 stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn,
1582                                          char *id_str, char *file) {
1583   char path[PATH_MAX];
1584   interim_journal_t *ij;
1585   eventer_t ingest;
1586
1587   snprintf(path, sizeof(path), "%s/%s/%s/%s/%s",
1588            basejpath, remote_str, remote_cn, id_str, file);
1589   ij = calloc(1, sizeof(*ij));
1590   ij->fd = open(path, O_RDONLY);
1591   if(ij->fd < 0) {
1592     noitL(noit_error, "cannot open journal '%s': %s\n",
1593           path, strerror(errno));
1594     free(ij);
1595     return;
1596   }
1597   close(ij->fd);
1598   ij->fd = -1;
1599   ij->filename = strdup(path);
1600   ij->remote_str = strdup(remote_str);
1601   ij->remote_cn = strdup(remote_cn);
1602   ij->storagenode_id = atoi(id_str);
1603   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1604                                        ij->fqdn);
1605   noitL(noit_error, "ingesting old payload: %s\n", ij->filename);
1606   ingest = eventer_alloc();
1607   ingest->mask = EVENTER_ASYNCH;
1608   ingest->callback = stratcon_datastore_asynch_execute;
1609   ingest->closure = ij;
1610   eventer_add_asynch(ij->cpool->jobq, ingest);
1611 }
1612 static void
1613 stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) {
1614   char path[PATH_MAX];
1615   DIR *root;
1616   struct dirent *de, *entry;
1617   int i = 0, cnt = 0;
1618   char **entries;
1619   int size = 0;
1620
1621   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
1622            first ? "/" : "", first ? first : "",
1623            second ? "/" : "", second ? second : "",
1624            third ? "/" : "", third ? third : "");
1625 #ifdef _PC_NAME_MAX
1626   size = pathconf(path, _PC_NAME_MAX);
1627 #endif
1628   size = MIN(size, PATH_MAX + 128);
1629   de = alloca(size);
1630   root = opendir(path);
1631   if(!root) return;
1632   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) cnt++;
1633   closedir(root);
1634   root = opendir(path);
1635   if(!root) return;
1636   entries = malloc(sizeof(*entries) * cnt);
1637   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) {
1638     if(i < cnt) {
1639       entries[i++] = strdup(entry->d_name);
1640     }
1641   }
1642   closedir(root);
1643   cnt = i; /* could have changed, directories are fickle */
1644   qsort(entries, i, sizeof(*entries),
1645         (int (*)(const void *, const void *))strcasecmp);
1646   for(i=0; i<cnt; i++) {
1647     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
1648     noitL(ds_deb, "Processing L%d entry '%s'\n",
1649           third ? 4 : second ? 3 : first ? 2 : 1, entries[i]);
1650     if(!first)
1651       stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL);
1652     else if(!second)
1653       stratcon_datastore_sweep_journals_int(first, entries[i], NULL);
1654     else if(!third)
1655       stratcon_datastore_sweep_journals_int(first, second, entries[i]);
1656     else if(strlen(entries[i]) == 16)
1657       stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]);
1658   }
1659   for(i=0; i<cnt; i++)
1660     free(entries[i]);
1661   free(entries);
1662 }
1663 static void
1664 stratcon_datastore_sweep_journals() {
1665   stratcon_datastore_sweep_journals_int(NULL,NULL,NULL);
1666 }
1667
1668 int
1669 stratcon_datastore_ingest_all_storagenode_info() {
1670   int i, cnt = 0;
1671   ds_single_detail _d = { 0 }, *d = &_d;
1672   conn_q *cq;
1673   cq = get_conn_q_for_metanode();
1674
1675   while(stratcon_database_connect(cq)) {
1676     noitL(noit_error, "Error connecting to database\n");
1677     sleep(1);
1678   }
1679
1680   GET_QUERY(all_storage);
1681   PG_EXEC(all_storage);
1682   cnt = PQntuples(d->res);
1683   for(i=0; i<cnt; i++) {
1684     void *vinfo;
1685     char *tmpint, *fqdn, *dsn;
1686     int storagenode_id;
1687     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1688     storagenode_id = atoi(tmpint);
1689     PG_GET_STR_COL(fqdn, i, "fqdn");
1690     PG_GET_STR_COL(dsn, i, "dsn");
1691     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1692     storagenode_id = tmpint ? atoi(tmpint) : 0;
1693
1694     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1695                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1696       storagenode_info *info;
1697       info = calloc(1, sizeof(*info));
1698       info->storagenode_id = storagenode_id;
1699       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1700       info->dsn = dsn ? strdup(dsn) : NULL;
1701       noit_hash_store(&storagenode_to_info_cache,
1702                       (void *)&info->storagenode_id, sizeof(int), info);
1703       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1704             info->storagenode_id,
1705             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1706     }
1707   }
1708   PQclear(d->res);
1709  bad_row:
1710   free_params(d);
1711
1712   release_conn_q(cq);
1713   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1714   return cnt;
1715 }
1716 int
1717 stratcon_datastore_ingest_all_check_info() {
1718   int i, cnt, loaded = 0;
1719   ds_single_detail _d = { 0 }, *d = &_d;
1720   conn_q *cq;
1721   cq = get_conn_q_for_metanode();
1722
1723   while(stratcon_database_connect(cq)) {
1724     noitL(noit_error, "Error connecting to database\n");
1725     sleep(1);
1726   }
1727
1728   GET_QUERY(check_mapall);
1729   PG_EXEC(check_mapall);
1730   cnt = PQntuples(d->res);
1731   for(i=0; i<cnt; i++) {
1732     void *vinfo;
1733     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1734     int sid, storagenode_id;
1735     uuid_info *uuidinfo;
1736     PG_GET_STR_COL(uuid_str, i, "id");
1737     if(!uuid_str) continue;
1738     PG_GET_STR_COL(tmpint, i, "sid");
1739     if(!tmpint) continue;
1740     sid = atoi(tmpint);
1741     PG_GET_STR_COL(fqdn, i, "fqdn");
1742     PG_GET_STR_COL(dsn, i, "dsn");
1743     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1744     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1745     storagenode_id = tmpint ? atoi(tmpint) : 0;
1746
1747     uuidinfo = calloc(1, sizeof(*uuidinfo));
1748     uuidinfo->uuid_str = strdup(uuid_str);
1749     uuidinfo->remote_cn = strdup(remote_cn);
1750     uuidinfo->storagenode_id = storagenode_id;
1751     uuidinfo->sid = sid;
1752     noit_hash_store(&uuid_to_info_cache,
1753                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1754     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1755           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1756     loaded++;
1757     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1758                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1759       storagenode_info *info;
1760       info = calloc(1, sizeof(*info));
1761       info->storagenode_id = storagenode_id;
1762       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1763       info->dsn = dsn ? strdup(dsn) : NULL;
1764       noit_hash_store(&storagenode_to_info_cache,
1765                       (void *)&info->storagenode_id, sizeof(int), info);
1766       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1767             info->storagenode_id,
1768             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1769     }
1770   }
1771   PQclear(d->res);
1772  bad_row:
1773   free_params(d);
1774
1775   release_conn_q(cq);
1776   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1777   return loaded;
1778 }
1779
1780 static int
1781 rest_get_noit_config(noit_http_rest_closure_t *restc,
1782                      int npats, char **pats) {
1783   noit_http_session_ctx *ctx = restc->http_ctx;
1784   ds_single_detail *d;
1785   int row_count = 0;
1786   const char *xml = NULL;
1787   conn_q *cq = NULL;
1788
1789   if(npats != 0) {
1790     noit_http_response_server_error(ctx, "text/xml");
1791     noit_http_response_end(ctx);
1792     return 0;
1793   }
1794   d = calloc(1, sizeof(*d));
1795   GET_QUERY(config_get);
1796   cq = get_conn_q_for_metanode();
1797   if(!cq) {
1798     noit_http_response_server_error(ctx, "text/xml");
1799     goto bad_row;
1800   }
1801
1802   DECLARE_PARAM_STR(restc->remote_cn,
1803                     restc->remote_cn ? strlen(restc->remote_cn) : 0);
1804   PG_EXEC(config_get);
1805   row_count = PQntuples(d->res);
1806   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1807
1808   if(xml == NULL) {
1809     char buff[1024];
1810     snprintf(buff, sizeof(buff), "<error><remote_cn>%s</remote_cn>"
1811                                  "<row_count>%d</row_count></error>\n",
1812              restc->remote_cn, row_count);
1813     noit_http_response_append(ctx, buff, strlen(buff));
1814     noit_http_response_not_found(ctx, "text/xml");
1815   }
1816   else {
1817     noit_http_response_append(ctx, xml, strlen(xml));
1818     noit_http_response_ok(ctx, "text/xml");
1819   }
1820  bad_row:
1821   free_params((ds_single_detail *)d);
1822   d->nparams = 0;
1823   if(cq) release_conn_q(cq);
1824
1825   noit_http_response_end(ctx);
1826   return 0;
1827 }
1828
1829 void
1830 stratcon_datastore_init() {
1831   pthread_mutex_init(&ds_conns_lock, NULL);
1832   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1833   ds_err = noit_log_stream_find("error/datastore");
1834   ds_deb = noit_log_stream_find("debug/datastore");
1835   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
1836   ingest_err = noit_log_stream_find("error/ingest");
1837   if(!ds_err) ds_err = noit_error;
1838   if(!ingest_err) ingest_err = noit_error;
1839   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1840                            &basejpath)) {
1841     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1842     exit(-1);
1843   }
1844   stratcon_datastore_ingest_all_check_info();
1845   stratcon_datastore_ingest_all_storagenode_info();
1846   stratcon_datastore_sweep_journals();
1847
1848   assert(noit_http_rest_register_auth(
1849     "GET", "/noits/", "^config$", rest_get_noit_config,
1850              noit_http_rest_client_cert_auth
1851   ) == 0);
1852 }
1853
Note: See TracBrowser for help on using the browser.