root/src/stratcon_datastore.c

Revision aa31afe2fbea2bd377d8e244fd3516c9ee22986b, 57.7 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 years ago)

leak

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "utils/noit_str.h"
38 #include "utils/noit_mkdir.h"
39 #include "utils/noit_getip.h"
40 #include "stratcon_datastore.h"
41 #include "stratcon_realtime_http.h"
42 #include "stratcon_iep.h"
43 #include "noit_conf.h"
44 #include "noit_check.h"
45 #include "noit_rest.h"
46 #include <unistd.h>
47 #include <fcntl.h>
48 #include <netinet/in.h>
49 #include <sys/un.h>
50 #include <dirent.h>
51 #include <arpa/inet.h>
52 #include <sys/mman.h>
53 #include <libpq-fe.h>
54 #include <zlib.h>
55 #include <assert.h>
56 #include <errno.h>
57
/*
 * DECL_STMT(codename, confname) declares two file-scope statics:
 *   codename      - the SQL text; NULL until it is first loaded;
 *   codename_conf - the config path it is loaded from, i.e.
 *                   "/stratcon/database/statements/<confname>".
 * GET_QUERY(codename) performs the lazy load.
 */
#define DECL_STMT(codename, confname)                                   \
  static char *codename = NULL;                                         \
  static const char *codename##_conf =                                  \
    "/stratcon/database/statements/" #confname

DECL_STMT(storage_post_connect,   storagepostconnect);
DECL_STMT(metanode_post_connect,  metanodepostconnect);
DECL_STMT(find_storage,           findstoragenode);
DECL_STMT(all_storage,            allstoragenodes);
DECL_STMT(check_map,              mapchecktostoragenode);
DECL_STMT(check_mapall,           mapallchecks);
DECL_STMT(check_loadall,          allchecks);
DECL_STMT(check_find,             findcheck);
DECL_STMT(check_insert,           check);
DECL_STMT(status_insert,          status);
DECL_STMT(metric_insert_numeric,  metric_numeric);
DECL_STMT(metric_insert_text,     metric_text);
DECL_STMT(config_insert,          config);
DECL_STMT(config_get,             findconfig);
76
77 static noit_log_stream_t ds_err = NULL;
78 static noit_log_stream_t ds_deb = NULL;
79 static noit_log_stream_t ds_pool_deb = NULL;
80 static noit_log_stream_t ingest_err = NULL;
81
/* Enabled flag for the datastore subsystem; starts at 1. */
static int ds_system_enabled = 1;

/* Return the current enabled flag. */
int
stratcon_datastore_get_enabled(void) {
  return ds_system_enabled;
}

/* Overwrite the enabled flag with n. */
void
stratcon_datastore_set_enabled(int n) {
  ds_system_enabled = n;
}
85
86 static struct datastore_onlooker_list {
87   void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *,
88                    const char *, void *);
89   struct datastore_onlooker_list *next;
90 } *onlookers = NULL;
91
/*
 * Lazily load the SQL for statement `a` (declared with DECL_STMT) from its
 * config path `a##_conf`. Does nothing once `a` is set. If the statement is
 * not configured, jumps to the caller's `bad_row` label.
 */
#define GET_QUERY(a) do {                                        \
  if((a) == NULL && !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row;                                                \
} while(0)
97
98 struct conn_q;
99
100 typedef struct {
101   char            *queue_name; /* the key fqdn+remote_sn */
102   eventer_jobq_t  *jobq;
103   struct conn_q   *head;
104   pthread_mutex_t  lock;
105   pthread_cond_t   cv;
106   int              ttl;
107   int              in_pool;
108   int              outstanding;
109   int              max_allocated;
110   int              max_in_pool;
111 } conn_pool;
112 typedef struct conn_q {
113   time_t           last_use;
114   char            *dsn;        /* Pg connect string */
115   char            *remote_str; /* the IP of the noit*/
116   char            *remote_cn;  /* the Cert CN of the noit */
117   char            *fqdn;       /* the fqdn of the storage node */
118   conn_pool       *pool;
119   struct conn_q   *next;
120   /* Postgres specific stuff */
121   PGconn          *dbh;
122 } conn_q;
123
124
125 #define MAX_PARAMS 8
126 #define POSTGRES_PARTS \
127   PGresult *res; \
128   int rv; \
129   time_t whence; \
130   int nparams; \
131   int metric_type; \
132   char *paramValues[MAX_PARAMS]; \
133   int paramLengths[MAX_PARAMS]; \
134   int paramFormats[MAX_PARAMS]; \
135   int paramAllocd[MAX_PARAMS];
136
137 typedef struct ds_single_detail {
138   POSTGRES_PARTS
139 } ds_single_detail;
140 typedef struct {
141   /* Postgres specific stuff */
142   POSTGRES_PARTS
143   struct realtime_tracker *rt;
144   conn_q *cq; /* connection on which to perform this job */
145   eventer_t completion_event; /* This event should be registered if non NULL */
146 } ds_rt_detail;
147 typedef struct ds_line_detail {
148   /* Postgres specific stuff */
149   POSTGRES_PARTS
150   char *data;
151   int problematic;
152   struct ds_line_detail *next;
153 } ds_line_detail;
154
155 typedef struct {
156   noit_hash_table *ws;
157   eventer_t completion;
158 } syncset_t;
159
160 typedef struct {
161   char *remote_str;
162   char *remote_cn;
163   char *fqdn;
164   int storagenode_id;
165   int fd;
166   char *filename;
167   conn_pool *cpool;
168 } interim_journal_t;
169
170 static int stratcon_database_connect(conn_q *cq);
171 static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
172 static int storage_node_quick_lookup(const char *uuid_str,
173                                      const char *remote_cn,
174                                      int *sid_out, int *storagenode_id_out,
175                                      const char **remote_cn_out,
176                                      const char **fqdn_out,
177                                      const char **dsn_out);
178
179 static void
180 free_params(ds_single_detail *d) {
181   int i;
182   for(i=0; i<d->nparams; i++)
183     if(d->paramAllocd[i] && d->paramValues[i])
184       free(d->paramValues[i]);
185 }
186
187 char *basejpath = NULL;
188 pthread_mutex_t ds_conns_lock;
189 noit_hash_table ds_conns;
190 noit_hash_table working_sets;
191
192 /* the fqdn cache needs to be thread safe */
193 typedef struct {
194   char *uuid_str;
195   char *remote_cn;
196   int storagenode_id;
197   int sid;
198 } uuid_info;
199 typedef struct {
200   int storagenode_id;
201   char *fqdn;
202   char *dsn;
203 } storagenode_info;
204 noit_hash_table uuid_to_info_cache;
205 pthread_mutex_t storagenode_to_info_cache_lock;
206 noit_hash_table storagenode_to_info_cache;
207
208 /* Thread-safe connection pools */
209
210 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
211 static void
212 release_conn_q_forceable(conn_q *cq, int forcefree) {
213   int putback = 0;
214   cq->last_use = time(NULL);
215   pthread_mutex_lock(&cq->pool->lock);
216   cq->pool->outstanding--;
217   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
218     putback = 1;
219     cq->next = cq->pool->head;
220     cq->pool->head = cq;
221     cq->pool->in_pool++;
222   }
223   pthread_mutex_unlock(&cq->pool->lock);
224   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(),
225         putback ? "to pool" : "and destroy", cq->pool->queue_name);
226   pthread_cond_signal(&cq->pool->cv);
227   if(putback) return;
228
229   /* Not put back, release it */
230   if(cq->dbh) PQfinish(cq->dbh);
231   if(cq->remote_str) free(cq->remote_str);
232   if(cq->remote_cn) free(cq->remote_cn);
233   if(cq->fqdn) free(cq->fqdn);
234   if(cq->dsn) free(cq->dsn);
235   free(cq);
236 }
237 static void
238 ttl_purge_conn_pool(conn_pool *pool) {
239   int old_cnt, new_cnt;
240   time_t now = time(NULL);
241   conn_q *cq, *prev = NULL, *iter;
242   /* because we always replace on the head and update the last_use time when
243      doing so, we know they are ordered LRU on the end.  So, once we hit an
244      old one, we know all the others are old too.
245    */
246   if(!pool->head) return; /* hack short circuit for no locks */
247   pthread_mutex_lock(&pool->lock);
248   old_cnt = pool->in_pool;
249   cq = pool->head;
250   while(cq) {
251     if(cq->last_use + cq->pool->ttl < now) {
252       if(prev) prev->next = NULL;
253       else pool->head = NULL;
254       break;
255     }
256     prev = cq;
257     cq = cq->next;
258   }
259   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
260   /* Fix accounting */
261   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
262   new_cnt = pool->in_pool;
263   pthread_mutex_unlock(&pool->lock);
264
265   /* Force release these without holding the lock */
266   while(cq) {
267     cq = cq->next;
268     release_conn_q_forceable(cq, 1);
269   }
270   if(old_cnt != new_cnt)
271     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
272           pool->queue_name);
273 }
274 static void
275 release_conn_q(conn_q *cq) {
276   ttl_purge_conn_pool(cq->pool);
277   release_conn_q_forceable(cq, 0);
278 }
279 static conn_pool *
280 get_conn_pool_for_remote(const char *remote_str,
281                          const char *remote_cn, const char *fqdn) {
282   void *vcpool;
283   conn_pool *cpool = NULL;
284   char queue_name[256] = "datastore_";
285   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
286            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
287            fqdn ? fqdn : "default",
288            remote_cn ? remote_cn : "default");
289   pthread_mutex_lock(&ds_conns_lock);
290   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
291                         strlen(queue_name), &vcpool))
292     cpool = vcpool;
293   pthread_mutex_unlock(&ds_conns_lock);
294   if(!cpool) {
295     vcpool = cpool = calloc(1, sizeof(*cpool));
296     cpool->queue_name = strdup(queue_name);
297     pthread_mutex_init(&cpool->lock, NULL);
298     pthread_cond_init(&cpool->cv, NULL);
299     cpool->in_pool = 0;
300     cpool->outstanding = 0;
301     cpool->max_in_pool = 1;
302     cpool->max_allocated = 1;
303     pthread_mutex_lock(&ds_conns_lock);
304     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
305                         cpool)) {
306       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
307                          strlen(queue_name), &vcpool);
308     }
309     pthread_mutex_unlock(&ds_conns_lock);
310     if(vcpool != cpool) {
311       /* someone beat us to it */
312       free(cpool->queue_name);
313       pthread_mutex_destroy(&cpool->lock);
314       pthread_cond_destroy(&cpool->cv);
315       free(cpool);
316     }
317     else {
318       int i;
319       /* Our job to setup the pool */
320       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
321       eventer_jobq_init(cpool->jobq, queue_name);
322       cpool->jobq->backq = eventer_default_backq();
323       /* Add one thread */
324       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
325         eventer_jobq_increase_concurrency(cpool->jobq);
326     }
327     cpool = vcpool;
328   }
329   return cpool;
330 }
331 static conn_q *
332 get_conn_q_for_remote(const char *remote_str,
333                       const char *remote_cn, const char *fqdn,
334                       const char *dsn) {
335   conn_pool *cpool;
336   conn_q *cq;
337   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
338   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(),
339         cpool->queue_name);
340   pthread_mutex_lock(&cpool->lock);
341  again:
342   if(cpool->head) {
343     assert(cpool->in_pool > 0);
344     cq = cpool->head;
345     cpool->head = cq->next;
346     cpool->in_pool--;
347     cpool->outstanding++;
348     cq->next = NULL;
349     pthread_mutex_unlock(&cpool->lock);
350     return cq;
351   }
352   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
353     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
354           (void *)pthread_self(), cpool->queue_name);
355     pthread_cond_wait(&cpool->cv, &cpool->lock);
356     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
357           (void *)pthread_self(), cpool->queue_name);
358     goto again;
359   }
360   else {
361     cpool->outstanding++;
362     pthread_mutex_unlock(&cpool->lock);
363   }
364  
365   cq = calloc(1, sizeof(*cq));
366   cq->pool = cpool;
367   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
368   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
369   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
370   cq->dsn = dsn ? strdup(dsn) : NULL;
371   return cq;
372 }
373 static conn_q *
374 get_conn_q_for_metanode() {
375   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
376 }
377
/*
 * Outcome of executing one row. DS_EXEC_ROW_FAILED is returned by
 * stratcon_datastore_execute for a bad or failed row. DS_EXEC_TXN_FAILED
 * presumably signals a transaction-wide failure -- its producers are outside
 * this chunk.
 */
