root/src/stratcon_datastore.c

Revision d9050a442ce49ffad4040ba933511197fe71390f, 55.9 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

this will break realtime streaming.... need to fix the javascript in the web ui now, refs #229

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "utils/noit_str.h"
38 #include "utils/noit_mkdir.h"
39 #include "stratcon_datastore.h"
40 #include "stratcon_realtime_http.h"
41 #include "stratcon_iep.h"
42 #include "noit_conf.h"
43 #include "noit_check.h"
44 #include "noit_rest.h"
45 #include <unistd.h>
46 #include <fcntl.h>
47 #include <netinet/in.h>
48 #include <sys/un.h>
49 #include <dirent.h>
50 #include <arpa/inet.h>
51 #include <sys/mman.h>
52 #include <libpq-fe.h>
53 #include <zlib.h>
54 #include <assert.h>
55 #include <errno.h>
56
57 #define DECL_STMT(codename,confname) \
58 static char *codename = NULL; \
59 static const char *codename##_conf = "/stratcon/database/statements/" #confname
60
61 DECL_STMT(storage_post_connect, storagepostconnect);
62 DECL_STMT(metanode_post_connect, metanodepostconnect);
63 DECL_STMT(find_storage, findstoragenode);
64 DECL_STMT(all_storage, allstoragenodes);
65 DECL_STMT(check_map, mapchecktostoragenode);
66 DECL_STMT(check_mapall, mapallchecks);
67 DECL_STMT(check_loadall, allchecks);
68 DECL_STMT(check_find, findcheck);
69 DECL_STMT(check_insert, check);
70 DECL_STMT(status_insert, status);
71 DECL_STMT(metric_insert_numeric, metric_numeric);
72 DECL_STMT(metric_insert_text, metric_text);
73 DECL_STMT(config_insert, config);
74 DECL_STMT(config_get, findconfig);
75
76 static noit_log_stream_t ds_err = NULL;
77 static noit_log_stream_t ds_deb = NULL;
78 static noit_log_stream_t ds_pool_deb = NULL;
79 static noit_log_stream_t ingest_err = NULL;
80
81 static struct datastore_onlooker_list {
82   void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *,
83                    const char *, void *);
84   struct datastore_onlooker_list *next;
85 } *onlookers = NULL;
86
/*
 * Lazily load the SQL text for statement `a` from its configuration path
 * (a##_conf) the first time it is needed.  If the statement is still unset
 * and the configuration lookup fails, control jumps to the enclosing
 * function's bad_row label.
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && \
     !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
92
93 struct conn_q;
94
95 typedef struct {
96   char            *queue_name; /* the key fqdn+remote_sn */
97   eventer_jobq_t  *jobq;
98   struct conn_q   *head;
99   pthread_mutex_t  lock;
100   pthread_cond_t   cv;
101   int              ttl;
102   int              in_pool;
103   int              outstanding;
104   int              max_allocated;
105   int              max_in_pool;
106 } conn_pool;
107 typedef struct conn_q {
108   time_t           last_use;
109   char            *dsn;        /* Pg connect string */
110   char            *remote_str; /* the IP of the noit*/
111   char            *remote_cn;  /* the Cert CN of the noit */
112   char            *fqdn;       /* the fqdn of the storage node */
113   conn_pool       *pool;
114   struct conn_q   *next;
115   /* Postgres specific stuff */
116   PGconn          *dbh;
117 } conn_q;
118
119
120 #define MAX_PARAMS 8
121 #define POSTGRES_PARTS \
122   PGresult *res; \
123   int rv; \
124   time_t whence; \
125   int nparams; \
126   int metric_type; \
127   char *paramValues[MAX_PARAMS]; \
128   int paramLengths[MAX_PARAMS]; \
129   int paramFormats[MAX_PARAMS]; \
130   int paramAllocd[MAX_PARAMS];
131
132 typedef struct ds_single_detail {
133   POSTGRES_PARTS
134 } ds_single_detail;
135 typedef struct {
136   /* Postgres specific stuff */
137   POSTGRES_PARTS
138   struct realtime_tracker *rt;
139   conn_q *cq; /* connection on which to perform this job */
140   eventer_t completion_event; /* This event should be registered if non NULL */
141 } ds_rt_detail;
142 typedef struct ds_line_detail {
143   /* Postgres specific stuff */
144   POSTGRES_PARTS
145   char *data;
146   int problematic;
147   struct ds_line_detail *next;
148 } ds_line_detail;
149
150 typedef struct {
151   noit_hash_table *ws;
152   eventer_t completion;
153 } syncset_t;
154
155 typedef struct {
156   char *remote_str;
157   char *remote_cn;
158   char *fqdn;
159   int storagenode_id;
160   int fd;
161   char *filename;
162   conn_pool *cpool;
163 } interim_journal_t;
164
165 static int stratcon_database_connect(conn_q *cq);
166 static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
167 static int storage_node_quick_lookup(const char *uuid_str,
168                                      const char *remote_cn,
169                                      int *sid_out, int *storagenode_id_out,
170                                      const char **remote_cn_out,
171                                      const char **fqdn_out,
172                                      const char **dsn_out);
173
174 static void
175 free_params(ds_single_detail *d) {
176   int i;
177   for(i=0; i<d->nparams; i++)
178     if(d->paramAllocd[i] && d->paramValues[i])
179       free(d->paramValues[i]);
180 }
181
182 char *basejpath = NULL;
183 pthread_mutex_t ds_conns_lock;
184 noit_hash_table ds_conns;
185 noit_hash_table working_sets;
186
187 /* the fqdn cache needs to be thread safe */
188 typedef struct {
189   char *uuid_str;
190   char *remote_cn;
191   int storagenode_id;
192   int sid;
193 } uuid_info;
194 typedef struct {
195   int storagenode_id;
196   char *fqdn;
197   char *dsn;
198 } storagenode_info;
199 noit_hash_table uuid_to_info_cache;
200 pthread_mutex_t storagenode_to_info_cache_lock;
201 noit_hash_table storagenode_to_info_cache;
202
/*
 * Render a socket address as text.
 *
 * buff   - destination buffer; must hold at least one byte.
 * blen   - capacity of buff in bytes.
 * remote - address to render; may be NULL.
 *
 * AF_INET and AF_INET6 addresses are rendered with inet_ntop, AF_UNIX
 * addresses yield their path.  A NULL remote produces an empty string.
 * Any other address family returns 0 with buff left empty.
 *
 * Returns the length of the string stored in buff (truncated to fit).
 */