typedef enum {
  DS_EXEC_SUCCESS    = 0,
  DS_EXEC_ROW_FAILED = 1,
  DS_EXEC_TXN_FAILED = 2
} execute_outcome_t;
383
/*
 * Append a text parameter to `d` (a detail pointer in the caller's scope).
 *
 * The first `len` bytes of `str` are copied with noit__strndup and marked
 * owned, so free_params() releases them. The exact string "[[null]]" is the
 * in-band marker for SQL NULL: it is stored as a NULL value of length 0.
 *
 * Each argument is evaluated exactly once. Declaring more than MAX_PARAMS
 * parameters is a programming error: it trips the assert instead of
 * silently overrunning the parameter arrays.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  const char *dps_str__ = (str); \
  int dps_len__ = (int)(len); \
  int dps_idx__ = d->nparams; \
  assert(dps_idx__ < MAX_PARAMS); \
  d->paramValues[dps_idx__] = noit__strndup(dps_str__, dps_len__); \
  d->paramLengths[dps_idx__] = dps_len__; \
  d->paramFormats[dps_idx__] = 0; \
  d->paramAllocd[dps_idx__] = 1; \
  if(!strcmp(d->paramValues[dps_idx__], "[[null]]")) { \
    free(d->paramValues[dps_idx__]); \
    d->paramValues[dps_idx__] = NULL; \
    d->paramLengths[dps_idx__] = 0; \
    d->paramAllocd[dps_idx__] = 0; \
  } \
  d->nparams++; \
} while(0)

/* Append the decimal text form of int `i` as a parameter (see above). */
#define DECLARE_PARAM_INT(i) do { \
  int buffer__len; \
  char buffer__[32]; \
  snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
  buffer__len = strlen(buffer__); \
  DECLARE_PARAM_STR(buffer__, buffer__len); \
} while(0)
404
/*
 * Set `dest` to column `name` of row `row` in d->res. `dest` becomes NULL
 * when the column does not exist or the value is SQL NULL. Otherwise it
 * points into the PGresult and is only valid until that result is
 * PQclear()ed.
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int pgcol__ = PQfnumber(d->res, name); \
  if(pgcol__ < 0 || PQgetisnull(d->res, row, pgcol__)) \
    dest = NULL; \
  else \
    dest = PQgetvalue(d->res, row, pgcol__); \
} while(0)
412
/*
 * Run SQL `cmd` on cq->dbh with d's parameters (text parameters, text
 * results), storing the result in d->res and its status in d->rv.
 *
 * Any status other than PGRES_COMMAND_OK / PGRES_TUPLES_OK is an error. The
 * first line of the server message is logged (tagged with the storage node
 * fqdn, or "metanode"), the result is cleared, and control jumps to the
 * caller's `bad_row` label. On success the caller owns d->res and must
 * PQclear() it.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "[%s] stratcon_datastore.c:%d bad (%d): %.*s\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
          (int)(pgerr_end - pgerr), pgerr); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
430
/*
 * Like PG_EXEC, but `cmd` is first expanded as a strftime(3) format against
 * `whence` interpreted as UTC, and the expanded text is executed.
 *
 * strftime() returns 0 and leaves the buffer indeterminate when the output
 * does not fit in 4096 bytes. gmtime_r() can also fail. In either case the
 * error is logged and control jumps to `bad_row` without running anything.
 * `whence` is evaluated once.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t pgtm_whence__ = (whence); \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&pgtm_whence__, &tbuf); \
  if(tm == NULL || strftime(cmdbuf, sizeof(cmdbuf), cmd, tm) == 0) { \
    noitL(ds_err, "[%s] stratcon_datastore.c:%d cannot expand statement " \
          "for time: %llu\n", cq->fqdn ? cq->fqdn : "metanode", __LINE__, \
          (long long unsigned)pgtm_whence__); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "[%s] stratcon_datastore.c:%d bad (%d): %.*s time: %llu\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
          (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)pgtm_whence__); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
453
454 static void *
455 stratcon_datastore_check_loadall(void *vsn) {
456   storagenode_info *sn = vsn;
457   ds_single_detail *d;
458   int i, row_count = 0, good = 0;
459   char buff[1024];
460   conn_q *cq = NULL;
461
462   d = calloc(1, sizeof(*d));
463   GET_QUERY(check_loadall);
464   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
465   i = 0;
466   while(stratcon_database_connect(cq)) {
467     if(i++ > 4) {
468       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
469       release_conn_q(cq);
470       return (void *)(vpsized_int)good;
471     }
472     sleep(1);
473   }
474   PG_EXEC(check_loadall);
475   row_count = PQntuples(d->res);
476  
477   for(i=0; i<row_count; i++) {
478     int rv;
479     int8_t family;
480     struct sockaddr *sin;
481     struct sockaddr_in sin4 = { .sin_family = AF_INET };
482     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
483     char *remote, *id, *target, *module, *name;
484     PG_GET_STR_COL(remote, i, "remote_address");
485     PG_GET_STR_COL(id, i, "id");
486     PG_GET_STR_COL(target, i, "target");
487     PG_GET_STR_COL(module, i, "module");
488     PG_GET_STR_COL(name, i, "name");
489     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
490
491     family = AF_INET;
492     sin = (struct sockaddr *)&sin4;
493     rv = inet_pton(family, remote, &sin4.sin_addr);
494     if(rv != 1) {
495       family = AF_INET6;
496       sin = (struct sockaddr *)&sin6;
497       rv = inet_pton(family, remote, &sin6.sin6_addr);
498       if(rv != 1) {
499         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
500         sin = NULL;
501       }
502     }
503
504     /* stratcon_iep_line_processor takes an allocated operand and frees it */
505     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
506     good++;
507   }
508   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
509         good, row_count, sn->fqdn);
510  bad_row:
511   free_params((ds_single_detail *)d);
512   free(d);
513   if(cq) release_conn_q(cq);
514   return (void *)(vpsized_int)good;
515 }
516 static int
517 stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure,
518                                     struct timeval *now) {
519   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
520   pthread_t *jobs = NULL;
521   int nodes, i = 0, tcnt = 0;
522   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
523   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
524
525   pthread_mutex_lock(&storagenode_to_info_cache_lock);
526   nodes = storagenode_to_info_cache.size;
527   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
528   sns = calloc(MAX(1,nodes), sizeof(*sns));
529   if(nodes == 0) sns[nodes++] = &self;
530   else {
531     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
532     const char *k;
533     void *v;
534     int klen;
535     while(noit_hash_next(&storagenode_to_info_cache,
536                          &iter, &k, &klen, &v)) {
537       sns[i++] = (storagenode_info *)v;
538     }
539   }
540   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
541
542   for(i=0; i<nodes; i++) {
543     if(pthread_create(&jobs[i], NULL,
544                       stratcon_datastore_check_loadall, sns[i]) != 0) {
545       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
546     }
547   }
548   for(i=0; i<nodes; i++) {
549     void *good;
550     pthread_join(jobs[i], &good);
551     tcnt += (int)(vpsized_int)good;
552   }
553   noitL(noit_error, "Loaded all %d check states.\n", tcnt);
554   return 0;
555 }
556 void
557 stratcon_datastore_iep_check_preload() {
558   eventer_t e;
559   conn_pool *cpool;
560
561   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
562   e = eventer_alloc();
563   e->mask = EVENTER_ASYNCH;
564   e->callback = stratcon_datastore_asynch_drive_iep;
565   e->closure = NULL;
566   eventer_add_asynch(cpool->jobq, e);
567 }
568 execute_outcome_t
569 stratcon_datastore_find(ds_rt_detail *d) {
570   conn_q *cq;
571   char *val;
572   int row_count;
573   struct realtime_tracker *node;
574
575   for(node = d->rt; node; node = node->next) {
576     char uuid_str[UUID_STR_LEN+1];
577     const char *fqdn, *dsn, *remote_cn;
578     char remote_ip[32];
579     int storagenode_id;
580
581     uuid_unparse_lower(node->checkid, uuid_str);
582     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
583                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
584       continue;
585
586     noitL(noit_debug, "stratcon_datastore_find <- (%d, %s) @ %s\n",
587           node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)");
588
589     /* We might be able to find the IP from our config if someone has
590      * specified the expected cn in the noit definition.
591      */
592     if(stratcon_find_noit_ip_by_cn(remote_cn,
593                                    remote_ip, sizeof(remote_ip)) == 0) {
594       node->noit = strdup(remote_ip);
595       noitL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit);
596       continue;
597     }
598
599     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
600     stratcon_database_connect(cq);
601
602     GET_QUERY(check_find);
603     DECLARE_PARAM_INT(node->sid);
604     PG_EXEC(check_find);
605     row_count = PQntuples(d->res);
606     if(row_count != 1) {
607       noitL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid);
608       PQclear(d->res);
609       goto bad_row;
610     }
611
612     /* Get the remote_address (which noit owns this) */
613     PG_GET_STR_COL(val, 0, "remote_address");
614     if(!val) {
615       noitL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn);
616       PQclear(d->res);
617       goto bad_row;
618     }
619     node->noit = strdup(val);
620     noitL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit);
621    bad_row:
622     free_params((ds_single_detail *)d);
623     d->nparams = 0;
624     release_conn_q(cq);
625   }
626   return DS_EXEC_SUCCESS;
627 }
628 execute_outcome_t
629 stratcon_datastore_execute(conn_q *cq, const char *r, const char *remote_cn,
630                            ds_line_detail *d) {
631   int type, len, sid;
632   char *final_buff;
633   uLong final_len, actual_final_len;
634   char *token;
635   char raddr_blank[1] = "";
636   const char *raddr;
637
638   type = d->data[0];
639   raddr = r ? r : raddr_blank;
640
641   /* Parse the log line, but only if we haven't already */
642   if(!d->nparams) {
643     char *scp, *ecp;
644
645     scp = d->data;
646 #define PROCESS_NEXT_FIELD(t,l) do { \
647   if(!*scp) goto bad_row; \
648   ecp = strchr(scp, '\t'); \
649   if(!ecp) goto bad_row; \
650   token = scp; \
651   len = (ecp-scp); \
652   scp = ecp + 1; \
653 } while(0)
654 #define PROCESS_LAST_FIELD(t,l) do { \
655   if(!*scp) ecp = scp; \
656   else { \
657     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
658     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
659   } \
660   t = scp; \
661   l = (ecp-scp); \
662 } while(0)
663
664     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
665     switch(type) {
666       /* See noit_check_log.c for log description */
667       case 'n':
668         DECLARE_PARAM_STR(raddr, strlen(raddr));
669         DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
670         DECLARE_PARAM_STR("noitd",5); /* node_type */
671         PROCESS_NEXT_FIELD(token,len);
672         d->whence = (time_t)strtoul(token, NULL, 10);
673         DECLARE_PARAM_STR(token,len); /* timestamp */
674
675         /* This is the expected uncompressed len */
676         PROCESS_NEXT_FIELD(token,len);
677         final_len = atoi(token);
678         final_buff = malloc(final_len);
679         if(!final_buff) goto bad_row;
680  
681         /* The last token is b64 endoded and compressed.
682          * we need to decode it, declare it and then free it.
683          */
684         PROCESS_LAST_FIELD(token, len);
685         /* We can in-place decode this */
686         len = noit_b64_decode((char *)token, len,
687                               (unsigned char *)token, len);
688         if(len <= 0) {
689           noitL(noit_error, "noitd config base64 decoding error.\n");
690           free(final_buff);
691           goto bad_row;
692         }
693         actual_final_len = final_len;
694         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
695                               (unsigned char *)token, len)) {
696           noitL(noit_error, "noitd config decompression failure.\n");
697           free(final_buff);
698           goto bad_row;
699         }
700         if(final_len != actual_final_len) {
701           noitL(noit_error, "noitd config decompression error.\n");
702           free(final_buff);
703           goto bad_row;
704         }
705         DECLARE_PARAM_STR(final_buff, final_len);
706         free(final_buff);
707         break;
708       case 'C':
709         DECLARE_PARAM_STR(raddr, strlen(raddr));
710         PROCESS_NEXT_FIELD(token,len);
711         DECLARE_PARAM_STR(token,len); /* timestamp */
712         d->whence = (time_t)strtoul(token, NULL, 10);
713         PROCESS_NEXT_FIELD(token, len);
714         sid = uuid_to_sid(token, remote_cn);
715         if(sid == 0) goto bad_row;
716         DECLARE_PARAM_INT(sid); /* sid */
717         DECLARE_PARAM_STR(token,len); /* uuid */
718         PROCESS_NEXT_FIELD(token, len);
719         DECLARE_PARAM_STR(token,len); /* target */
720         PROCESS_NEXT_FIELD(token, len);
721         DECLARE_PARAM_STR(token,len); /* module */
722         PROCESS_LAST_FIELD(token, len);
723         DECLARE_PARAM_STR(token,len); /* name */
724         break;
725       case 'M':
726         PROCESS_NEXT_FIELD(token,len);
727         DECLARE_PARAM_STR(token,len); /* timestamp */
728         d->whence = (time_t)strtoul(token, NULL, 10);
729         PROCESS_NEXT_FIELD(token, len);
730         sid = uuid_to_sid(token, remote_cn);
731         if(sid == 0) goto bad_row;
732         DECLARE_PARAM_INT(sid); /* sid */
733         PROCESS_NEXT_FIELD(token, len);
734         DECLARE_PARAM_STR(token,len); /* name */
735         PROCESS_NEXT_FIELD(token,len);
736         d->metric_type = *token;
737         PROCESS_LAST_FIELD(token,len);
738         DECLARE_PARAM_STR(token,len); /* value */
739         break;
740       case 'S':
741         PROCESS_NEXT_FIELD(token,len);
742         DECLARE_PARAM_STR(token,len); /* timestamp */
743         d->whence = (time_t)strtoul(token, NULL, 10);
744         PROCESS_NEXT_FIELD(token, len);
745         sid = uuid_to_sid(token, remote_cn);
746         if(sid == 0) goto bad_row;
747         DECLARE_PARAM_INT(sid); /* sid */
748         PROCESS_NEXT_FIELD(token, len);
749         DECLARE_PARAM_STR(token,len); /* state */
750         PROCESS_NEXT_FIELD(token, len);
751         DECLARE_PARAM_STR(token,len); /* availability */
752         PROCESS_NEXT_FIELD(token, len);
753         DECLARE_PARAM_STR(token,len); /* duration */
754         PROCESS_LAST_FIELD(token,len);
755         DECLARE_PARAM_STR(token,len); /* status */
756         break;
757       default:
758         goto bad_row;
759     }
760
761   }
762
763   /* Now execute the query */
764   switch(type) {
765     case 'n':
766       GET_QUERY(config_insert);
767       PG_EXEC(config_insert);
768       PQclear(d->res);
769       break;
770     case 'C':
771       GET_QUERY(check_insert);
772       PG_TM_EXEC(check_insert, d->whence);
773       PQclear(d->res);
774       break;
775     case 'S':
776       GET_QUERY(status_insert);
777       PG_TM_EXEC(status_insert, d->whence);
778       PQclear(d->res);
779       break;
780     case 'M':
781       switch(d->metric_type) {
782         case METRIC_INT32:
783         case METRIC_UINT32:
784         case METRIC_INT64:
785         case METRIC_UINT64:
786         case METRIC_DOUBLE:
787           GET_QUERY(metric_insert_numeric);
788           PG_TM_EXEC(metric_insert_numeric, d->whence);
789           PQclear(d->res);
790           break;
791         case METRIC_STRING:
792           GET_QUERY(metric_insert_text);
793           PG_TM_EXEC(metric_insert_text, d->whence);
794           PQclear(d->res);
795           break;
796         default:
797           goto bad_row;
798       }
799       break;
800     default:
801       /* should never get here */
802       goto bad_row;
803   }
804   return DS_EXEC_SUCCESS;
805  bad_row:
806   return DS_EXEC_ROW_FAILED;
807 }
808 static int
809 stratcon_database_post_connect(conn_q *cq) {
810   int rv = 0;
811   ds_single_detail _d = { 0 }, *d = &_d;
812   if(cq->fqdn) {
813     char *remote_str, *remote_cn;
814     /* This is the silly way we get null's in through our declare_param_str */
815     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
816     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
817     /* This is a storage node, it gets the storage node post_connect */
818     GET_QUERY(storage_post_connect);
819     rv = -1; /* now we're serious */
820     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
821     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
822     PG_EXEC(storage_post_connect);
823     PQclear(d->res);
824     rv = 0;
825   }
826   else {
827     /* Metanode post_connect */
828     GET_QUERY(metanode_post_connect);
829     rv = -1; /* now we're serious */
830     PG_EXEC(metanode_post_connect);
831     PQclear(d->res);
832     rv = 0;
833   }
834  bad_row:
835   free_params(d);
836   if(rv == -1) {
837     /* Post-connect intentions are serious and fatal */
838     PQfinish(cq->dbh);
839     cq->dbh = NULL;
840   }
841   return rv;
842 }
843 static int
844 stratcon_database_connect(conn_q *cq) {
845   char *dsn, dsn_meta[512];
846   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
847   const char *k, *v;
848   int klen;
849   noit_hash_table *t;
850
851   dsn_meta[0] = '\0';
852   if(!cq->dsn) {
853     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
854     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
855       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
856       strlcat(dsn_meta, k, sizeof(dsn_meta));
857       strlcat(dsn_meta, "=", sizeof(dsn_meta));
858       strlcat(dsn_meta, v, sizeof(dsn_meta));
859     }
860     noit_hash_destroy(t, free, free);
861     free(t);
862     dsn = dsn_meta;
863   }
864   else {
865     char options[32];
866     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta));
867     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user",
868                                options, sizeof(options))) {
869       strlcat(dsn_meta, " ", sizeof(dsn_meta));
870       strlcat(dsn_meta, "user", sizeof(dsn_meta));
871       strlcat(dsn_meta, "=", sizeof(dsn_meta));
872       strlcat(dsn_meta, options, sizeof(dsn_meta));
873     }
874     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password",
875                                options, sizeof(options))) {
876       strlcat(dsn_meta, " ", sizeof(dsn_meta));
877       strlcat(dsn_meta, "password", sizeof(dsn_meta));
878       strlcat(dsn_meta, "=", sizeof(dsn_meta));
879       strlcat(dsn_meta, options, sizeof(dsn_meta));
880     }
881     dsn = dsn_meta;
882   }
883
884   if(cq->dbh) {
885     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
886     PQreset(cq->dbh);
887     if(PQstatus(cq->dbh) != CONNECTION_OK) {
888       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
889             dsn, PQerrorMessage(cq->dbh));
890       return -1;
891     }
892     if(stratcon_database_post_connect(cq)) return -1;
893     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
894     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
895           dsn, PQerrorMessage(cq->dbh));
896     return -1;
897   }
898
899   cq->dbh = PQconnectdb(dsn);
900   if(!cq->dbh) return -1;
901   if(PQstatus(cq->dbh) != CONNECTION_OK) {
902     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
903           dsn, PQerrorMessage(cq->dbh));
904     return -1;
905   }
906   if(stratcon_database_post_connect(cq)) return -1;
907   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
908   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
909         dsn, PQerrorMessage(cq->dbh));
910   return -1;
911 }
912 static int
913 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
914                                 const char *name) {
915   int rv = -1;
916   PGresult *res;
917   char cmd[128];
918   strlcpy(cmd, p, sizeof(cmd));
919   strlcat(cmd, name, sizeof(cmd));
920   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
921   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
922   PQclear(res);
923   return rv;
924 }
925 static int
926 stratcon_datastore_do(conn_q *cq, const char *cmd) {
927   PGresult *res;
928   int rv = -1;
929   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
930   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
931   PQclear(res);
932   return rv;
933 }
/*
 * Batch-control macros used by stratcon_datastore_asynch_execute().  They
 * expand in place and rely on the caller's `cq`, `current` and `last_sp`
 * locals, and on a `full_monty:` label to jump back to.
 */