int
convert_sockaddr_to_buff(char *buff, int blen, struct sockaddr *remote) {
  char name[128] = "";
  buff[0] = '\0';
  if(remote) {
    switch(remote->sa_family) {
      case AF_INET:
        /* The size passed to inet_ntop is the capacity of the text buffer,
         * not the size of the sockaddr structure. */
        if(!inet_ntop(AF_INET, &((struct sockaddr_in *)remote)->sin_addr,
                      name, sizeof(name)))
          name[0] = '\0';
        break;
      case AF_INET6:
        if(!inet_ntop(AF_INET6, &((struct sockaddr_in6 *)remote)->sin6_addr,
                      name, sizeof(name)))
          name[0] = '\0';
        break;
      case AF_UNIX: {
        const struct sockaddr_un *un = (const struct sockaddr_un *)remote;
        /* sun_path is not guaranteed to be NUL terminated; bound the read */
        snprintf(name, sizeof(name), "%.*s",
                 (int)sizeof(un->sun_path), un->sun_path);
        break;
      }
      default: return 0;
    }
  }
  if(blen > 0) snprintf(buff, (size_t)blen, "%s", name);
  return strlen(buff);
}
230
231 /* Thread-safe connection pools */
232
233 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
234 static void
235 release_conn_q_forceable(conn_q *cq, int forcefree) {
236   int putback = 0;
237   cq->last_use = time(NULL);
238   pthread_mutex_lock(&cq->pool->lock);
239   cq->pool->outstanding--;
240   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
241     putback = 1;
242     cq->next = cq->pool->head;
243     cq->pool->head = cq;
244     cq->pool->in_pool++;
245   }
246   pthread_mutex_unlock(&cq->pool->lock);
247   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(),
248         putback ? "to pool" : "and destroy", cq->pool->queue_name);
249   pthread_cond_signal(&cq->pool->cv);
250   if(putback) return;
251
252   /* Not put back, release it */
253   if(cq->dbh) PQfinish(cq->dbh);
254   if(cq->remote_str) free(cq->remote_str);
255   if(cq->remote_cn) free(cq->remote_cn);
256   if(cq->fqdn) free(cq->fqdn);
257   if(cq->dsn) free(cq->dsn);
258   free(cq);
259 }
260 static void
261 ttl_purge_conn_pool(conn_pool *pool) {
262   int old_cnt, new_cnt;
263   time_t now = time(NULL);
264   conn_q *cq, *prev = NULL, *iter;
265   /* because we always replace on the head and update the last_use time when
266      doing so, we know they are ordered LRU on the end.  So, once we hit an
267      old one, we know all the others are old too.
268    */
269   if(!pool->head) return; /* hack short circuit for no locks */
270   pthread_mutex_lock(&pool->lock);
271   old_cnt = pool->in_pool;
272   cq = pool->head;
273   while(cq) {
274     if(cq->last_use + cq->pool->ttl < now) {
275       if(prev) prev->next = NULL;
276       else pool->head = NULL;
277       break;
278     }
279     prev = cq;
280     cq = cq->next;
281   }
282   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
283   /* Fix accounting */
284   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
285   new_cnt = pool->in_pool;
286   pthread_mutex_unlock(&pool->lock);
287
288   /* Force release these without holding the lock */
289   while(cq) {
290     prev = cq;
291     cq = cq->next;
292     release_conn_q_forceable(cq, 1);
293   }
294   if(old_cnt != new_cnt)
295     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
296           pool->queue_name);
297 }
298 static void
299 release_conn_q(conn_q *cq) {
300   ttl_purge_conn_pool(cq->pool);
301   release_conn_q_forceable(cq, 0);
302 }
303 static conn_pool *
304 get_conn_pool_for_remote(const char *remote_str,
305                          const char *remote_cn, const char *fqdn) {
306   void *vcpool;
307   conn_pool *cpool = NULL;
308   char queue_name[256] = "datastore_";
309   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
310            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
311            fqdn ? fqdn : "default",
312            remote_cn ? remote_cn : "default");
313   pthread_mutex_lock(&ds_conns_lock);
314   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
315                         strlen(queue_name), &vcpool))
316     cpool = vcpool;
317   pthread_mutex_unlock(&ds_conns_lock);
318   if(!cpool) {
319     vcpool = cpool = calloc(1, sizeof(*cpool));
320     cpool->queue_name = strdup(queue_name);
321     pthread_mutex_init(&cpool->lock, NULL);
322     pthread_cond_init(&cpool->cv, NULL);
323     cpool->in_pool = 0;
324     cpool->outstanding = 0;
325     cpool->max_in_pool = 1;
326     cpool->max_allocated = 1;
327     pthread_mutex_lock(&ds_conns_lock);
328     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
329                         cpool)) {
330       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
331                          strlen(queue_name), &vcpool);
332     }
333     pthread_mutex_unlock(&ds_conns_lock);
334     if(vcpool != cpool) {
335       /* someone beat us to it */
336       free(cpool->queue_name);
337       pthread_mutex_destroy(&cpool->lock);
338       pthread_cond_destroy(&cpool->cv);
339       free(cpool);
340     }
341     else {
342       int i;
343       /* Our job to setup the pool */
344       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
345       eventer_jobq_init(cpool->jobq, queue_name);
346       cpool->jobq->backq = eventer_default_backq();
347       /* Add one thread */
348       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
349         eventer_jobq_increase_concurrency(cpool->jobq);
350     }
351     cpool = vcpool;
352   }
353   return cpool;
354 }
355 static conn_q *
356 get_conn_q_for_remote(const char *remote_str,
357                       const char *remote_cn, const char *fqdn,
358                       const char *dsn) {
359   conn_pool *cpool;
360   conn_q *cq;
361   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
362   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(),
363         cpool->queue_name);
364   pthread_mutex_lock(&cpool->lock);
365  again:
366   if(cpool->head) {
367     assert(cpool->in_pool > 0);
368     cq = cpool->head;
369     cpool->head = cq->next;
370     cpool->in_pool--;
371     cpool->outstanding++;
372     cq->next = NULL;
373     pthread_mutex_unlock(&cpool->lock);
374     return cq;
375   }
376   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
377     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
378           (void *)pthread_self(), cpool->queue_name);
379     pthread_cond_wait(&cpool->cv, &cpool->lock);
380     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
381           (void *)pthread_self(), cpool->queue_name);
382     goto again;
383   }
384   else {
385     cpool->outstanding++;
386     pthread_mutex_unlock(&cpool->lock);
387   }
388  
389   cq = calloc(1, sizeof(*cq));
390   cq->pool = cpool;
391   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
392   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
393   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
394   cq->dsn = dsn ? strdup(dsn) : NULL;
395   return cq;
396 }
397 static conn_q *
398 get_conn_q_for_metanode() {
399   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
400 }
401
402 typedef enum {
403   DS_EXEC_SUCCESS = 0,
404   DS_EXEC_ROW_FAILED = 1,
405   DS_EXEC_TXN_FAILED = 2,
406 } execute_outcome_t;
407
/*
 * Append a text parameter to the detail record `d` in scope.
 *
 * A private copy of the first `len` bytes of `str` is stored in the next
 * free slot.  The literal value "[[null]]" is the in-band marker for SQL
 * NULL: in that case the copy is discarded and the slot holds a NULL
 * pointer with length 0.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  d->paramFormats[d->nparams] = 0; \
  d->paramValues[d->nparams] = noit__strndup(str, len); \
  if(strcmp(d->paramValues[d->nparams], "[[null]]") == 0) { \
    free(d->paramValues[d->nparams]); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  else { \
    d->paramLengths[d->nparams] = len; \
    d->paramAllocd[d->nparams] = 1; \
  } \
  d->nparams++; \
} while(0)
/*
 * Append an integer parameter to the detail record `d` in scope by
 * formatting it as decimal text and declaring that text as a string
 * parameter.
 */