/* Drop the connection and restart the batch from full_monty. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)
/* Run "<verb><name>" on cq; any failure is treated as BUSTED. */
#define SAVEPOINT_CMD_OR_BUST(verb, name) do { \
  if(stratcon_datastore_savepoint_op(cq, (verb), (name))) BUSTED(cq); \
} while(0)
/* Take a savepoint and remember the line it was taken before. */
#define SAVEPOINT(name) do { \
  SAVEPOINT_CMD_OR_BUST("SAVEPOINT ", name); \
  last_sp = current; \
} while(0)
/* Roll back to the savepoint; last_sp is cleared so a fresh one is taken. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  SAVEPOINT_CMD_OR_BUST("ROLLBACK TO SAVEPOINT ", name); \
  last_sp = NULL; \
} while(0)
/* Release the savepoint (keeping its work); last_sp is cleared. */
#define RELEASE_SAVEPOINT(name) do { \
  SAVEPOINT_CMD_OR_BUST("RELEASE SAVEPOINT ", name); \
  last_sp = NULL; \
} while(0)
953 int
954 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
955                                  struct timeval *now) {
956   ds_rt_detail *dsjd = closure;
957   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
958   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
959
960   assert(dsjd->rt);
961   stratcon_datastore_find(dsjd);
962   if(dsjd->completion_event)
963     eventer_add(dsjd->completion_event);
964
965   free_params((ds_single_detail *)dsjd);
966   free(dsjd);
967   return 0;
968 }
969 static const char *
970 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
971   void *vinfo;
972   char *dsn = NULL, *fqdn = NULL;
973   int found = 0;
974   storagenode_info *info = NULL;
975   pthread_mutex_lock(&storagenode_to_info_cache_lock);
976   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
977                         &vinfo)) {
978     found = 1;
979     info = vinfo;
980   }
981   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
982   if(found) {
983     if(fqdn_out) *fqdn_out = info->fqdn;
984     return info->dsn;
985   }
986
987   if(!found && can_use_db) {
988     ds_single_detail *d;
989     conn_q *cq;
990     int row_count;
991     /* Look it up and store it */
992     d = calloc(1, sizeof(*d));
993     cq = get_conn_q_for_metanode();
994     GET_QUERY(find_storage);
995     DECLARE_PARAM_INT(id);
996     PG_EXEC(find_storage);
997     row_count = PQntuples(d->res);
998     if(row_count) {
999       PG_GET_STR_COL(dsn, 0, "dsn");
1000       PG_GET_STR_COL(fqdn, 0, "fqdn");
1001       fqdn = fqdn ? strdup(fqdn) : NULL;
1002       dsn = dsn ? strdup(dsn) : NULL;
1003     }
1004     PQclear(d->res);
1005    bad_row:
1006     free_params(d);
1007     free(d);
1008     release_conn_q(cq);
1009   }
1010   if(fqdn) {
1011     info = calloc(1, sizeof(*info));
1012     info->fqdn = fqdn;
1013     if(fqdn_out) *fqdn_out = info->fqdn;
1014     info->dsn = dsn;
1015     info->storagenode_id = id;
1016     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1017     noit_hash_store(&storagenode_to_info_cache,
1018                     (void *)&info->storagenode_id, sizeof(int), info);
1019     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1020   }
1021   return info ? info->dsn : NULL;
1022 }
1023 static ds_line_detail *
1024 build_insert_batch(interim_journal_t *ij) {
1025   int rv;
1026   off_t len;
1027   const char *buff, *cp, *lcp;
1028   struct stat st;
1029   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1030
1031   if(ij->fd < 0) {
1032     ij->fd = open(ij->filename, O_RDONLY);
1033     if(ij->fd < 0) {
1034       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
1035             ij->filename, strerror(errno));
1036       assert(ij->fd >= 0);
1037     }
1038   }
1039   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1040   if(rv == -1) {
1041       noitL(noit_error, "Cannot stat interim journal '%s': %s\n",
1042             ij->filename, strerror(errno));
1043     assert(rv != -1);
1044   }
1045   len = st.st_size;
1046   if(len > 0) {
1047     buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1048     if(buff == (void *)-1) {
1049       noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1050             ij->filename, strerror(errno));
1051       assert(buff != (void *)-1);
1052     }
1053     lcp = buff;
1054     while(lcp < (buff + len) &&
1055           NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1056       next = calloc(1, sizeof(*next));
1057       next->data = malloc(cp - lcp + 1);
1058       memcpy(next->data, lcp, cp - lcp);
1059       next->data[cp - lcp] = '\0';
1060       if(!head) head = next;
1061       if(last) last->next = next;
1062       last = next;
1063       lcp = cp + 1;
1064     }
1065     munmap((void *)buff, len);
1066   }
1067   close(ij->fd);
1068   return head;
1069 }
1070 static void
1071 interim_journal_remove(interim_journal_t *ij) {
1072   unlink(ij->filename);
1073   if(ij->filename) free(ij->filename);
1074   if(ij->remote_str) free(ij->remote_str);
1075   if(ij->remote_cn) free(ij->remote_cn);
1076   if(ij->fqdn) free(ij->fqdn);
1077   free(ij);
1078 }
1079 int
1080 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
1081                                   struct timeval *now) {
1082   int i, total, success, sp_total, sp_success;
1083   interim_journal_t *ij;
1084   ds_line_detail *head = NULL, *current, *last_sp;
1085   const char *dsn;
1086   conn_q *cq;
1087   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1088   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1089
1090   ij = closure;
1091   dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1092   if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1093   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1094                              ij->fqdn, dsn);
1095   noitL(ds_deb, "stratcon_datastore_asynch_execute[%s,%s,%s]\n",
1096         ij->remote_str, ij->remote_cn, ij->fqdn);
1097  full_monty:
1098   /* Make sure we have a connection */
1099   i = 1;
1100   while(stratcon_database_connect(cq)) {
1101     noitL(noit_error, "Error connecting to database: %s\n",
1102           ij->fqdn ? ij->fqdn : "(null)");
1103     sleep(i);
1104     i *= 2;
1105     i = MIN(i, 16);
1106   }
1107
1108   if(head == NULL) head = build_insert_batch(ij);
1109   noitL(ds_deb, "Starting batch from %s/%s to %s\n",
1110         ij->remote_str ? ij->remote_str : "(null)",
1111         ij->remote_cn ? ij->remote_cn : "(null)",
1112         ij->fqdn ? ij->fqdn : "(null)");
1113   current = head;
1114   last_sp = NULL;
1115   total = success = sp_total = sp_success = 0;
1116   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
1117   while(current) {
1118     execute_outcome_t rv;
1119     if(current->data) {
1120       if(!last_sp) {
1121         SAVEPOINT("batch");
1122         sp_success = success;
1123         sp_total = total;
1124       }
1125  
1126       if(current->problematic) {
1127         RELEASE_SAVEPOINT("batch");
1128         current = current->next;
1129         total++;
1130         continue;
1131       }
1132       rv = stratcon_datastore_execute(cq, cq->remote_str, cq->remote_cn,
1133                                       current);
1134       switch(rv) {
1135         case DS_EXEC_SUCCESS:
1136           total++;
1137           success++;
1138           current = current->next;
1139           break;
1140         case DS_EXEC_ROW_FAILED:
1141           /* rollback to savepoint, mark this record as bad and start again */
1142           noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1143           current->problematic = 1;
1144           current = last_sp;
1145           success = sp_success;
1146           total = sp_total;
1147           ROLLBACK_TO_SAVEPOINT("batch");
1148           break;
1149         case DS_EXEC_TXN_FAILED:
1150           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1151           BUSTED(cq);
1152       }
1153     }
1154   }
1155   if(last_sp) RELEASE_SAVEPOINT("batch");
1156   if(stratcon_datastore_do(cq, "COMMIT")) {
1157     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1158     BUSTED(cq);
1159   }
1160   /* Cleanup the mess */
1161   while(head) {
1162     ds_line_detail *tofree;
1163     tofree = head;
1164     head = head->next;
1165     if(tofree->data) free(tofree->data);
1166     free_params((ds_single_detail *)tofree);
1167     free(tofree);
1168   }
1169   noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1170         ij->remote_str ? ij->remote_str : "(null)",
1171         ij->remote_cn ? ij->remote_cn : "(null)",
1172         ij->fqdn ? ij->fqdn : "(null)", success, total);
1173   interim_journal_remove(ij);
1174   release_conn_q(cq);
1175   return 0;
1176 }
1177 static int
1178 stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure,
1179                                 struct timeval *now) {
1180   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1181   const char *k;
1182   int klen;
1183   void *vij;
1184   interim_journal_t *ij;
1185   syncset_t *syncset = closure;
1186
1187   if((mask & EVENTER_ASYNCH) == EVENTER_ASYNCH) {
1188     eventer_add(syncset->completion);
1189     eventer_trigger(syncset->completion, EVENTER_READ | EVENTER_WRITE);
1190     free(syncset);
1191     return 0;
1192   }
1193   if(!((mask & EVENTER_ASYNCH_WORK) == EVENTER_ASYNCH_WORK)) return 0;
1194
1195   noitL(ds_deb, "Syncing journal sets...\n");
1196   while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) {
1197     eventer_t ingest;
1198     ij = vij;
1199     noitL(ds_deb, "Syncing journal set [%s,%s,%s]\n",
1200           ij->remote_str, ij->remote_cn, ij->fqdn);
1201     fsync(ij->fd);
1202     close(ij->fd);
1203     ij->fd = -1;
1204     ingest = eventer_alloc();
1205     ingest->mask = EVENTER_ASYNCH;
1206     ingest->callback = stratcon_datastore_asynch_execute;
1207     ingest->closure = ij;
1208     eventer_add_asynch(ij->cpool->jobq, ingest);
1209   }
1210   noit_hash_destroy(syncset->ws, free, NULL);
1211   free(syncset->ws);
1212   return 0;
1213 }
1214 static interim_journal_t *
1215 interim_journal_get(struct sockaddr *remote, const char *remote_cn_in,
1216                     int storagenode_id, const char *fqdn_in) {
1217   void *vhash, *vij;
1218   noit_hash_table *working_set;
1219   interim_journal_t *ij;
1220   struct timeval now;
1221   char jpath[PATH_MAX];
1222   char remote_str[128];
1223   const char *remote_cn = remote_cn_in ? remote_cn_in : "default";
1224   const char *fqdn = fqdn_in ? fqdn_in : "default";
1225
1226   noit_convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote);
1227   if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str));
1228
1229   /* Lookup the working set */
1230   if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1231     working_set = calloc(1, sizeof(*working_set));
1232     noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn),
1233                     working_set);
1234   }
1235   else
1236     working_set = vhash;
1237
1238   /* Lookup the interim journal within the working set */
1239   if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) {
1240     ij = calloc(1, sizeof(*ij));
1241     gettimeofday(&now, NULL);
1242     snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x",
1243              basejpath, remote_str, remote_cn, storagenode_id,
1244              (unsigned int)now.tv_sec, (unsigned int)now.tv_usec);
1245     ij->remote_str = strdup(remote_str);
1246     ij->remote_cn = strdup(remote_cn);
1247     ij->fqdn = fqdn_in ? strdup(fqdn_in) : NULL;
1248     ij->storagenode_id = storagenode_id;
1249     ij->filename = strdup(jpath);
1250     ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1251                                          ij->fqdn);
1252     ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1253     if(ij->fd < 0 && errno == ENOENT) {
1254       if(mkdir_for_file(ij->filename, 0750)) {
1255         noitL(noit_error, "Failed to create dir for '%s': %s\n",
1256               ij->filename, strerror(errno));
1257         exit(-1);
1258       }
1259       ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1260     }
1261     if(ij->fd < 0) {
1262       noitL(noit_error, "Failed to open interim journal '%s': %s\n",
1263             ij->filename, strerror(errno));
1264       exit(-1);
1265     }
1266     noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij);
1267   }
1268   else
1269     ij = vij;
1270
1271   return ij;
1272 }
1273 static int
1274 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1275                           int *sid_out, int *storagenode_id_out,
1276                           const char **remote_cn_out,
1277                           const char **fqdn_out, const char **dsn_out) {
1278   /* only called from the main thread -- no safety issues */
1279   void *vuuidinfo, *vinfo;
1280   uuid_info *uuidinfo;
1281   storagenode_info *info = NULL;
1282   char *fqdn = NULL;
1283   char *dsn = NULL;
1284   char *new_remote_cn = NULL;
1285   int storagenode_id = 0, sid = 0;
1286   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1287                          &vuuidinfo)) {
1288     int row_count = 0;
1289     char *tmpint;
1290     ds_single_detail *d;
1291     conn_q *cq;
1292
1293     /* We can't do a database lookup without the remote_cn */
1294     if(!remote_cn) {
1295       if(stratcon_datastore_get_enabled()) {
1296         /* We have an authoritatively maintained cache, we don't do lookups */
1297         return -1;
1298       }
1299       else
1300         remote_cn = "[[null]]";
1301     }
1302
1303     d = calloc(1, sizeof(*d));
1304     cq = get_conn_q_for_metanode();
1305     if(stratcon_database_connect(cq) == 0) {
1306       /* Blocking call to service the cache miss */
1307       GET_QUERY(check_map);
1308       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1309       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1310       PG_EXEC(check_map);
1311       row_count = PQntuples(d->res);
1312       if(row_count != 1) {
1313         PQclear(d->res);
1314         goto bad_row;
1315       }
1316       PG_GET_STR_COL(tmpint, 0, "sid");
1317       if(!tmpint) {
1318         row_count = 0;
1319         PQclear(d->res);
1320         goto bad_row;
1321       }
1322       sid = atoi(tmpint);
1323       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1324       if(tmpint) storagenode_id = atoi(tmpint);
1325       PG_GET_STR_COL(fqdn, 0, "fqdn");
1326       PG_GET_STR_COL(dsn, 0, "dsn");
1327       PG_GET_STR_COL(new_remote_cn, 0, "remote_cn");
1328       fqdn = fqdn ? strdup(fqdn) : NULL;
1329       dsn = dsn ? strdup(dsn) : NULL;
1330       new_remote_cn = new_remote_cn ? strdup(new_remote_cn) : NULL;
1331       PQclear(d->res);
1332     }
1333    bad_row:
1334     free_params((ds_single_detail *)d);
1335     free(d);
1336     release_conn_q(cq);
1337     if(row_count != 1) {
1338       return -1;
1339     }
1340     /* Place in cache */
1341     uuidinfo = calloc(1, sizeof(*uuidinfo));
1342     uuidinfo->sid = sid;
1343     uuidinfo->uuid_str = strdup(uuid_str);
1344     uuidinfo->storagenode_id = storagenode_id;
1345     uuidinfo->remote_cn = new_remote_cn ? strdup(new_remote_cn) : strdup(remote_cn);
1346     noit_hash_store(&uuid_to_info_cache,
1347                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1348     /* Also, we may have just witnessed a new storage node, store it */
1349     if(storagenode_id) {
1350       int needs_free = 0;
1351       info = calloc(1, sizeof(*info));
1352       info->storagenode_id = storagenode_id;
1353       info->dsn = dsn ? strdup(dsn) : NULL;
1354       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1355       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1356       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1357                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1358         /* hack to save memory -- we *never* remove from these caches,
1359            so we can use the same fqdn value in the above cache for the key
1360            in the cache below -- (no strdup) */
1361         noit_hash_store(&storagenode_to_info_cache,
1362                         (void *)&info->storagenode_id, sizeof(int), info);
1363       }
1364       else needs_free = 1;
1365       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1366       if(needs_free) {
1367         if(info->dsn) free(info->dsn);
1368         if(info->fqdn) free(info->fqdn);
1369         free(info);
1370       }
1371     }
1372   }
1373   else
1374     uuidinfo = vuuidinfo;
1375
1376   if(uuidinfo && uuidinfo->storagenode_id) {
1377     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1378       /* we don't have dsn and we actually want it */
1379       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1380       if(noit_hash_retrieve(&storagenode_to_info_cache,
1381                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1382                             &vinfo))
1383         info = vinfo;
1384       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1385     }
1386   }
1387
1388   if(fqdn_out) *fqdn_out = info ? info->fqdn : NULL;
1389   if(dsn_out) *dsn_out = info ? info->dsn : NULL;
1390   assert(uuidinfo);
1391   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1392   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1393   if(sid_out) *sid_out = uuidinfo->sid;
1394   if(fqdn) free(fqdn);
1395   if(dsn) free(dsn);
1396   if(new_remote_cn) free(new_remote_cn);
1397   return 0;
1398 }
1399 static int
1400 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1401   char uuid_str[UUID_STR_LEN+1];
1402   int sid = 0;
1403   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1404   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1405   return sid;
1406 }
1407 static void
1408 stratcon_datastore_journal(struct sockaddr *remote,
1409                            const char *remote_cn, const char *line) {
1410   interim_journal_t *ij = NULL;
1411   char uuid_str[UUID_STR_LEN+1], *cp;
1412   const char *fqdn = NULL, *dsn = NULL;
1413   int storagenode_id = 0;
1414   uuid_t checkid;
1415   if(!line) return;
1416   /* if it is a UUID based thing, find the storage node */
1417   switch(*line) {
1418     case 'C':
1419     case 'S':
1420     case 'M':
1421       if(line[1] == '\t' && (cp = strchr(line+2, '\t')) != NULL) {
1422         strlcpy(uuid_str, cp + 1, sizeof(uuid_str));
1423         if(!uuid_parse(uuid_str, checkid)) {
1424           storage_node_quick_lookup(uuid_str, remote_cn, NULL,
1425                                     &storagenode_id, NULL, &fqdn, &dsn);
1426           ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn);
1427         }
1428       }
1429       break;
1430     case 'n':
1431       ij = interim_journal_get(remote,remote_cn,0,NULL);
1432       break;
1433     default:
1434       break;
1435   }
1436   if(!ij) {
1437     noitL(ingest_err, "%d\t%s\n", storagenode_id, line);
1438   }
1439   else {
1440     int len;
1441     len = write(ij->fd, line, strlen(line));
1442     if(len < 0) {
1443       noitL(noit_error, "write to %s failed: %s\n",
1444             ij->filename, strerror(errno));
1445     }
1446   }
1447   free(line);
1448   return;
1449 }
1450 static noit_hash_table *
1451 stratcon_datastore_journal_remove(struct sockaddr *remote,
1452                                   const char *remote_cn) {
1453   void *vhash = NULL;
1454   if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1455     /* pluck it out */
1456     noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL);
1457   }
1458   else {
1459     noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n",
1460           remote_cn);
1461     abort();
1462   }
1463   return vhash;
1464 }
1465 void
1466 stratcon_datastore_push(stratcon_datastore_op_t op,
1467                         struct sockaddr *remote,
1468                         const char *remote_cn, void *operand,
1469                         eventer_t completion) {
1470   conn_pool *cpool;
1471   syncset_t *syncset;
1472   eventer_t e;
1473   ds_rt_detail *rtdetail;
1474   struct datastore_onlooker_list *nnode;
1475
1476   for(nnode = onlookers; nnode; nnode = nnode->next)
1477     nnode->dispatch(op,remote,remote_cn,operand);
1478
1479   switch(op) {
1480     case DS_OP_INSERT:
1481       stratcon_datastore_journal(remote, remote_cn, (const char *)operand);
1482       break;
1483     case DS_OP_CHKPT:
1484       e = eventer_alloc();
1485       syncset = calloc(1, sizeof(*syncset));
1486       e->mask = EVENTER_ASYNCH;
1487       e->callback = stratcon_datastore_journal_sync;
1488       syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn);
1489       syncset->completion = completion;
1490       e->closure = syncset;
1491       eventer_add(e);
1492       break;
1493     case DS_OP_FIND_COMPLETE:
1494       cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
1495       rtdetail = calloc(1, sizeof(*rtdetail));
1496       rtdetail->rt = operand;
1497       rtdetail->completion_event = completion;
1498       e = eventer_alloc();
1499       e->mask = EVENTER_ASYNCH;
1500       e->callback = stratcon_datastore_asynch_lookup;
1501       e->closure = rtdetail;
1502       eventer_add_asynch(cpool->jobq, e);
1503       break;
1504   }
1505 }
1506
1507 int
1508 stratcon_datastore_saveconfig(void *unused) {
1509   int rv = -1;
1510   char *buff;
1511   ds_single_detail _d = { 0 }, *d = &_d;
1512   conn_q *cq;
1513   char ipv4_str[32];
1514   struct in_addr r, l;
1515
1516   r.s_addr = htonl((4 << 24) || (2 << 16) || (2 << 8) || 1);
1517   memset(&l, 0, sizeof(l));
1518   noit_getip_ipv4(r, &l);
1519   /* Ignore the error.. what are we going to do anyway */
1520   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
1521     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));
1522
1523   cq = get_conn_q_for_metanode();
1524
1525   if(stratcon_database_connect(cq) == 0) {
1526     char time_as_str[20];
1527     size_t len;
1528     buff = noit_conf_xml_in_mem(&len);
1529     if(!buff) goto bad_row;
1530
1531     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1532     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str));
1533     DECLARE_PARAM_STR("", 0);
1534     DECLARE_PARAM_STR("stratcond", 9);
1535     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1536     DECLARE_PARAM_STR(buff, len);
1537     free(buff);
1538
1539     GET_QUERY(config_insert);
1540     PG_EXEC(config_insert);
1541     PQclear(d->res);
1542     rv = 0;
1543
1544     bad_row:
1545       free_params(d);
1546   }
1547   release_conn_q(cq);
1548   return rv;
1549 }
1550
1551 void
1552 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
1553                                                struct sockaddr *,
1554                                                const char *, void *)) {
1555   struct datastore_onlooker_list *nnode;
1556   nnode = calloc(1, sizeof(*nnode));
1557   nnode->dispatch = f;
1558   nnode->next = onlookers;
1559   while(noit_atomic_casptr((volatile void **)&onlookers,
1560                            nnode, nnode->next) != (void *)nnode->next)
1561     nnode->next = onlookers;
1562 }
1563 static void
1564 stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn,
1565                                          char *id_str, char *file) {
1566   char path[PATH_MAX];
1567   interim_journal_t *ij;
1568   eventer_t ingest;
1569
1570   snprintf(path, sizeof(path), "%s/%s/%s/%s/%s",
1571            basejpath, remote_str, remote_cn, id_str, file);
1572   ij = calloc(1, sizeof(*ij));
1573   ij->fd = open(path, O_RDONLY);
1574   if(ij->fd < 0) {
1575     noitL(noit_error, "cannot open journal '%s': %s\n",
1576           path, strerror(errno));
1577     free(ij);
1578     return;
1579   }
1580   close(ij->fd);
1581   ij->fd = -1;
1582   ij->filename = strdup(path);
1583   ij->remote_str = strdup(remote_str);
1584   ij->remote_cn = strdup(remote_cn);
1585   ij->storagenode_id = atoi(id_str);
1586   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1587                                        ij->fqdn);
1588   noitL(noit_error, "ingesting old payload: %s\n", ij->filename);
1589   ingest = eventer_alloc();
1590   ingest->mask = EVENTER_ASYNCH;
1591   ingest->callback = stratcon_datastore_asynch_execute;
1592   ingest->closure = ij;
1593   eventer_add_asynch(ij->cpool->jobq, ingest);
1594 }
1595 static void
1596 stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) {
1597   char path[PATH_MAX];
1598   DIR *root;
1599   struct dirent *de, *entry;
1600   int i = 0, cnt = 0;
1601   char **entries;
1602   int size = 0;
1603
1604   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
1605            first ? "/" : "", first ? first : "",
1606            second ? "/" : "", second ? second : "",
1607            third ? "/" : "", third ? third : "");
1608 #ifdef _PC_NAME_MAX
1609   size = pathconf(path, _PC_NAME_MAX);
1610 #endif
1611   size = MIN(size, PATH_MAX + 128);
1612   de = alloca(size);
1613   root = opendir(path);
1614   if(!root) return;
1615   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) cnt++;
1616   closedir(root);
1617   root = opendir(path);
1618   if(!root) return;
1619   entries = malloc(sizeof(*entries) * cnt);
1620   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) {
1621     if(i < cnt) {
1622       entries[i++] = strdup(entry->d_name);
1623     }
1624   }
1625   closedir(root);
1626   cnt = i; /* could have changed, directories are fickle */
1627   qsort(entries, i, sizeof(*entries),
1628         (int (*)(const void *, const void *))strcasecmp);
1629   for(i=0; i<cnt; i++) {
1630     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
1631     noitL(ds_deb, "Processing L%d entry '%s'\n",
1632           third ? 4 : second ? 3 : first ? 2 : 1, entries[i]);
1633     if(!first)
1634       stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL);
1635     else if(!second)
1636       stratcon_datastore_sweep_journals_int(first, entries[i], NULL);
1637     else if(!third)
1638       stratcon_datastore_sweep_journals_int(first, second, entries[i]);
1639     else if(strlen(entries[i]) == 16)
1640       stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]);
1641   }
1642 }
static void
stratcon_datastore_sweep_journals() {
  /* Begin the recursive sweep at the journal root: no remote, cn or
   * storage-node path component has been chosen yet. */
  char *unset = NULL;
  stratcon_datastore_sweep_journals_int(unset, unset, unset);
}
1647
1648 int
1649 stratcon_datastore_ingest_all_storagenode_info() {
1650   int i, cnt = 0;
1651   ds_single_detail _d = { 0 }, *d = &_d;
1652   conn_q *cq;
1653   cq = get_conn_q_for_metanode();
1654
1655   while(stratcon_database_connect(cq)) {
1656     noitL(noit_error, "Error connecting to database\n");
1657     sleep(1);
1658   }
1659
1660   GET_QUERY(all_storage);
1661   PG_EXEC(all_storage);
1662   cnt = PQntuples(d->res);
1663   for(i=0; i<cnt; i++) {
1664     void *vinfo;
1665     char *tmpint, *fqdn, *dsn;
1666     int storagenode_id;
1667     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1668     storagenode_id = atoi(tmpint);
1669     PG_GET_STR_COL(fqdn, i, "fqdn");
1670     PG_GET_STR_COL(dsn, i, "dsn");
1671     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1672     storagenode_id = tmpint ? atoi(tmpint) : 0;
1673
1674     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1675                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1676       storagenode_info *info;
1677       info = calloc(1, sizeof(*info));
1678       info->storagenode_id = storagenode_id;
1679       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1680       info->dsn = dsn ? strdup(dsn) : NULL;
1681       noit_hash_store(&storagenode_to_info_cache,
1682                       (void *)&info->storagenode_id, sizeof(int), info);
1683       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1684             info->storagenode_id,
1685             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1686     }
1687   }
1688   PQclear(d->res);
1689  bad_row:
1690   free_params(d);
1691
1692   release_conn_q(cq);
1693   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1694   return cnt;
1695 }
1696 int
1697 stratcon_datastore_ingest_all_check_info() {
1698   int i, cnt, loaded = 0;
1699   ds_single_detail _d = { 0 }, *d = &_d;
1700   conn_q *cq;
1701   cq = get_conn_q_for_metanode();
1702
1703   while(stratcon_database_connect(cq)) {
1704     noitL(noit_error, "Error connecting to database\n");
1705     sleep(1);
1706   }
1707
1708   GET_QUERY(check_mapall);
1709   PG_EXEC(check_mapall);
1710   cnt = PQntuples(d->res);
1711   for(i=0; i<cnt; i++) {
1712     void *vinfo;
1713     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1714     int sid, storagenode_id;
1715     uuid_info *uuidinfo;
1716     PG_GET_STR_COL(uuid_str, i, "id");
1717     if(!uuid_str) continue;
1718     PG_GET_STR_COL(tmpint, i, "sid");
1719     if(!tmpint) continue;
1720     sid = atoi(tmpint);
1721     PG_GET_STR_COL(fqdn, i, "fqdn");
1722     PG_GET_STR_COL(dsn, i, "dsn");
1723     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1724     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1725     storagenode_id = tmpint ? atoi(tmpint) : 0;
1726
1727     uuidinfo = calloc(1, sizeof(*uuidinfo));
1728     uuidinfo->uuid_str = strdup(uuid_str);
1729     uuidinfo->remote_cn = strdup(remote_cn);
1730     uuidinfo->storagenode_id = storagenode_id;
1731     uuidinfo->sid = sid;
1732     noit_hash_store(&uuid_to_info_cache,
1733                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1734     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1735           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1736     loaded++;
1737     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1738                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1739       storagenode_info *info;
1740       info = calloc(1, sizeof(*info));
1741       info->storagenode_id = storagenode_id;
1742       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1743       info->dsn = dsn ? strdup(dsn) : NULL;
1744       noit_hash_store(&storagenode_to_info_cache,
1745                       (void *)&info->storagenode_id, sizeof(int), info);
1746       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1747             info->storagenode_id,
1748             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1749     }
1750   }
1751   PQclear(d->res);
1752  bad_row:
1753   free_params(d);
1754
1755   release_conn_q(cq);
1756   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1757   return loaded;
1758 }
1759
1760 static int
1761 rest_get_noit_config(noit_http_rest_closure_t *restc,
1762                      int npats, char **pats) {
1763   noit_http_session_ctx *ctx = restc->http_ctx;
1764   ds_single_detail *d;
1765   int row_count = 0;
1766   const char *xml = NULL;
1767   conn_q *cq = NULL;
1768
1769   if(npats != 0) {
1770     noit_http_response_server_error(ctx, "text/xml");
1771     noit_http_response_end(ctx);
1772     return 0;
1773   }
1774   d = calloc(1, sizeof(*d));
1775   GET_QUERY(config_get);
1776   cq = get_conn_q_for_metanode();
1777   if(!cq) {
1778     noit_http_response_server_error(ctx, "text/xml");
1779     goto bad_row;
1780   }
1781
1782   DECLARE_PARAM_STR(restc->remote_cn,
1783                     restc->remote_cn ? strlen(restc->remote_cn) : 0);
1784   PG_EXEC(config_get);
1785   row_count = PQntuples(d->res);
1786   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1787
1788   if(xml == NULL) {
1789     char buff[1024];
1790     snprintf(buff, sizeof(buff), "<error><remote_cn>%s</remote_cn>"
1791                                  "<row_count>%d</row_count></error>\n",
1792              restc->remote_cn, row_count);
1793     noit_http_response_append(ctx, buff, strlen(buff));
1794     noit_http_response_not_found(ctx, "text/xml");
1795   }
1796   else {
1797     noit_http_response_append(ctx, xml, strlen(xml));
1798     noit_http_response_ok(ctx, "text/xml");
1799   }
1800  bad_row:
1801   free_params((ds_single_detail *)d);
1802   d->nparams = 0;
1803   if(cq) release_conn_q(cq);
1804
1805   noit_http_response_end(ctx);
1806   return 0;
1807 }
1808
1809 void
1810 stratcon_datastore_init() {
1811   pthread_mutex_init(&ds_conns_lock, NULL);
1812   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1813   ds_err = noit_log_stream_find("error/datastore");
1814   ds_deb = noit_log_stream_find("debug/datastore");
1815   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
1816   ingest_err = noit_log_stream_find("error/ingest");
1817   if(!ds_err) ds_err = noit_error;
1818   if(!ingest_err) ingest_err = noit_error;
1819   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1820                            &basejpath)) {
1821     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1822     exit(-1);
1823   }
1824   stratcon_datastore_ingest_all_check_info();
1825   stratcon_datastore_ingest_all_storagenode_info();
1826   stratcon_datastore_sweep_journals();
1827
1828   assert(noit_http_rest_register_auth(
1829     "GET", "/noits/", "^config$", rest_get_noit_config,
1830              noit_http_rest_client_cert_auth
1831   ) == 0);
1832 }
1833
Note: See TracBrowser for help on using the browser.