#define DECLARE_PARAM_INT(i) do { \
  char buffer__[32]; \
  int buffer__len = snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
  DECLARE_PARAM_STR(buffer__, buffer__len); \
} while(0)
428
/*
 * Fetch column `name` of row `row` from d->res into `dest`.
 * dest becomes NULL when the column does not exist in the result or the
 * value is SQL NULL; otherwise it points at the value held by the result.
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int colnum = PQfnumber(d->res, name); \
  if(colnum < 0 || PQgetisnull(d->res, row, colnum)) \
    dest = NULL; \
  else \
    dest = PQgetvalue(d->res, row, colnum); \
} while(0)
436
/*
 * Run statement `cmd` on cq->dbh with the parameters accumulated in `d`.
 *
 * The result is left in d->res and its status in d->rv.  On any status
 * other than PGRES_COMMAND_OK / PGRES_TUPLES_OK the first line of the
 * error message is logged, the result is cleared and control jumps to the
 * enclosing function's bad_row label.  On success the caller owns d->res.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(!(d->rv == PGRES_COMMAND_OK || d->rv == PGRES_TUPLES_OK)) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    int pgerr_len = (int)strcspn(pgerr, "\n"); \
    noitL(ds_err, "[%s] stratcon datasource bad (%d): %.*s\n", \
          cq->fqdn ? cq->fqdn : "metanode", d->rv, \
          pgerr_len, pgerr); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
454
/*
 * Like PG_EXEC, but `cmd` is first expanded through strftime() using the
 * UTC broken-down form of `whence`, so statements may embed time
 * conversion specifiers.
 *
 * If the time cannot be converted, or the expanded statement does not fit
 * in the 4096 byte buffer (strftime returns 0 and leaves the buffer
 * contents unspecified), nothing is sent to the database: the failure is
 * logged and control jumps to bad_row.  `whence` is evaluated exactly once.
 *
 * On a database error the first line of the message is logged, d->res is
 * cleared and control jumps to bad_row.  On success the caller owns d->res.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t __w = whence; \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&__w, &tbuf); \
  if(!tm || strftime(cmdbuf, sizeof(cmdbuf), cmd, tm) == 0) { \
    noitL(ds_err, "stratcon datasource bad: cannot expand statement, time: %llu\n", \
          (long long unsigned)__w); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "stratcon datasource bad (%d): %.*s time: %llu\n", \
          d->rv, (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)__w); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
477
478 static void *
479 stratcon_datastore_check_loadall(void *vsn) {
480   storagenode_info *sn = vsn;
481   ds_single_detail *d;
482   int i, row_count = 0, good = 0;
483   char buff[1024];
484   conn_q *cq = NULL;
485
486   d = calloc(1, sizeof(*d));
487   GET_QUERY(check_loadall);
488   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
489   i = 0;
490   while(stratcon_database_connect(cq)) {
491     if(i++ > 4) {
492       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
493       release_conn_q(cq);
494       return (void *)(vpsized_int)good;
495     }
496     sleep(1);
497   }
498   PG_EXEC(check_loadall);
499   row_count = PQntuples(d->res);
500  
501   for(i=0; i<row_count; i++) {
502     int rv;
503     int8_t family;
504     struct sockaddr *sin;
505     struct sockaddr_in sin4 = { .sin_family = AF_INET };
506     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
507     char *remote, *id, *target, *module, *name;
508     PG_GET_STR_COL(remote, i, "remote_address");
509     PG_GET_STR_COL(id, i, "id");
510     PG_GET_STR_COL(target, i, "target");
511     PG_GET_STR_COL(module, i, "module");
512     PG_GET_STR_COL(name, i, "name");
513     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
514
515     family = AF_INET;
516     sin = (struct sockaddr *)&sin4;
517     rv = inet_pton(family, remote, &sin4.sin_addr);
518     if(rv != 1) {
519       family = AF_INET6;
520       sin = (struct sockaddr *)&sin6;
521       rv = inet_pton(family, remote, &sin6.sin6_addr);
522       if(rv != 1) {
523         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
524         sin = NULL;
525       }
526     }
527
528     /* stratcon_iep_line_processor takes an allocated operand and frees it */
529     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
530     good++;
531   }
532   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
533         good, row_count, sn->fqdn);
534  bad_row:
535   free_params((ds_single_detail *)d);
536   free(d);
537   if(cq) release_conn_q(cq);
538   return (void *)(vpsized_int)good;
539 }
540 static int
541 stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure,
542                                     struct timeval *now) {
543   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
544   pthread_t *jobs = NULL;
545   int nodes, i = 0, tcnt = 0;
546   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
547   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
548
549   pthread_mutex_lock(&storagenode_to_info_cache_lock);
550   nodes = storagenode_to_info_cache.size;
551   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
552   sns = calloc(MAX(1,nodes), sizeof(*sns));
553   if(nodes == 0) sns[nodes++] = &self;
554   else {
555     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
556     const char *k;
557     void *v;
558     int klen;
559     while(noit_hash_next(&storagenode_to_info_cache,
560                          &iter, &k, &klen, &v)) {
561       sns[i++] = (storagenode_info *)v;
562     }
563   }
564   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
565
566   for(i=0; i<nodes; i++) {
567     if(pthread_create(&jobs[i], NULL,
568                       stratcon_datastore_check_loadall, sns[i]) != 0) {
569       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
570     }
571   }
572   for(i=0; i<nodes; i++) {
573     void *good;
574     pthread_join(jobs[i], &good);
575     tcnt += (int)(vpsized_int)good;
576   }
577   noitL(noit_error, "Loaded all %d check states.\n", tcnt);
578   return 0;
579 }
580 void
581 stratcon_datastore_iep_check_preload() {
582   eventer_t e;
583   conn_pool *cpool;
584
585   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
586   e = eventer_alloc();
587   e->mask = EVENTER_ASYNCH;
588   e->callback = stratcon_datastore_asynch_drive_iep;
589   e->closure = NULL;
590   eventer_add_asynch(cpool->jobq, e);
591 }
592 execute_outcome_t
593 stratcon_datastore_find(ds_rt_detail *d) {
594   conn_q *cq;
595   char *val;
596   int row_count;
597   struct realtime_tracker *node;
598
599   for(node = d->rt; node; node = node->next) {
600     char uuid_str[UUID_STR_LEN+1];
601     const char *fqdn, *dsn, *remote_cn;
602     int storagenode_id;
603
604     uuid_unparse_lower(node->checkid, uuid_str);
605     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
606                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
607       continue;
608
609     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
610     stratcon_database_connect(cq);
611
612     GET_QUERY(check_find);
613     DECLARE_PARAM_INT(node->sid);
614     PG_EXEC(check_find);
615     row_count = PQntuples(d->res);
616     if(row_count != 1) {
617       PQclear(d->res);
618       goto bad_row;
619     }
620
621     /* Get the check uuid */
622     PG_GET_STR_COL(val, 0, "id");
623     if(!val) {
624       PQclear(d->res);
625       goto bad_row;
626     }
627     if(uuid_parse(val, node->checkid)) {
628       PQclear(d->res);
629       goto bad_row;
630     }
631  
632     /* Get the remote_address (which noit owns this) */
633     PG_GET_STR_COL(val, 0, "remote_address");
634     if(!val) {
635       PQclear(d->res);
636       goto bad_row;
637     }
638     node->noit = strdup(val);
639  
640    bad_row:
641     free_params((ds_single_detail *)d);
642     d->nparams = 0;
643     release_conn_q(cq);
644   }
645   return DS_EXEC_SUCCESS;
646 }
647 execute_outcome_t
648 stratcon_datastore_execute(conn_q *cq, const char *r, const char *remote_cn,
649                            ds_line_detail *d) {
650   int type, len, sid;
651   char *final_buff;
652   uLong final_len, actual_final_len;
653   char *token;
654   char raddr_blank[1] = "";
655   const char *raddr;
656
657   type = d->data[0];
658   raddr = r ? r : raddr_blank;
659
660   /* Parse the log line, but only if we haven't already */
661   if(!d->nparams) {
662     char *scp, *ecp;
663
664     scp = d->data;
665 #define PROCESS_NEXT_FIELD(t,l) do { \
666   if(!*scp) goto bad_row; \
667   ecp = strchr(scp, '\t'); \
668   if(!ecp) goto bad_row; \
669   token = scp; \
670   len = (ecp-scp); \
671   scp = ecp + 1; \
672 } while(0)
673 #define PROCESS_LAST_FIELD(t,l) do { \
674   if(!*scp) ecp = scp; \
675   else { \
676     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
677     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
678   } \
679   t = scp; \
680   l = (ecp-scp); \
681 } while(0)
682
683     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
684     switch(type) {
685       /* See noit_check_log.c for log description */
686       case 'n':
687         DECLARE_PARAM_STR(raddr, strlen(raddr));
688         DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
689         DECLARE_PARAM_STR("noitd",5); /* node_type */
690         PROCESS_NEXT_FIELD(token,len);
691         d->whence = (time_t)strtoul(token, NULL, 10);
692         DECLARE_PARAM_STR(token,len); /* timestamp */
693
694         /* This is the expected uncompressed len */
695         PROCESS_NEXT_FIELD(token,len);
696         final_len = atoi(token);
697         final_buff = malloc(final_len);
698         if(!final_buff) goto bad_row;
699  
700         /* The last token is b64 endoded and compressed.
701          * we need to decode it, declare it and then free it.
702          */
703         PROCESS_LAST_FIELD(token, len);
704         /* We can in-place decode this */
705         len = noit_b64_decode((char *)token, len,
706                               (unsigned char *)token, len);
707         if(len <= 0) {
708           noitL(noit_error, "noitd config base64 decoding error.\n");
709           free(final_buff);
710           goto bad_row;
711         }
712         actual_final_len = final_len;
713         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
714                               (unsigned char *)token, len)) {
715           noitL(noit_error, "noitd config decompression failure.\n");
716           free(final_buff);
717           goto bad_row;
718         }
719         if(final_len != actual_final_len) {
720           noitL(noit_error, "noitd config decompression error.\n");
721           free(final_buff);
722           goto bad_row;
723         }
724         DECLARE_PARAM_STR(final_buff, final_len);
725         free(final_buff);
726         break;
727       case 'C':
728         DECLARE_PARAM_STR(raddr, strlen(raddr));
729         PROCESS_NEXT_FIELD(token,len);
730         DECLARE_PARAM_STR(token,len); /* timestamp */
731         d->whence = (time_t)strtoul(token, NULL, 10);
732         PROCESS_NEXT_FIELD(token, len);
733         sid = uuid_to_sid(token, remote_cn);
734         if(sid == 0) goto bad_row;
735         DECLARE_PARAM_INT(sid); /* sid */
736         DECLARE_PARAM_STR(token,len); /* uuid */
737         PROCESS_NEXT_FIELD(token, len);
738         DECLARE_PARAM_STR(token,len); /* target */
739         PROCESS_NEXT_FIELD(token, len);
740         DECLARE_PARAM_STR(token,len); /* module */
741         PROCESS_LAST_FIELD(token, len);
742         DECLARE_PARAM_STR(token,len); /* name */
743         break;
744       case 'M':
745         PROCESS_NEXT_FIELD(token,len);
746         DECLARE_PARAM_STR(token,len); /* timestamp */
747         d->whence = (time_t)strtoul(token, NULL, 10);
748         PROCESS_NEXT_FIELD(token, len);
749         sid = uuid_to_sid(token, remote_cn);
750         if(sid == 0) goto bad_row;
751         DECLARE_PARAM_INT(sid); /* sid */
752         PROCESS_NEXT_FIELD(token, len);
753         DECLARE_PARAM_STR(token,len); /* name */
754         PROCESS_NEXT_FIELD(token,len);
755         d->metric_type = *token;
756         PROCESS_LAST_FIELD(token,len);
757         DECLARE_PARAM_STR(token,len); /* value */
758         break;
759       case 'S':
760         PROCESS_NEXT_FIELD(token,len);
761         DECLARE_PARAM_STR(token,len); /* timestamp */
762         d->whence = (time_t)strtoul(token, NULL, 10);
763         PROCESS_NEXT_FIELD(token, len);
764         sid = uuid_to_sid(token, remote_cn);
765         if(sid == 0) goto bad_row;
766         DECLARE_PARAM_INT(sid); /* sid */
767         PROCESS_NEXT_FIELD(token, len);
768         DECLARE_PARAM_STR(token,len); /* state */
769         PROCESS_NEXT_FIELD(token, len);
770         DECLARE_PARAM_STR(token,len); /* availability */
771         PROCESS_NEXT_FIELD(token, len);
772         DECLARE_PARAM_STR(token,len); /* duration */
773         PROCESS_LAST_FIELD(token,len);
774         DECLARE_PARAM_STR(token,len); /* status */
775         break;
776       default:
777         goto bad_row;
778     }
779
780   }
781
782   /* Now execute the query */
783   switch(type) {
784     case 'n':
785       GET_QUERY(config_insert);
786       PG_EXEC(config_insert);
787       PQclear(d->res);
788       break;
789     case 'C':
790       GET_QUERY(check_insert);
791       PG_TM_EXEC(check_insert, d->whence);
792       PQclear(d->res);
793       break;
794     case 'S':
795       GET_QUERY(status_insert);
796       PG_TM_EXEC(status_insert, d->whence);
797       PQclear(d->res);
798       break;
799     case 'M':
800       switch(d->metric_type) {
801         case METRIC_INT32:
802         case METRIC_UINT32:
803         case METRIC_INT64:
804         case METRIC_UINT64:
805         case METRIC_DOUBLE:
806           GET_QUERY(metric_insert_numeric);
807           PG_TM_EXEC(metric_insert_numeric, d->whence);
808           PQclear(d->res);
809           break;
810         case METRIC_STRING:
811           GET_QUERY(metric_insert_text);
812           PG_TM_EXEC(metric_insert_text, d->whence);
813           PQclear(d->res);
814           break;
815         default:
816           goto bad_row;
817       }
818       break;
819     default:
820       /* should never get here */
821       goto bad_row;
822   }
823   return DS_EXEC_SUCCESS;
824  bad_row:
825   return DS_EXEC_ROW_FAILED;
826 }
827 static int
828 stratcon_database_post_connect(conn_q *cq) {
829   int rv = 0;
830   ds_single_detail _d = { 0 }, *d = &_d;
831   if(cq->fqdn) {
832     char *remote_str, *remote_cn;
833     /* This is the silly way we get null's in through our declare_param_str */
834     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
835     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
836     /* This is a storage node, it gets the storage node post_connect */
837     GET_QUERY(storage_post_connect);
838     rv = -1; /* now we're serious */
839     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
840     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
841     PG_EXEC(storage_post_connect);
842     PQclear(d->res);
843     rv = 0;
844   }
845   else {
846     /* Metanode post_connect */
847     GET_QUERY(metanode_post_connect);
848     rv = -1; /* now we're serious */
849     PG_EXEC(metanode_post_connect);
850     PQclear(d->res);
851     rv = 0;
852   }
853  bad_row:
854   free_params(d);
855   if(rv == -1) {
856     /* Post-connect intentions are serious and fatal */
857     PQfinish(cq->dbh);
858     cq->dbh = NULL;
859   }
860   return rv;
861 }
862 static int
863 stratcon_database_connect(conn_q *cq) {
864   char *dsn, dsn_meta[512];
865   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
866   const char *k, *v;
867   int klen;
868   noit_hash_table *t;
869
870   dsn_meta[0] = '\0';
871   if(!cq->dsn) {
872     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
873     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
874       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
875       strlcat(dsn_meta, k, sizeof(dsn_meta));
876       strlcat(dsn_meta, "=", sizeof(dsn_meta));
877       strlcat(dsn_meta, v, sizeof(dsn_meta));
878     }
879     noit_hash_destroy(t, free, free);
880     free(t);
881     dsn = dsn_meta;
882   }
883   else dsn = cq->dsn;
884
885   if(cq->dbh) {
886     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
887     PQreset(cq->dbh);
888     if(PQstatus(cq->dbh) != CONNECTION_OK) {
889       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
890             dsn, PQerrorMessage(cq->dbh));
891       return -1;
892     }
893     if(stratcon_database_post_connect(cq)) return -1;
894     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
895     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
896           dsn, PQerrorMessage(cq->dbh));
897     return -1;
898   }
899
900   cq->dbh = PQconnectdb(dsn);
901   if(!cq->dbh) return -1;
902   if(PQstatus(cq->dbh) != CONNECTION_OK) {
903     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
904           dsn, PQerrorMessage(cq->dbh));
905     return -1;
906   }
907   if(stratcon_database_post_connect(cq)) return -1;
908   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
909   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
910         dsn, PQerrorMessage(cq->dbh));
911   return -1;
912 }
913 static int
914 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
915                                 const char *name) {
916   int rv = -1;
917   PGresult *res;
918   char cmd[128];
919   strlcpy(cmd, p, sizeof(cmd));
920   strlcat(cmd, name, sizeof(cmd));
921   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
922   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
923   PQclear(res);
924   return rv;
925 }
926 static int
927 stratcon_datastore_do(conn_q *cq, const char *cmd) {
928   PGresult *res;
929   int rv = -1;
930   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
931   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
932   PQclear(res);
933   return rv;
934 }
/*
 * Transaction helpers.  They expand in place and therefore rely on the
 * enclosing function providing `cq`, `current`, `last_sp` and a
 * `full_monty` label to jump back to.
 */

/* Tear the connection down and restart from the full_monty label. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)

/* Run "<verb><name>"; on failure take the BUSTED path, otherwise record
 * the new value of last_sp. */
#define SAVEPOINT_STEP(verb, name, sp_after) do { \
  if(stratcon_datastore_savepoint_op(cq, verb, name)) BUSTED(cq); \
  last_sp = (sp_after); \
} while(0)

/* Open a savepoint and remember the row it was taken at. */
#define SAVEPOINT(name) \
  SAVEPOINT_STEP("SAVEPOINT ", name, current)
/* Undo everything since the savepoint; no savepoint row is remembered. */
#define ROLLBACK_TO_SAVEPOINT(name) \
  SAVEPOINT_STEP("ROLLBACK TO SAVEPOINT ", name, NULL)
/* Release the savepoint; no savepoint row is remembered. */
#define RELEASE_SAVEPOINT(name) \
  SAVEPOINT_STEP("RELEASE SAVEPOINT ", name, NULL)
954 int
955 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
956                                  struct timeval *now) {
957   ds_rt_detail *dsjd = closure;
958   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
959   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
960
961   assert(dsjd->rt);
962   stratcon_datastore_find(dsjd);
963   if(dsjd->completion_event)
964     eventer_add(dsjd->completion_event);
965
966   free_params((ds_single_detail *)dsjd);
967   free(dsjd);
968   return 0;
969 }
970 static const char *
971 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
972   void *vinfo;
973   const char *dsn = NULL, *fqdn = NULL;
974   int found = 0;
975   storagenode_info *info = NULL;
976   pthread_mutex_lock(&storagenode_to_info_cache_lock);
977   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
978                         &vinfo)) {
979     found = 1;
980     info = vinfo;
981   }
982   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
983   if(found) {
984     if(fqdn_out) *fqdn_out = info->fqdn;
985     return info->dsn;
986   }
987
988   if(!found && can_use_db) {
989     ds_single_detail *d;
990     conn_q *cq;
991     int row_count;
992     /* Look it up and store it */
993     d = calloc(1, sizeof(*d));
994     cq = get_conn_q_for_metanode();
995     GET_QUERY(find_storage);
996     DECLARE_PARAM_INT(id);
997     PG_EXEC(find_storage);
998     row_count = PQntuples(d->res);
999     if(row_count) {
1000       PG_GET_STR_COL(dsn, 0, "dsn");
1001       PG_GET_STR_COL(fqdn, 0, "fqdn");
1002     }
1003     PQclear(d->res);
1004    bad_row:
1005     free_params(d);
1006     free(d);
1007     release_conn_q(cq);
1008   }
1009   if(fqdn) {
1010     info = calloc(1, sizeof(*info));
1011     info->fqdn = strdup(fqdn);
1012     if(fqdn_out) *fqdn_out = info->fqdn;
1013     info->dsn = dsn ? strdup(dsn) : NULL;
1014     info->storagenode_id = id;
1015     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1016     noit_hash_store(&storagenode_to_info_cache,
1017                     (void *)&info->storagenode_id, sizeof(int), info);
1018     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1019   }
1020   return info ? info->dsn : NULL;
1021 }
1022 static ds_line_detail *
1023 build_insert_batch(interim_journal_t *ij) {
1024   int rv;
1025   off_t len;
1026   const char *buff, *cp, *lcp;
1027   struct stat st;
1028   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1029
1030   if(ij->fd < 0) {
1031     ij->fd = open(ij->filename, O_RDONLY);
1032     if(ij->fd < 0) {
1033       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
1034             ij->filename, strerror(errno));
1035       assert(ij->fd >= 0);
1036     }
1037   }
1038   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1039   if(rv == -1) {
1040       noitL(noit_error, "Cannot stat interim journal '%s': %s\n",
1041             ij->filename, strerror(errno));
1042     assert(rv != -1);
1043   }
1044   len = st.st_size;
1045   buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1046   if(buff == (void *)-1) {
1047     noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1048           ij->filename, strerror(errno));
1049     assert(buff != (void *)-1);
1050   }
1051   lcp = buff;
1052   while(lcp < (buff + len) &&
1053         NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1054     next = calloc(1, sizeof(*next));
1055     next->data = malloc(cp - lcp + 1);
1056     memcpy(next->data, lcp, cp - lcp);
1057     next->data[cp - lcp] = '\0';
1058     if(!head) head = next;
1059     if(last) last->next = next;
1060     last = next;
1061     lcp = cp + 1;
1062   }
1063   munmap((void *)buff, len);
1064   close(ij->fd);
1065   return head;
1066 }
1067 static void
1068 interim_journal_remove(interim_journal_t *ij) {
1069   unlink(ij->filename);
1070   if(ij->filename) free(ij->filename);
1071   if(ij->remote_str) free(ij->remote_str);
1072   if(ij->remote_cn) free(ij->remote_cn);
1073   if(ij->fqdn) free(ij->fqdn);
1074 }
1075 int
1076 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
1077                                   struct timeval *now) {
1078   int i, total, success, sp_total, sp_success;
1079   interim_journal_t *ij;
1080   ds_line_detail *head = NULL, *current, *last_sp;
1081   const char *dsn;
1082   conn_q *cq;
1083   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1084   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1085
1086   ij = closure;
1087   dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1088   if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1089   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1090                              ij->fqdn, dsn);
1091   noitL(ds_deb, "stratcon_datastore_asynch_execute[%s,%s,%s]\n",
1092         ij->remote_str, ij->remote_cn, ij->fqdn);
1093  full_monty:
1094   /* Make sure we have a connection */
1095   i = 1;
1096   while(stratcon_database_connect(cq)) {
1097     noitL(noit_error, "Error connecting to database: %s\n",
1098           ij->fqdn ? ij->fqdn : "(null)");
1099     sleep(i);
1100     i *= 2;
1101     i = MIN(i, 16);
1102   }
1103
1104   if(head == NULL) head = build_insert_batch(ij);
1105   noitL(ds_deb, "Starting batch from %s/%s to %s\n",
1106         ij->remote_str ? ij->remote_str : "(null)",
1107         ij->remote_cn ? ij->remote_cn : "(null)",
1108         ij->fqdn ? ij->fqdn : "(null)");
1109   current = head;
1110   last_sp = NULL;
1111   total = success = sp_total = sp_success = 0;
1112   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
1113   while(current) {
1114     execute_outcome_t rv;
1115     if(current->data) {
1116       if(!last_sp) {
1117         SAVEPOINT("batch");
1118         sp_success = success;
1119         sp_total = total;
1120       }
1121  
1122       if(current->problematic) {
1123         RELEASE_SAVEPOINT("batch");
1124         current = current->next;
1125         total++;
1126         continue;
1127       }
1128       rv = stratcon_datastore_execute(cq, cq->remote_str, cq->remote_cn,
1129                                       current);
1130       switch(rv) {
1131         case DS_EXEC_SUCCESS:
1132           total++;
1133           success++;
1134           current = current->next;
1135           break;
1136         case DS_EXEC_ROW_FAILED:
1137           /* rollback to savepoint, mark this record as bad and start again */
1138           noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1139           current->problematic = 1;
1140           current = last_sp;
1141           success = sp_success;
1142           total = sp_total;
1143           ROLLBACK_TO_SAVEPOINT("batch");
1144           break;
1145         case DS_EXEC_TXN_FAILED:
1146           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1147           BUSTED(cq);
1148       }
1149     }
1150   }
1151   if(last_sp) RELEASE_SAVEPOINT("batch");
1152   if(stratcon_datastore_do(cq, "COMMIT")) {
1153     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1154     BUSTED(cq);
1155   }
1156   /* Cleanup the mess */
1157   while(head) {
1158     ds_line_detail *tofree;
1159     tofree = head;
1160     head = head->next;
1161     if(tofree->data) free(tofree->data);
1162     free_params((ds_single_detail *)tofree);
1163     free(tofree);
1164   }
1165   noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1166         ij->remote_str ? ij->remote_str : "(null)",
1167         ij->remote_cn ? ij->remote_cn : "(null)",
1168         ij->fqdn ? ij->fqdn : "(null)", success, total);
1169   interim_journal_remove(ij);
1170   release_conn_q(cq);
1171   return 0;
1172 }
1173 static int
1174 stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure,
1175                                 struct timeval *now) {
1176   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1177   const char *k;
1178   int klen;
1179   void *vij;
1180   interim_journal_t *ij;
1181   syncset_t *syncset = closure;
1182
1183   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1184   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1185
1186   noitL(ds_deb, "Syncing journal sets...\n");
1187   while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) {
1188     eventer_t ingest;
1189     ij = vij;
1190     noitL(ds_deb, "Syncing journal set [%s,%s,%s]\n",
1191           ij->remote_str, ij->remote_cn, ij->fqdn);
1192     fsync(ij->fd);
1193     close(ij->fd);
1194     ij->fd = -1;
1195     ingest = eventer_alloc();
1196     ingest->mask = EVENTER_ASYNCH;
1197     ingest->callback = stratcon_datastore_asynch_execute;
1198     ingest->closure = ij;
1199     eventer_add_asynch(ij->cpool->jobq, ingest);
1200   }
1201   noit_hash_destroy(syncset->ws, free, NULL);
1202   free(syncset->ws);
1203   eventer_add(syncset->completion);
1204   eventer_trigger(syncset->completion, EVENTER_READ | EVENTER_WRITE);
1205   free(syncset);
1206   return 0;
1207 }
1208 static interim_journal_t *
1209 interim_journal_get(struct sockaddr *remote, const char *remote_cn_in,
1210                     int storagenode_id, const char *fqdn_in) {
1211   void *vhash, *vij;
1212   noit_hash_table *working_set;
1213   interim_journal_t *ij;
1214   struct timeval now;
1215   char jpath[PATH_MAX];
1216   char remote_str[128];
1217   const char *remote_cn = remote_cn_in ? remote_cn_in : "default";
1218   const char *fqdn = fqdn_in ? fqdn_in : "default";
1219
1220   convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote);
1221   if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str));
1222
1223   /* Lookup the working set */
1224   if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1225     working_set = calloc(1, sizeof(*working_set));
1226     noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn),
1227                     working_set);
1228   }
1229   else
1230     working_set = vhash;
1231
1232   /* Lookup the interim journal within the working set */
1233   if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) {
1234     ij = calloc(1, sizeof(*ij));
1235     gettimeofday(&now, NULL);
1236     snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x",
1237              basejpath, remote_str, remote_cn, storagenode_id,
1238              (unsigned int)now.tv_sec, (unsigned int)now.tv_usec);
1239     ij->remote_str = strdup(remote_str);
1240     ij->remote_cn = strdup(remote_cn);
1241     ij->fqdn = fqdn_in ? strdup(fqdn_in) : NULL;
1242     ij->storagenode_id = storagenode_id;
1243     ij->filename = strdup(jpath);
1244     ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1245                                          ij->fqdn);
1246     ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1247     if(ij->fd < 0 && errno == ENOENT) {
1248       if(mkdir_for_file(ij->filename, 0750)) {
1249         noitL(noit_error, "Failed to create dir for '%s': %s\n",
1250               ij->filename, strerror(errno));
1251         exit(-1);
1252       }
1253       ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1254     }
1255     if(ij->fd < 0) {
1256       noitL(noit_error, "Failed to open interim journal '%s': %s\n",
1257             ij->filename, strerror(errno));
1258       exit(-1);
1259     }
1260     noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij);
1261   }
1262   else
1263     ij = vij;
1264
1265   return ij;
1266 }
1267 static int
1268 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1269                           int *sid_out, int *storagenode_id_out,
1270                           const char **remote_cn_out,
1271                           const char **fqdn_out, const char **dsn_out) {
1272   /* only called from the main thread -- no safety issues */
1273   void *vuuidinfo, *vinfo;
1274   uuid_info *uuidinfo;
1275   storagenode_info *info = NULL;
1276   char *fqdn = NULL;
1277   char *dsn = NULL;
1278   int storagenode_id = 0, sid = 0;
1279   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1280                          &vuuidinfo)) {
1281     int row_count = 0;
1282     char *tmpint;
1283     ds_single_detail *d;
1284     conn_q *cq;
1285
1286     /* We can't do a database lookup without the remote_cn */
1287     if(!remote_cn) return -1;
1288
1289     d = calloc(1, sizeof(*d));
1290     cq = get_conn_q_for_metanode();
1291     if(stratcon_database_connect(cq) == 0) {
1292       /* Blocking call to service the cache miss */
1293       GET_QUERY(check_map);
1294       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1295       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1296       PG_EXEC(check_map);
1297       row_count = PQntuples(d->res);
1298       if(row_count != 1) {
1299         PQclear(d->res);
1300         goto bad_row;
1301       }
1302       PG_GET_STR_COL(tmpint, 0, "sid");
1303       sid = atoi(tmpint);
1304       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1305       if(tmpint) storagenode_id = atoi(tmpint);
1306       PG_GET_STR_COL(fqdn, 0, "fqdn");
1307       PG_GET_STR_COL(dsn, 0, "dsn");
1308       PQclear(d->res);
1309     }
1310    bad_row:
1311     free_params((ds_single_detail *)d);
1312     free(d);
1313     release_conn_q(cq);
1314     if(row_count != 1) return -1;
1315     /* Place in cache */
1316     if(fqdn) fqdn = strdup(fqdn);
1317     uuidinfo = calloc(1, sizeof(*uuidinfo));
1318     uuidinfo->sid = sid;
1319     uuidinfo->uuid_str = strdup(uuid_str);
1320     uuidinfo->storagenode_id = storagenode_id;
1321     uuidinfo->remote_cn = strdup(remote_cn);
1322     noit_hash_store(&uuid_to_info_cache,
1323                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1324     /* Also, we may have just witnessed a new storage node, store it */
1325     if(storagenode_id) {
1326       int needs_free = 0;
1327       info = calloc(1, sizeof(*info));
1328       info->storagenode_id = storagenode_id;
1329       info->dsn = dsn ? strdup(dsn) : NULL;
1330       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1331       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1332       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1333                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1334         /* hack to save memory -- we *never* remove from these caches,
1335            so we can use the same fqdn value in the above cache for the key
1336            in the cache below -- (no strdup) */
1337         noit_hash_store(&storagenode_to_info_cache,
1338                         (void *)&info->storagenode_id, sizeof(int), info);
1339       }
1340       else needs_free = 1;
1341       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1342       if(needs_free) {
1343         if(info->dsn) free(info->dsn);
1344         if(info->fqdn) free(info->fqdn);
1345         free(info);
1346       }
1347     }
1348   }
1349   else
1350     uuidinfo = vuuidinfo;
1351
1352   if(uuidinfo && uuidinfo->storagenode_id) {
1353     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1354       /* we don't have dsn and we actually want it */
1355       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1356       if(noit_hash_retrieve(&storagenode_to_info_cache,
1357                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1358                             &vinfo))
1359         info = vinfo;
1360       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1361     }
1362   }
1363
1364   if(fqdn_out) *fqdn_out = fqdn ? fqdn : (info ? info->fqdn : NULL);
1365   if(dsn_out) *dsn_out = dsn ? dsn : (info ? info->dsn : NULL);
1366   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1367   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1368   if(sid_out) *sid_out = uuidinfo->sid;
1369   return 0;
1370 }
1371 static int
1372 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1373   char uuid_str[UUID_STR_LEN+1];
1374   int sid = 0;
1375   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1376   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1377   return sid;
1378 }
1379 static void
1380 stratcon_datastore_journal(struct sockaddr *remote,
1381                            const char *remote_cn, const char *line) {
1382   interim_journal_t *ij = NULL;
1383   char uuid_str[UUID_STR_LEN+1], *cp;
1384   const char *fqdn = NULL, *dsn = NULL;
1385   int storagenode_id = 0;
1386   uuid_t checkid;
1387   if(!line) return;
1388   /* if it is a UUID based thing, find the storage node */
1389   switch(*line) {
1390     case 'C':
1391     case 'S':
1392     case 'M':
1393       if(line[1] == '\t' && (cp = strchr(line+2, '\t')) != NULL) {
1394         strlcpy(uuid_str, cp + 1, sizeof(uuid_str));
1395         if(!uuid_parse(uuid_str, checkid)) {
1396           storage_node_quick_lookup(uuid_str, remote_cn, NULL,
1397                                     &storagenode_id, NULL, &fqdn, &dsn);
1398           ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn);
1399         }
1400       }
1401       break;
1402     case 'n':
1403       ij = interim_journal_get(remote,remote_cn,0,NULL);
1404       break;
1405     default:
1406       break;
1407   }
1408   if(!ij) {
1409     noitL(ingest_err, "%d\t%s\n", storagenode_id, line);
1410   }
1411   else {
1412     int len;
1413     len = write(ij->fd, line, strlen(line));
1414     if(len < 0) {
1415       noitL(noit_error, "write to %s failed: %s\n",
1416             ij->filename, strerror(errno));
1417     }
1418   }
1419   return;
1420 }
1421 static noit_hash_table *
1422 stratcon_datastore_journal_remove(struct sockaddr *remote,
1423                                   const char *remote_cn) {
1424   void *vhash = NULL;
1425   if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1426     /* pluck it out */
1427     noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL);
1428   }
1429   else {
1430     noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n",
1431           remote_cn);
1432     abort();
1433   }
1434   return vhash;
1435 }
1436 void
1437 stratcon_datastore_push(stratcon_datastore_op_t op,
1438                         struct sockaddr *remote,
1439                         const char *remote_cn, void *operand,
1440                         eventer_t completion) {
1441   conn_pool *cpool;
1442   syncset_t *syncset;
1443   eventer_t e;
1444   ds_rt_detail *rtdetail;
1445   struct datastore_onlooker_list *nnode;
1446
1447   for(nnode = onlookers; nnode; nnode = nnode->next)
1448     nnode->dispatch(op,remote,remote_cn,operand);
1449
1450   switch(op) {
1451     case DS_OP_INSERT:
1452       stratcon_datastore_journal(remote, remote_cn, (const char *)operand);
1453       break;
1454     case DS_OP_CHKPT:
1455       e = eventer_alloc();
1456       syncset = calloc(1, sizeof(*syncset));
1457       e->mask = EVENTER_ASYNCH;
1458       e->callback = stratcon_datastore_journal_sync;
1459       syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn);
1460       syncset->completion = completion;
1461       e->closure = syncset;
1462       eventer_add(e);
1463       break;
1464     case DS_OP_FIND_COMPLETE:
1465       cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
1466       rtdetail = calloc(1, sizeof(*rtdetail));
1467       rtdetail->rt = operand;
1468       rtdetail->completion_event = completion;
1469       e = eventer_alloc();
1470       e->mask = EVENTER_ASYNCH;
1471       e->callback = stratcon_datastore_asynch_lookup;
1472       e->closure = rtdetail;
1473       eventer_add_asynch(cpool->jobq, e);
1474       break;
1475   }
1476 }
1477
1478 int
1479 stratcon_datastore_saveconfig(void *unused) {
1480   int rv = -1;
1481   char *buff;
1482   ds_single_detail _d = { 0 }, *d = &_d;
1483   conn_q *cq;
1484   cq = get_conn_q_for_metanode();
1485
1486   if(stratcon_database_connect(cq) == 0) {
1487     char time_as_str[20];
1488     size_t len;
1489     buff = noit_conf_xml_in_mem(&len);
1490     if(!buff) goto bad_row;
1491
1492     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1493     DECLARE_PARAM_STR("0.0.0.0", 7);
1494     DECLARE_PARAM_STR("", 0);
1495     DECLARE_PARAM_STR("stratcond", 9);
1496     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1497     DECLARE_PARAM_STR(buff, len);
1498     free(buff);
1499
1500     GET_QUERY(config_insert);
1501     PG_EXEC(config_insert);
1502     PQclear(d->res);
1503     rv = 0;
1504
1505     bad_row:
1506       free_params(d);
1507   }
1508   release_conn_q(cq);
1509   return rv;
1510 }
1511
1512 void
1513 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
1514                                                struct sockaddr *,
1515                                                const char *, void *)) {
1516   struct datastore_onlooker_list *nnode;
1517   nnode = calloc(1, sizeof(*nnode));
1518   nnode->dispatch = f;
1519   nnode->next = onlookers;
1520   while(noit_atomic_casptr((volatile void **)&onlookers,
1521                            nnode, nnode->next) != (void *)nnode->next)
1522     nnode->next = onlookers;
1523 }
1524 static void
1525 stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn,
1526                                          char *id_str, char *file) {
1527   char path[PATH_MAX];
1528   interim_journal_t *ij;
1529   eventer_t ingest;
1530
1531   snprintf(path, sizeof(path), "%s/%s/%s/%s/%s",
1532            basejpath, remote_str, remote_cn, id_str, file);
1533   ij = calloc(1, sizeof(*ij));
1534   ij->fd = open(path, O_RDONLY);
1535   if(ij->fd < 0) {
1536     noitL(noit_error, "cannot open journal '%s': %s\n",
1537           path, strerror(errno));
1538     free(ij);
1539     return;
1540   }
1541   close(ij->fd);
1542   ij->fd = -1;
1543   ij->filename = strdup(path);
1544   ij->remote_str = strdup(remote_str);
1545   ij->remote_cn = strdup(remote_cn);
1546   ij->storagenode_id = atoi(id_str);
1547   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1548                                        ij->fqdn);
1549   noitL(noit_error, "ingesting old payload: %s\n", ij->filename);
1550   ingest = eventer_alloc();
1551   ingest->mask = EVENTER_ASYNCH;
1552   ingest->callback = stratcon_datastore_asynch_execute;
1553   ingest->closure = ij;
1554   eventer_add_asynch(ij->cpool->jobq, ingest);
1555 }
1556 static void
1557 stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) {
1558   char path[PATH_MAX];
1559   DIR *root;
1560   struct dirent *de, *entry;
1561   int i = 0, cnt = 0;
1562   char **entries;
1563   int size = 0;
1564
1565   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
1566            first ? "/" : "", first ? first : "",
1567            second ? "/" : "", second ? second : "",
1568            third ? "/" : "", third ? third : "");
1569 #ifdef _PC_NAME_MAX
1570   size = pathconf(path, _PC_NAME_MAX);
1571 #endif
1572   size = MIN(size, PATH_MAX + 128);
1573   de = alloca(size);
1574   root = opendir(path);
1575   if(!root) return;
1576   while(portable_readdir_r(root, de, &entry) != 0 && entry != NULL) cnt++;
1577   closedir(root);
1578   root = opendir(path);
1579   if(!root) return;
1580   entries = malloc(sizeof(*entries) * cnt);
1581   while(portable_readdir_r(root, de, &entry) != 0 && entry != NULL) {
1582     if(i < cnt) {
1583       entries[i++] = strdup(entry->d_name);
1584     }
1585   }
1586   closedir(root);
1587   cnt = i; /* could have changed, directories are fickle */
1588   qsort(entries, i, sizeof(*entries),
1589         (int (*)(const void *, const void *))strcasecmp);
1590   for(i=0; i<cnt; i++) {
1591     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
1592     noitL(ds_deb, "Processing L%d entry '%s'\n",
1593           third ? 4 : second ? 3 : first ? 2 : 1, entries[i]);
1594     if(!first)
1595       stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL);
1596     else if(!second)
1597       stratcon_datastore_sweep_journals_int(first, entries[i], NULL);
1598     else if(!third)
1599       stratcon_datastore_sweep_journals_int(first, second, entries[i]);
1600     else if(strlen(entries[i]) == 16)
1601       stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]);
1602   }
1603 }
/*
 * Sweep the whole journal tree, starting at basejpath with no path
 * components fixed yet.
 */
static void
stratcon_datastore_sweep_journals() {
  char *no_component = NULL;

  stratcon_datastore_sweep_journals_int(no_component, no_component,
                                        no_component);
}
1608
1609 int
1610 stratcon_datastore_ingest_all_storagenode_info() {
1611   int i, cnt = 0;
1612   ds_single_detail _d = { 0 }, *d = &_d;
1613   conn_q *cq;
1614   cq = get_conn_q_for_metanode();
1615
1616   while(stratcon_database_connect(cq)) {
1617     noitL(noit_error, "Error connecting to database\n");
1618     sleep(1);
1619   }
1620
1621   GET_QUERY(all_storage);
1622   PG_EXEC(all_storage);
1623   cnt = PQntuples(d->res);
1624   for(i=0; i<cnt; i++) {
1625     void *vinfo;
1626     char *tmpint, *fqdn, *dsn;
1627     int storagenode_id;
1628     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1629     storagenode_id = atoi(tmpint);
1630     PG_GET_STR_COL(fqdn, i, "fqdn");
1631     PG_GET_STR_COL(dsn, i, "dsn");
1632     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1633     storagenode_id = tmpint ? atoi(tmpint) : 0;
1634
1635     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1636                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1637       storagenode_info *info;
1638       info = calloc(1, sizeof(*info));
1639       info->storagenode_id = storagenode_id;
1640       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1641       info->dsn = dsn ? strdup(dsn) : NULL;
1642       noit_hash_store(&storagenode_to_info_cache,
1643                       (void *)&info->storagenode_id, sizeof(int), info);
1644       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1645             info->storagenode_id,
1646             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1647     }
1648   }
1649   PQclear(d->res);
1650  bad_row:
1651   free_params(d);
1652
1653   release_conn_q(cq);
1654   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1655   return cnt;
1656 }
1657 int
1658 stratcon_datastore_ingest_all_check_info() {
1659   int i, cnt, loaded = 0;
1660   ds_single_detail _d = { 0 }, *d = &_d;
1661   conn_q *cq;
1662   cq = get_conn_q_for_metanode();
1663
1664   while(stratcon_database_connect(cq)) {
1665     noitL(noit_error, "Error connecting to database\n");
1666     sleep(1);
1667   }
1668
1669   GET_QUERY(check_mapall);
1670   PG_EXEC(check_mapall);
1671   cnt = PQntuples(d->res);
1672   for(i=0; i<cnt; i++) {
1673     void *vinfo;
1674     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1675     int sid, storagenode_id;
1676     uuid_info *uuidinfo;
1677     PG_GET_STR_COL(uuid_str, i, "id");
1678     if(!uuid_str) continue;
1679     PG_GET_STR_COL(tmpint, i, "sid");
1680     if(!tmpint) continue;
1681     sid = atoi(tmpint);
1682     PG_GET_STR_COL(fqdn, i, "fqdn");
1683     PG_GET_STR_COL(dsn, i, "dsn");
1684     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1685     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1686     storagenode_id = tmpint ? atoi(tmpint) : 0;
1687
1688     uuidinfo = calloc(1, sizeof(*uuidinfo));
1689     uuidinfo->uuid_str = strdup(uuid_str);
1690     uuidinfo->remote_cn = strdup(remote_cn);
1691     uuidinfo->storagenode_id = storagenode_id;
1692     uuidinfo->sid = sid;
1693     noit_hash_store(&uuid_to_info_cache,
1694                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1695     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1696           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1697     loaded++;
1698     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1699                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1700       storagenode_info *info;
1701       info = calloc(1, sizeof(*info));
1702       info->storagenode_id = storagenode_id;
1703       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1704       info->dsn = dsn ? strdup(dsn) : NULL;
1705       noit_hash_store(&storagenode_to_info_cache,
1706                       (void *)&info->storagenode_id, sizeof(int), info);
1707       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1708             info->storagenode_id,
1709             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1710     }
1711   }
1712   PQclear(d->res);
1713  bad_row:
1714   free_params(d);
1715
1716   release_conn_q(cq);
1717   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1718   return loaded;
1719 }
1720
1721 static int
1722 rest_get_noit_config(noit_http_rest_closure_t *restc,
1723                      int npats, char **pats) {
1724   noit_http_session_ctx *ctx = restc->http_ctx;
1725   ds_single_detail *d;
1726   int row_count = 0;
1727   const char *xml = NULL;
1728   conn_q *cq = NULL;
1729
1730   if(npats != 0) {
1731     noit_http_response_server_error(ctx, "text/xml");
1732     noit_http_response_end(ctx);
1733     return 0;
1734   }
1735   d = calloc(1, sizeof(*d));
1736   GET_QUERY(config_get);
1737   cq = get_conn_q_for_metanode();
1738   if(!cq) {
1739     noit_http_response_server_error(ctx, "text/xml");
1740     goto bad_row;
1741   }
1742
1743   DECLARE_PARAM_STR(restc->remote_cn,
1744                     restc->remote_cn ? strlen(restc->remote_cn) : 0);
1745   PG_EXEC(config_get);
1746   row_count = PQntuples(d->res);
1747   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1748
1749   if(xml == NULL) {
1750     char buff[1024];
1751     snprintf(buff, sizeof(buff), "<error><remote_cn>%s</remote_cn>"
1752                                  "<row_count>%d</row_count></error>\n",
1753              restc->remote_cn, row_count);
1754     noit_http_response_append(ctx, buff, strlen(buff));
1755     noit_http_response_not_found(ctx, "text/xml");
1756   }
1757   else {
1758     noit_http_response_append(ctx, xml, strlen(xml));
1759     noit_http_response_ok(ctx, "text/xml");
1760   }
1761  bad_row:
1762   free_params((ds_single_detail *)d);
1763   d->nparams = 0;
1764   if(cq) release_conn_q(cq);
1765
1766   noit_http_response_end(ctx);
1767   return 0;
1768 }
1769
1770 void
1771 stratcon_datastore_init() {
1772   pthread_mutex_init(&ds_conns_lock, NULL);
1773   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1774   ds_err = noit_log_stream_find("error/datastore");
1775   ds_deb = noit_log_stream_find("debug/datastore");
1776   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
1777   ingest_err = noit_log_stream_find("error/ingest");
1778   if(!ds_err) ds_err = noit_error;
1779   if(!ingest_err) ingest_err = noit_error;
1780   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1781                            &basejpath)) {
1782     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1783     exit(-1);
1784   }
1785   stratcon_datastore_ingest_all_check_info();
1786   stratcon_datastore_ingest_all_storagenode_info();
1787   stratcon_datastore_sweep_journals();
1788
1789   assert(noit_http_rest_register_auth(
1790     "GET", "/noits/", "^config$", rest_get_noit_config,
1791              noit_http_rest_client_cert_auth
1792   ) == 0);
1793 }
1794
Note: See TracBrowser for help on using the browser.