root/src/stratcon_datastore.c

Revision 7a8eec37fbb0637d35006674681160506259391f, 58.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 years ago)

logging fixes and line numbers, refs #42

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "utils/noit_str.h"
38 #include "utils/noit_mkdir.h"
39 #include "utils/noit_getip.h"
40 #include "stratcon_datastore.h"
41 #include "stratcon_realtime_http.h"
42 #include "stratcon_iep.h"
43 #include "noit_conf.h"
44 #include "noit_check.h"
45 #include "noit_rest.h"
46 #include <unistd.h>
47 #include <fcntl.h>
48 #include <netinet/in.h>
49 #include <sys/un.h>
50 #include <dirent.h>
51 #include <arpa/inet.h>
52 #include <sys/mman.h>
53 #include <libpq-fe.h>
54 #include <zlib.h>
55 #include <assert.h>
56 #include <errno.h>
57
/*
 * DECL_STMT(codename, confname) declares two file-scope statics:
 *   codename       - the SQL text, NULL until GET_QUERY(codename) loads it;
 *   codename_conf  - its config path, "/stratcon/database/statements/<confname>".
 */
58 #define DECL_STMT(codename,confname) \
59 static char *codename = NULL; \
60 static const char *codename##_conf = "/stratcon/database/statements/" #confname
61
/* One lazily-loaded SQL statement per datastore operation; the second
 * argument is the element name under /stratcon/database/statements. */
62 DECL_STMT(storage_post_connect, storagepostconnect);
63 DECL_STMT(metanode_post_connect, metanodepostconnect);
64 DECL_STMT(find_storage, findstoragenode);
65 DECL_STMT(all_storage, allstoragenodes);
66 DECL_STMT(check_map, mapchecktostoragenode);
67 DECL_STMT(check_mapall, mapallchecks);
68 DECL_STMT(check_loadall, allchecks);
69 DECL_STMT(check_find, findcheck);
70 DECL_STMT(check_insert, check);
71 DECL_STMT(status_insert, status);
72 DECL_STMT(metric_insert_numeric, metric_numeric);
73 DECL_STMT(metric_insert_text, metric_text);
74 DECL_STMT(config_insert, config);
75 DECL_STMT(config_get, findconfig);
76
/* Log streams for this module.  They start NULL and are presumably bound
 * during module initialisation outside the visible code — confirm. */
77 static noit_log_stream_t ds_err = NULL; /* SQL execution failures (PG_EXEC / PG_TM_EXEC) */
78 static noit_log_stream_t ds_deb = NULL;
79 static noit_log_stream_t ds_pool_deb = NULL; /* connection pool tracing */
80 static noit_log_stream_t ingest_err = NULL;
81
/* Process-wide datastore switch; starts out enabled (1). */
static int ds_system_enabled = 1;

/* Return the flag last stored by stratcon_datastore_set_enabled(). */
int
stratcon_datastore_get_enabled() {
  return ds_system_enabled;
}

/* Store n as the datastore enabled flag. */
void
stratcon_datastore_set_enabled(int n) {
  ds_system_enabled = n;
}
85
/* Singly-linked list of callbacks for datastore operations.
 * NOTE(review): nothing in the visible part of this file registers or walks
 * this list; check its users elsewhere in the file. */
86 static struct datastore_onlooker_list {
87   void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *,
88                    const char *, void *);
89   struct datastore_onlooker_list *next;
90 } *onlookers = NULL;
91
/*
 * GET_QUERY(a): on first use, load the SQL text for statement `a` (declared
 * with DECL_STMT) from the config path stored in a##_conf.  The text is
 * cached in the static `a`, so later calls skip the lookup.  If the path is
 * not configured, control jumps to the caller's `bad_row` label.
 */
92 #define GET_QUERY(a) do { \
93   if(a == NULL) \
94     if(!noit_conf_get_string(NULL, a ## _conf, &(a))) \
95       goto bad_row; \
96 } while(0)
97
/* Forward declaration so conn_pool can point at its idle list. */
98 struct conn_q;
99
/*
 * A pool of Postgres connections that share one key (queue_name), plus the
 * job queue that runs datastore work for that key.  The idle list and the
 * counters are updated while holding `lock`; callers waiting for a free
 * slot block on `cv`.
 */
100 typedef struct {
101   char            *queue_name; /* pool key: datastore_<remote_str>_<fqdn>_<remote_cn> */
102   eventer_jobq_t  *jobq;
103   struct conn_q   *head; /* idle connections, most recently released first */
104   pthread_mutex_t  lock;
105   pthread_cond_t   cv;
106   int              ttl; /* idle seconds before eviction; NOTE(review): not assigned in the visible code, so 0 from calloc */
107   int              in_pool; /* number of idle connections on head */
108   int              outstanding; /* connections currently checked out */
109   int              max_allocated; /* cap on in_pool + outstanding */
110   int              max_in_pool; /* cap on idle connections kept for reuse */
111 } conn_pool;
/* One Postgres connection owned by a pool; dbh is NULL until connected. */
112 typedef struct conn_q {
113   time_t           last_use; /* stamped on every release; drives ttl eviction */
114   char            *dsn;        /* Pg connect string */
115   char            *remote_str; /* the IP of the noit*/
116   char            *remote_cn;  /* the Cert CN of the noit */
117   char            *fqdn;       /* the fqdn of the storage node */
118   conn_pool       *pool;
119   struct conn_q   *next; /* next idle connection while on pool->head */
120   /* Postgres specific stuff */
121   PGconn          *dbh;
122 } conn_q;
123
124
/*
 * POSTGRES_PARTS is the common prefix of every *_detail struct: the last
 * result and its status, the line's timestamp (whence), the metric type
 * for 'M' lines, and up to MAX_PARAMS text parameters for PQexecParams().
 * paramAllocd[i] marks the values free_params() must free.
 */
125 #define MAX_PARAMS 8
126 #define POSTGRES_PARTS \
127   PGresult *res; \
128   int rv; \
129   time_t whence; \
130   int nparams; \
131   int metric_type; \
132   char *paramValues[MAX_PARAMS]; \
133   int paramLengths[MAX_PARAMS]; \
134   int paramFormats[MAX_PARAMS]; \
135   int paramAllocd[MAX_PARAMS];
136
/* Bare parameter/result holder (used for one-off queries). */
137 typedef struct ds_single_detail {
138   POSTGRES_PARTS
139 } ds_single_detail;
/* Work item for stratcon_datastore_find(): the realtime trackers to resolve. */
140 typedef struct {
141   /* Postgres specific stuff */
142   POSTGRES_PARTS
143   struct realtime_tracker *rt;
144   conn_q *cq; /* connection on which to perform this job */
145   eventer_t completion_event; /* This event should be registered if non NULL */
146 } ds_rt_detail;
/* One noit log line (data) to be parsed and inserted by
 * stratcon_datastore_execute().  NOTE(review): problematic and next are not
 * used in the visible code. */
147 typedef struct ds_line_detail {
148   /* Postgres specific stuff */
149   POSTGRES_PARTS
150   char *data;
151   int problematic;
152   struct ds_line_detail *next;
153 } ds_line_detail;
154
/* NOTE(review): syncset_t is not used in the visible part of the file. */
155 typedef struct {
156   noit_hash_table *ws;
157   eventer_t completion;
158 } syncset_t;
159
/* NOTE(review): not used in the visible part of the file; presumably an
 * on-disk journal (fd/filename) bound for one storage node — confirm. */
160 typedef struct {
161   char *remote_str;
162   char *remote_cn;
163   char *fqdn;
164   int storagenode_id;
165   int fd;
166   char *filename;
167   conn_pool *cpool;
168 } interim_journal_t;
169
/* Forward declarations for static helpers defined further down the file. */
170 static int stratcon_database_connect(conn_q *cq);
171 static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
172 static int storage_node_quick_lookup(const char *uuid_str,
173                                      const char *remote_cn,
174                                      int *sid_out, int *storagenode_id_out,
175                                      const char **remote_cn_out,
176                                      const char **fqdn_out,
177                                      const char **dsn_out);
178
179 static void
180 free_params(ds_single_detail *d) {
181   int i;
182   for(i=0; i<d->nparams; i++)
183     if(d->paramAllocd[i] && d->paramValues[i])
184       free(d->paramValues[i]);
185 }
186
/* NOTE(review): basejpath and working_sets are not referenced in the visible
 * part of this file; confirm their meaning against their users. */
187 char *basejpath = NULL;
/* Connection pools keyed by queue name; accesses are guarded by
 * ds_conns_lock (see get_conn_pool_for_remote). */
188 pthread_mutex_t ds_conns_lock;
189 noit_hash_table ds_conns;
190 noit_hash_table working_sets;
191
192 /* the fqdn cache needs to be thread safe */
/* Presumably the cached mapping for one check uuid (its noit CN, sid and
 * storage node) — verify against storage_node_quick_lookup(). */
193 typedef struct {
194   char *uuid_str;
195   char *remote_cn;
196   int storagenode_id;
197   int sid;
198 } uuid_info;
/* A storage node: its id plus the fqdn and Postgres dsn used to reach it. */
199 typedef struct {
200   int storagenode_id;
201   char *fqdn;
202   char *dsn;
203 } storagenode_info;
/* NOTE(review): unlike storagenode_to_info_cache, no lock for
 * uuid_to_info_cache is declared here; check how its users serialize access. */
204 noit_hash_table uuid_to_info_cache;
/* Guards storagenode_to_info_cache (held while snapshotting it in
 * stratcon_datastore_asynch_drive_iep). */
205 pthread_mutex_t storagenode_to_info_cache_lock;
206 noit_hash_table storagenode_to_info_cache;
207
/*
 * Render the address in `remote` as text into buff (capacity blen bytes;
 * always NUL-terminated when blen > 0, truncating if needed).
 *
 * AF_INET / AF_INET6 addresses are formatted with inet_ntop(); AF_UNIX
 * sockets yield their sun_path.  A NULL remote yields "".  Any other family
 * leaves buff empty and returns 0, as does blen <= 0 (buff untouched).
 *
 * Returns the length of the string stored in buff.
 */
int
convert_sockaddr_to_buff(char *buff, int blen, struct sockaddr *remote) {
  char name[128] = "";

  if(blen <= 0) return 0;
  buff[0] = '\0';
  if(remote) {
    switch(remote->sa_family) {
      case AF_INET:
        /* inet_ntop's size argument is the capacity of the destination
         * buffer, not the size of the sockaddr. */
        if(!inet_ntop(AF_INET, &((struct sockaddr_in *)remote)->sin_addr,
                      name, sizeof(name)))
          name[0] = '\0'; /* contents are unspecified on failure */
        break;
      case AF_INET6:
        if(!inet_ntop(AF_INET6, &((struct sockaddr_in6 *)remote)->sin6_addr,
                      name, sizeof(name)))
          name[0] = '\0';
        break;
      case AF_UNIX: {
        const struct sockaddr_un *un = (const struct sockaddr_un *)remote;
        /* sun_path is not guaranteed to be NUL-terminated; bound the read. */
        snprintf(name, sizeof(name), "%.*s",
                 (int)sizeof(un->sun_path), un->sun_path);
        break;
      }
      default:
        return 0;
    }
  }
  snprintf(buff, (size_t)blen, "%s", name);
  return (int)strlen(buff);
}
235
236 /* Thread-safe connection pools */
237
238 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
239 static void
240 release_conn_q_forceable(conn_q *cq, int forcefree) {
241   int putback = 0;
242   cq->last_use = time(NULL);
243   pthread_mutex_lock(&cq->pool->lock);
244   cq->pool->outstanding--;
245   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
246     putback = 1;
247     cq->next = cq->pool->head;
248     cq->pool->head = cq;
249     cq->pool->in_pool++;
250   }
251   pthread_mutex_unlock(&cq->pool->lock);
252   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(),
253         putback ? "to pool" : "and destroy", cq->pool->queue_name);
254   pthread_cond_signal(&cq->pool->cv);
255   if(putback) return;
256
257   /* Not put back, release it */
258   if(cq->dbh) PQfinish(cq->dbh);
259   if(cq->remote_str) free(cq->remote_str);
260   if(cq->remote_cn) free(cq->remote_cn);
261   if(cq->fqdn) free(cq->fqdn);
262   if(cq->dsn) free(cq->dsn);
263   free(cq);
264 }
265 static void
266 ttl_purge_conn_pool(conn_pool *pool) {
267   int old_cnt, new_cnt;
268   time_t now = time(NULL);
269   conn_q *cq, *prev = NULL, *iter;
270   /* because we always replace on the head and update the last_use time when
271      doing so, we know they are ordered LRU on the end.  So, once we hit an
272      old one, we know all the others are old too.
273    */
274   if(!pool->head) return; /* hack short circuit for no locks */
275   pthread_mutex_lock(&pool->lock);
276   old_cnt = pool->in_pool;
277   cq = pool->head;
278   while(cq) {
279     if(cq->last_use + cq->pool->ttl < now) {
280       if(prev) prev->next = NULL;
281       else pool->head = NULL;
282       break;
283     }
284     prev = cq;
285     cq = cq->next;
286   }
287   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
288   /* Fix accounting */
289   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
290   new_cnt = pool->in_pool;
291   pthread_mutex_unlock(&pool->lock);
292
293   /* Force release these without holding the lock */
294   while(cq) {
295     prev = cq;
296     cq = cq->next;
297     release_conn_q_forceable(cq, 1);
298   }
299   if(old_cnt != new_cnt)
300     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
301           pool->queue_name);
302 }
303 static void
304 release_conn_q(conn_q *cq) {
305   ttl_purge_conn_pool(cq->pool);
306   release_conn_q_forceable(cq, 0);
307 }
308 static conn_pool *
309 get_conn_pool_for_remote(const char *remote_str,
310                          const char *remote_cn, const char *fqdn) {
311   void *vcpool;
312   conn_pool *cpool = NULL;
313   char queue_name[256] = "datastore_";
314   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
315            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
316            fqdn ? fqdn : "default",
317            remote_cn ? remote_cn : "default");
318   pthread_mutex_lock(&ds_conns_lock);
319   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
320                         strlen(queue_name), &vcpool))
321     cpool = vcpool;
322   pthread_mutex_unlock(&ds_conns_lock);
323   if(!cpool) {
324     vcpool = cpool = calloc(1, sizeof(*cpool));
325     cpool->queue_name = strdup(queue_name);
326     pthread_mutex_init(&cpool->lock, NULL);
327     pthread_cond_init(&cpool->cv, NULL);
328     cpool->in_pool = 0;
329     cpool->outstanding = 0;
330     cpool->max_in_pool = 1;
331     cpool->max_allocated = 1;
332     pthread_mutex_lock(&ds_conns_lock);
333     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
334                         cpool)) {
335       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
336                          strlen(queue_name), &vcpool);
337     }
338     pthread_mutex_unlock(&ds_conns_lock);
339     if(vcpool != cpool) {
340       /* someone beat us to it */
341       free(cpool->queue_name);
342       pthread_mutex_destroy(&cpool->lock);
343       pthread_cond_destroy(&cpool->cv);
344       free(cpool);
345     }
346     else {
347       int i;
348       /* Our job to setup the pool */
349       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
350       eventer_jobq_init(cpool->jobq, queue_name);
351       cpool->jobq->backq = eventer_default_backq();
352       /* Add one thread */
353       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
354         eventer_jobq_increase_concurrency(cpool->jobq);
355     }
356     cpool = vcpool;
357   }
358   return cpool;
359 }
360 static conn_q *
361 get_conn_q_for_remote(const char *remote_str,
362                       const char *remote_cn, const char *fqdn,
363                       const char *dsn) {
364   conn_pool *cpool;
365   conn_q *cq;
366   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
367   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(),
368         cpool->queue_name);
369   pthread_mutex_lock(&cpool->lock);
370  again:
371   if(cpool->head) {
372     assert(cpool->in_pool > 0);
373     cq = cpool->head;
374     cpool->head = cq->next;
375     cpool->in_pool--;
376     cpool->outstanding++;
377     cq->next = NULL;
378     pthread_mutex_unlock(&cpool->lock);
379     return cq;
380   }
381   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
382     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
383           (void *)pthread_self(), cpool->queue_name);
384     pthread_cond_wait(&cpool->cv, &cpool->lock);
385     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
386           (void *)pthread_self(), cpool->queue_name);
387     goto again;
388   }
389   else {
390     cpool->outstanding++;
391     pthread_mutex_unlock(&cpool->lock);
392   }
393  
394   cq = calloc(1, sizeof(*cq));
395   cq->pool = cpool;
396   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
397   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
398   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
399   cq->dsn = dsn ? strdup(dsn) : NULL;
400   return cq;
401 }
402 static conn_q *
403 get_conn_q_for_metanode() {
404   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
405 }
406
/* Result of executing one datastore work item. */
407 typedef enum {
408   DS_EXEC_SUCCESS = 0, /* the statement(s) ran */
409   DS_EXEC_ROW_FAILED = 1, /* this row could not be parsed or its statement failed */
410   DS_EXEC_TXN_FAILED = 2, /* NOTE(review): not returned in the visible code; presumably signals a failed transaction */
411 } execute_outcome_t;
412
/*
 * DECLARE_PARAM_STR(str, len): append a text parameter to `d` (which must be
 * in scope).  The value is copied with noit__strndup(str, len), and d owns
 * it (free_params() releases it).  The literal "[[null]]" is this file's
 * way of asking for a SQL NULL; such a parameter is stored as a NULL
 * pointer with length 0.  `len` is evaluated exactly once.  The assert
 * guards the fixed-size MAX_PARAMS arrays.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  int declare__len = (len); \
  assert(d->nparams < MAX_PARAMS); \
  d->paramValues[d->nparams] = noit__strndup((str), declare__len); \
  d->paramLengths[d->nparams] = declare__len; \
  d->paramFormats[d->nparams] = 0; \
  d->paramAllocd[d->nparams] = 1; \
  if(d->paramValues[d->nparams] && \
     !strcmp(d->paramValues[d->nparams], "[[null]]")) { \
    free(d->paramValues[d->nparams]); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  d->nparams++; \
} while(0)

/* DECLARE_PARAM_INT(i): append the decimal text of int i as a parameter. */
#define DECLARE_PARAM_INT(i) do { \
  int buffer__len; \
  char buffer__[32]; \
  snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
  buffer__len = strlen(buffer__); \
  DECLARE_PARAM_STR(buffer__, buffer__len); \
} while(0)

/*
 * PG_GET_STR_COL(dest, row, name): set dest to the text of column `name`
 * in row `row` of d->res.  dest is NULL if the column does not exist or the
 * value is SQL NULL.  The string belongs to d->res; it is only valid until
 * that result is PQclear()ed.
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int colnum = PQfnumber(d->res, (name)); \
  (dest) = NULL; \
  if (colnum >= 0) \
    (dest) = PQgetisnull(d->res, (row), colnum) \
         ? NULL : PQgetvalue(d->res, (row), colnum); \
} while(0)
441
/*
 * PG_EXEC(cmd): run `cmd` on cq->dbh with d's parameters (d and cq must be
 * in scope).  The result is left in d->res and its status in d->rv.
 *
 * Any status other than PGRES_COMMAND_OK / PGRES_TUPLES_OK is logged to
 * ds_err (first line of the server message only) with the invoking line
 * number.  The result is then cleared and control jumps to the caller's
 * bad_row label.  On success the caller owns d->res and must PQclear() it.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, (cmd), d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "[%s] stratcon_datastore.c:%d bad (%d): %.*s\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
          (int)(pgerr_end - pgerr), pgerr); \
    PQclear(d->res); \
    d->res = NULL; \
    goto bad_row; \
  } \
} while(0)

/*
 * PG_TM_EXEC(cmd, whence): like PG_EXEC, but `cmd` is first expanded with
 * strftime() against the UTC broken-down time of `whence` (presumably to
 * name time-partitioned tables).
 *
 * If the time cannot be converted, or the expansion is empty or does not
 * fit in 4096 bytes, the failure is logged and control jumps to bad_row
 * without querying.  strftime() leaves the buffer indeterminate when it
 * returns 0, so that text must never be sent.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t pgtm__whence = (whence); \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&pgtm__whence, &tbuf); \
  if(!tm || strftime(cmdbuf, sizeof(cmdbuf), (cmd), tm) == 0) { \
    noitL(ds_err, "[%s] stratcon_datastore.c:%d bad query expansion, time: %llu\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, \
          (long long unsigned)pgtm__whence); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "[%s] stratcon_datastore.c:%d bad (%d): %.*s time: %llu\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
          (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)pgtm__whence); \
    PQclear(d->res); \
    d->res = NULL; \
    goto bad_row; \
  } \
} while(0)
482
483 static void *
484 stratcon_datastore_check_loadall(void *vsn) {
485   storagenode_info *sn = vsn;
486   ds_single_detail *d;
487   int i, row_count = 0, good = 0;
488   char buff[1024];
489   conn_q *cq = NULL;
490
491   d = calloc(1, sizeof(*d));
492   GET_QUERY(check_loadall);
493   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
494   i = 0;
495   while(stratcon_database_connect(cq)) {
496     if(i++ > 4) {
497       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
498       release_conn_q(cq);
499       return (void *)(vpsized_int)good;
500     }
501     sleep(1);
502   }
503   PG_EXEC(check_loadall);
504   row_count = PQntuples(d->res);
505  
506   for(i=0; i<row_count; i++) {
507     int rv;
508     int8_t family;
509     struct sockaddr *sin;
510     struct sockaddr_in sin4 = { .sin_family = AF_INET };
511     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
512     char *remote, *id, *target, *module, *name;
513     PG_GET_STR_COL(remote, i, "remote_address");
514     PG_GET_STR_COL(id, i, "id");
515     PG_GET_STR_COL(target, i, "target");
516     PG_GET_STR_COL(module, i, "module");
517     PG_GET_STR_COL(name, i, "name");
518     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
519
520     family = AF_INET;
521     sin = (struct sockaddr *)&sin4;
522     rv = inet_pton(family, remote, &sin4.sin_addr);
523     if(rv != 1) {
524       family = AF_INET6;
525       sin = (struct sockaddr *)&sin6;
526       rv = inet_pton(family, remote, &sin6.sin6_addr);
527       if(rv != 1) {
528         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
529         sin = NULL;
530       }
531     }
532
533     /* stratcon_iep_line_processor takes an allocated operand and frees it */
534     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
535     good++;
536   }
537   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
538         good, row_count, sn->fqdn);
539  bad_row:
540   free_params((ds_single_detail *)d);
541   free(d);
542   if(cq) release_conn_q(cq);
543   return (void *)(vpsized_int)good;
544 }
545 static int
546 stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure,
547                                     struct timeval *now) {
548   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
549   pthread_t *jobs = NULL;
550   int nodes, i = 0, tcnt = 0;
551   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
552   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
553
554   pthread_mutex_lock(&storagenode_to_info_cache_lock);
555   nodes = storagenode_to_info_cache.size;
556   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
557   sns = calloc(MAX(1,nodes), sizeof(*sns));
558   if(nodes == 0) sns[nodes++] = &self;
559   else {
560     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
561     const char *k;
562     void *v;
563     int klen;
564     while(noit_hash_next(&storagenode_to_info_cache,
565                          &iter, &k, &klen, &v)) {
566       sns[i++] = (storagenode_info *)v;
567     }
568   }
569   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
570
571   for(i=0; i<nodes; i++) {
572     if(pthread_create(&jobs[i], NULL,
573                       stratcon_datastore_check_loadall, sns[i]) != 0) {
574       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
575     }
576   }
577   for(i=0; i<nodes; i++) {
578     void *good;
579     pthread_join(jobs[i], &good);
580     tcnt += (int)(vpsized_int)good;
581   }
582   noitL(noit_error, "Loaded all %d check states.\n", tcnt);
583   return 0;
584 }
585 void
586 stratcon_datastore_iep_check_preload() {
587   eventer_t e;
588   conn_pool *cpool;
589
590   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
591   e = eventer_alloc();
592   e->mask = EVENTER_ASYNCH;
593   e->callback = stratcon_datastore_asynch_drive_iep;
594   e->closure = NULL;
595   eventer_add_asynch(cpool->jobq, e);
596 }
597 execute_outcome_t
598 stratcon_datastore_find(ds_rt_detail *d) {
599   conn_q *cq;
600   char *val;
601   int row_count;
602   struct realtime_tracker *node;
603
604   for(node = d->rt; node; node = node->next) {
605     char uuid_str[UUID_STR_LEN+1];
606     const char *fqdn, *dsn, *remote_cn;
607     char remote_ip[32];
608     int storagenode_id;
609
610     uuid_unparse_lower(node->checkid, uuid_str);
611     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
612                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
613       continue;
614
615     noitL(noit_debug, "stratcon_datastore_find <- (%d, %s) @ %s\n",
616           node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)");
617
618     /* We might be able to find the IP from our config if someone has
619      * specified the expected cn in the noit definition.
620      */
621     if(stratcon_find_noit_ip_by_cn(remote_cn,
622                                    remote_ip, sizeof(remote_ip)) == 0) {
623       node->noit = strdup(remote_ip);
624       noitL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit);
625       continue;
626     }
627
628     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
629     stratcon_database_connect(cq);
630
631     GET_QUERY(check_find);
632     DECLARE_PARAM_INT(node->sid);
633     PG_EXEC(check_find);
634     row_count = PQntuples(d->res);
635     if(row_count != 1) {
636       noitL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid);
637       PQclear(d->res);
638       goto bad_row;
639     }
640
641     /* Get the remote_address (which noit owns this) */
642     PG_GET_STR_COL(val, 0, "remote_address");
643     if(!val) {
644       noitL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn);
645       PQclear(d->res);
646       goto bad_row;
647     }
648     node->noit = strdup(val);
649     noitL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit);
650    bad_row:
651     free_params((ds_single_detail *)d);
652     d->nparams = 0;
653     release_conn_q(cq);
654   }
655   return DS_EXEC_SUCCESS;
656 }
657 execute_outcome_t
658 stratcon_datastore_execute(conn_q *cq, const char *r, const char *remote_cn,
659                            ds_line_detail *d) {
660   int type, len, sid;
661   char *final_buff;
662   uLong final_len, actual_final_len;
663   char *token;
664   char raddr_blank[1] = "";
665   const char *raddr;
666
667   type = d->data[0];
668   raddr = r ? r : raddr_blank;
669
670   /* Parse the log line, but only if we haven't already */
671   if(!d->nparams) {
672     char *scp, *ecp;
673
674     scp = d->data;
675 #define PROCESS_NEXT_FIELD(t,l) do { \
676   if(!*scp) goto bad_row; \
677   ecp = strchr(scp, '\t'); \
678   if(!ecp) goto bad_row; \
679   token = scp; \
680   len = (ecp-scp); \
681   scp = ecp + 1; \
682 } while(0)
683 #define PROCESS_LAST_FIELD(t,l) do { \
684   if(!*scp) ecp = scp; \
685   else { \
686     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
687     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
688   } \
689   t = scp; \
690   l = (ecp-scp); \
691 } while(0)
692
693     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
694     switch(type) {
695       /* See noit_check_log.c for log description */
696       case 'n':
697         DECLARE_PARAM_STR(raddr, strlen(raddr));
698         DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
699         DECLARE_PARAM_STR("noitd",5); /* node_type */
700         PROCESS_NEXT_FIELD(token,len);
701         d->whence = (time_t)strtoul(token, NULL, 10);
702         DECLARE_PARAM_STR(token,len); /* timestamp */
703
704         /* This is the expected uncompressed len */
705         PROCESS_NEXT_FIELD(token,len);
706         final_len = atoi(token);
707         final_buff = malloc(final_len);
708         if(!final_buff) goto bad_row;
709  
710         /* The last token is b64 endoded and compressed.
711          * we need to decode it, declare it and then free it.
712          */
713         PROCESS_LAST_FIELD(token, len);
714         /* We can in-place decode this */
715         len = noit_b64_decode((char *)token, len,
716                               (unsigned char *)token, len);
717         if(len <= 0) {
718           noitL(noit_error, "noitd config base64 decoding error.\n");
719           free(final_buff);
720           goto bad_row;
721         }
722         actual_final_len = final_len;
723         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
724                               (unsigned char *)token, len)) {
725           noitL(noit_error, "noitd config decompression failure.\n");
726           free(final_buff);
727           goto bad_row;
728         }
729         if(final_len != actual_final_len) {
730           noitL(noit_error, "noitd config decompression error.\n");
731           free(final_buff);
732           goto bad_row;
733         }
734         DECLARE_PARAM_STR(final_buff, final_len);
735         free(final_buff);
736         break;
737       case 'C':
738         DECLARE_PARAM_STR(raddr, strlen(raddr));
739         PROCESS_NEXT_FIELD(token,len);
740         DECLARE_PARAM_STR(token,len); /* timestamp */
741         d->whence = (time_t)strtoul(token, NULL, 10);
742         PROCESS_NEXT_FIELD(token, len);
743         sid = uuid_to_sid(token, remote_cn);
744         if(sid == 0) goto bad_row;
745         DECLARE_PARAM_INT(sid); /* sid */
746         DECLARE_PARAM_STR(token,len); /* uuid */
747         PROCESS_NEXT_FIELD(token, len);
748         DECLARE_PARAM_STR(token,len); /* target */
749         PROCESS_NEXT_FIELD(token, len);
750         DECLARE_PARAM_STR(token,len); /* module */
751         PROCESS_LAST_FIELD(token, len);
752         DECLARE_PARAM_STR(token,len); /* name */
753         break;
754       case 'M':
755         PROCESS_NEXT_FIELD(token,len);
756         DECLARE_PARAM_STR(token,len); /* timestamp */
757         d->whence = (time_t)strtoul(token, NULL, 10);
758         PROCESS_NEXT_FIELD(token, len);
759         sid = uuid_to_sid(token, remote_cn);
760         if(sid == 0) goto bad_row;
761         DECLARE_PARAM_INT(sid); /* sid */
762         PROCESS_NEXT_FIELD(token, len);
763         DECLARE_PARAM_STR(token,len); /* name */
764         PROCESS_NEXT_FIELD(token,len);
765         d->metric_type = *token;
766         PROCESS_LAST_FIELD(token,len);
767         DECLARE_PARAM_STR(token,len); /* value */
768         break;
769       case 'S':
770         PROCESS_NEXT_FIELD(token,len);
771         DECLARE_PARAM_STR(token,len); /* timestamp */
772         d->whence = (time_t)strtoul(token, NULL, 10);
773         PROCESS_NEXT_FIELD(token, len);
774         sid = uuid_to_sid(token, remote_cn);
775         if(sid == 0) goto bad_row;
776         DECLARE_PARAM_INT(sid); /* sid */
777         PROCESS_NEXT_FIELD(token, len);
778         DECLARE_PARAM_STR(token,len); /* state */
779         PROCESS_NEXT_FIELD(token, len);
780         DECLARE_PARAM_STR(token,len); /* availability */
781         PROCESS_NEXT_FIELD(token, len);
782         DECLARE_PARAM_STR(token,len); /* duration */
783         PROCESS_LAST_FIELD(token,len);
784         DECLARE_PARAM_STR(token,len); /* status */
785         break;
786       default:
787         goto bad_row;
788     }
789
790   }
791
792   /* Now execute the query */
793   switch(type) {
794     case 'n':
795       GET_QUERY(config_insert);
796       PG_EXEC(config_insert);
797       PQclear(d->res);
798       break;
799     case 'C':
800       GET_QUERY(check_insert);
801       PG_TM_EXEC(check_insert, d->whence);
802       PQclear(d->res);
803       break;
804     case 'S':
805       GET_QUERY(status_insert);
806       PG_TM_EXEC(status_insert, d->whence);
807       PQclear(d->res);
808       break;
809     case 'M':
810       switch(d->metric_type) {
811         case METRIC_INT32:
812         case METRIC_UINT32:
813         case METRIC_INT64:
814         case METRIC_UINT64:
815         case METRIC_DOUBLE:
816           GET_QUERY(metric_insert_numeric);
817           PG_TM_EXEC(metric_insert_numeric, d->whence);
818           PQclear(d->res);
819           break;
820         case METRIC_STRING:
821           GET_QUERY(metric_insert_text);
822           PG_TM_EXEC(metric_insert_text, d->whence);
823           PQclear(d->res);
824           break;
825         default:
826           goto bad_row;
827       }
828       break;
829     default:
830       /* should never get here */
831       goto bad_row;
832   }
833   return DS_EXEC_SUCCESS;
834  bad_row:
835   return DS_EXEC_ROW_FAILED;
836 }
837 static int
838 stratcon_database_post_connect(conn_q *cq) {
839   int rv = 0;
840   ds_single_detail _d = { 0 }, *d = &_d;
841   if(cq->fqdn) {
842     char *remote_str, *remote_cn;
843     /* This is the silly way we get null's in through our declare_param_str */
844     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
845     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
846     /* This is a storage node, it gets the storage node post_connect */
847     GET_QUERY(storage_post_connect);
848     rv = -1; /* now we're serious */
849     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
850     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
851     PG_EXEC(storage_post_connect);
852     PQclear(d->res);
853     rv = 0;
854   }
855   else {
856     /* Metanode post_connect */
857     GET_QUERY(metanode_post_connect);
858     rv = -1; /* now we're serious */
859     PG_EXEC(metanode_post_connect);
860     PQclear(d->res);
861     rv = 0;
862   }
863  bad_row:
864   free_params(d);
865   if(rv == -1) {
866     /* Post-connect intentions are serious and fatal */
867     PQfinish(cq->dbh);
868     cq->dbh = NULL;
869   }
870   return rv;
871 }
872 static int
873 stratcon_database_connect(conn_q *cq) {
874   char *dsn, dsn_meta[512];
875   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
876   const char *k, *v;
877   int klen;
878   noit_hash_table *t;
879
880   dsn_meta[0] = '\0';
881   if(!cq->dsn) {
882     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
883     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
884       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
885       strlcat(dsn_meta, k, sizeof(dsn_meta));
886       strlcat(dsn_meta, "=", sizeof(dsn_meta));
887       strlcat(dsn_meta, v, sizeof(dsn_meta));
888     }
889     noit_hash_destroy(t, free, free);
890     free(t);
891     dsn = dsn_meta;
892   }
893   else {
894     char options[32];
895     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta));
896     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user",
897                                options, sizeof(options))) {
898       strlcat(dsn_meta, " ", sizeof(dsn_meta));
899       strlcat(dsn_meta, "user", sizeof(dsn_meta));
900       strlcat(dsn_meta, "=", sizeof(dsn_meta));
901       strlcat(dsn_meta, options, sizeof(dsn_meta));
902     }
903     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password",
904                                options, sizeof(options))) {
905       strlcat(dsn_meta, " ", sizeof(dsn_meta));
906       strlcat(dsn_meta, "password", sizeof(dsn_meta));
907       strlcat(dsn_meta, "=", sizeof(dsn_meta));
908       strlcat(dsn_meta, options, sizeof(dsn_meta));
909     }
910     dsn = dsn_meta;
911   }
912
913   if(cq->dbh) {
914     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
915     PQreset(cq->dbh);
916     if(PQstatus(cq->dbh) != CONNECTION_OK) {
917       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
918             dsn, PQerrorMessage(cq->dbh));
919       return -1;
920     }
921     if(stratcon_database_post_connect(cq)) return -1;
922     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
923     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
924           dsn, PQerrorMessage(cq->dbh));
925     return -1;
926   }
927
928   cq->dbh = PQconnectdb(dsn);
929   if(!cq->dbh) return -1;
930   if(PQstatus(cq->dbh) != CONNECTION_OK) {
931     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
932           dsn, PQerrorMessage(cq->dbh));
933     return -1;
934   }
935   if(stratcon_database_post_connect(cq)) return -1;
936   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
937   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
938         dsn, PQerrorMessage(cq->dbh));
939   return -1;
940 }
941 static int
942 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
943                                 const char *name) {
944   int rv = -1;
945   PGresult *res;
946   char cmd[128];
947   strlcpy(cmd, p, sizeof(cmd));
948   strlcat(cmd, name, sizeof(cmd));
949   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
950   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
951   PQclear(res);
952   return rv;
953 }
954 static int
955 stratcon_datastore_do(conn_q *cq, const char *cmd) {
956   PGresult *res;
957   int rv = -1;
958   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
959   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
960   PQclear(res);
961   return rv;
962 }
/*
 * Transaction helpers for stratcon_datastore_asynch_execute().  They are
 * macros rather than functions because they expand against that function's
 * locals (`cq`, `current`, `last_sp`) and jump to its `full_monty` label.
 */
/* Abandon the connection entirely (PQfinish, dbh = NULL) and jump back to
   full_monty, which reconnects and replays the batch from its head. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)
/* Take savepoint `name` on the enclosing `cq` and remember the line it was
   taken before (last_sp = current).  Any failure busts the connection. */
#define SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
  last_sp = current; \
} while(0)
/* Roll back to savepoint `name` and clear last_sp, so that the caller's
   loop takes a fresh savepoint before the next line.  Any failure busts
   the connection. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
/* Release savepoint `name` and clear last_sp.  Any failure busts the
   connection. */
#define RELEASE_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
982 int
983 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
984                                  struct timeval *now) {
985   ds_rt_detail *dsjd = closure;
986   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
987   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
988
989   assert(dsjd->rt);
990   stratcon_datastore_find(dsjd);
991   if(dsjd->completion_event)
992     eventer_add(dsjd->completion_event);
993
994   free_params((ds_single_detail *)dsjd);
995   free(dsjd);
996   return 0;
997 }
998 static const char *
999 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
1000   void *vinfo;
1001   const char *dsn = NULL, *fqdn = NULL;
1002   int found = 0;
1003   storagenode_info *info = NULL;
1004   pthread_mutex_lock(&storagenode_to_info_cache_lock);
1005   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
1006                         &vinfo)) {
1007     found = 1;
1008     info = vinfo;
1009   }
1010   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1011   if(found) {
1012     if(fqdn_out) *fqdn_out = info->fqdn;
1013     return info->dsn;
1014   }
1015
1016   if(!found && can_use_db) {
1017     ds_single_detail *d;
1018     conn_q *cq;
1019     int row_count;
1020     /* Look it up and store it */
1021     d = calloc(1, sizeof(*d));
1022     cq = get_conn_q_for_metanode();
1023     GET_QUERY(find_storage);
1024     DECLARE_PARAM_INT(id);
1025     PG_EXEC(find_storage);
1026     row_count = PQntuples(d->res);
1027     if(row_count) {
1028       PG_GET_STR_COL(dsn, 0, "dsn");
1029       PG_GET_STR_COL(fqdn, 0, "fqdn");
1030     }
1031     PQclear(d->res);
1032    bad_row:
1033     free_params(d);
1034     free(d);
1035     release_conn_q(cq);
1036   }
1037   if(fqdn) {
1038     info = calloc(1, sizeof(*info));
1039     info->fqdn = strdup(fqdn);
1040     if(fqdn_out) *fqdn_out = info->fqdn;
1041     info->dsn = dsn ? strdup(dsn) : NULL;
1042     info->storagenode_id = id;
1043     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1044     noit_hash_store(&storagenode_to_info_cache,
1045                     (void *)&info->storagenode_id, sizeof(int), info);
1046     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1047   }
1048   return info ? info->dsn : NULL;
1049 }
1050 static ds_line_detail *
1051 build_insert_batch(interim_journal_t *ij) {
1052   int rv;
1053   off_t len;
1054   const char *buff, *cp, *lcp;
1055   struct stat st;
1056   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1057
1058   if(ij->fd < 0) {
1059     ij->fd = open(ij->filename, O_RDONLY);
1060     if(ij->fd < 0) {
1061       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
1062             ij->filename, strerror(errno));
1063       assert(ij->fd >= 0);
1064     }
1065   }
1066   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1067   if(rv == -1) {
1068       noitL(noit_error, "Cannot stat interim journal '%s': %s\n",
1069             ij->filename, strerror(errno));
1070     assert(rv != -1);
1071   }
1072   len = st.st_size;
1073   buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1074   if(buff == (void *)-1) {
1075     noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1076           ij->filename, strerror(errno));
1077     assert(buff != (void *)-1);
1078   }
1079   lcp = buff;
1080   while(lcp < (buff + len) &&
1081         NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1082     next = calloc(1, sizeof(*next));
1083     next->data = malloc(cp - lcp + 1);
1084     memcpy(next->data, lcp, cp - lcp);
1085     next->data[cp - lcp] = '\0';
1086     if(!head) head = next;
1087     if(last) last->next = next;
1088     last = next;
1089     lcp = cp + 1;
1090   }
1091   munmap((void *)buff, len);
1092   close(ij->fd);
1093   return head;
1094 }
1095 static void
1096 interim_journal_remove(interim_journal_t *ij) {
1097   unlink(ij->filename);
1098   if(ij->filename) free(ij->filename);
1099   if(ij->remote_str) free(ij->remote_str);
1100   if(ij->remote_cn) free(ij->remote_cn);
1101   if(ij->fqdn) free(ij->fqdn);
1102 }
1103 int
1104 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
1105                                   struct timeval *now) {
1106   int i, total, success, sp_total, sp_success;
1107   interim_journal_t *ij;
1108   ds_line_detail *head = NULL, *current, *last_sp;
1109   const char *dsn;
1110   conn_q *cq;
1111   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1112   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1113
1114   ij = closure;
1115   dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1116   if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1117   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1118                              ij->fqdn, dsn);
1119   noitL(ds_deb, "stratcon_datastore_asynch_execute[%s,%s,%s]\n",
1120         ij->remote_str, ij->remote_cn, ij->fqdn);
1121  full_monty:
1122   /* Make sure we have a connection */
1123   i = 1;
1124   while(stratcon_database_connect(cq)) {
1125     noitL(noit_error, "Error connecting to database: %s\n",
1126           ij->fqdn ? ij->fqdn : "(null)");
1127     sleep(i);
1128     i *= 2;
1129     i = MIN(i, 16);
1130   }
1131
1132   if(head == NULL) head = build_insert_batch(ij);
1133   noitL(ds_deb, "Starting batch from %s/%s to %s\n",
1134         ij->remote_str ? ij->remote_str : "(null)",
1135         ij->remote_cn ? ij->remote_cn : "(null)",
1136         ij->fqdn ? ij->fqdn : "(null)");
1137   current = head;
1138   last_sp = NULL;
1139   total = success = sp_total = sp_success = 0;
1140   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
1141   while(current) {
1142     execute_outcome_t rv;
1143     if(current->data) {
1144       if(!last_sp) {
1145         SAVEPOINT("batch");
1146         sp_success = success;
1147         sp_total = total;
1148       }
1149  
1150       if(current->problematic) {
1151         RELEASE_SAVEPOINT("batch");
1152         current = current->next;
1153         total++;
1154         continue;
1155       }
1156       rv = stratcon_datastore_execute(cq, cq->remote_str, cq->remote_cn,
1157                                       current);
1158       switch(rv) {
1159         case DS_EXEC_SUCCESS:
1160           total++;
1161           success++;
1162           current = current->next;
1163           break;
1164         case DS_EXEC_ROW_FAILED:
1165           /* rollback to savepoint, mark this record as bad and start again */
1166           noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1167           current->problematic = 1;
1168           current = last_sp;
1169           success = sp_success;
1170           total = sp_total;
1171           ROLLBACK_TO_SAVEPOINT("batch");
1172           break;
1173         case DS_EXEC_TXN_FAILED:
1174           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1175           BUSTED(cq);
1176       }
1177     }
1178   }
1179   if(last_sp) RELEASE_SAVEPOINT("batch");
1180   if(stratcon_datastore_do(cq, "COMMIT")) {
1181     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1182     BUSTED(cq);
1183   }
1184   /* Cleanup the mess */
1185   while(head) {
1186     ds_line_detail *tofree;
1187     tofree = head;
1188     head = head->next;
1189     if(tofree->data) free(tofree->data);
1190     free_params((ds_single_detail *)tofree);
1191     free(tofree);
1192   }
1193   noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1194         ij->remote_str ? ij->remote_str : "(null)",
1195         ij->remote_cn ? ij->remote_cn : "(null)",
1196         ij->fqdn ? ij->fqdn : "(null)", success, total);
1197   interim_journal_remove(ij);
1198   release_conn_q(cq);
1199   return 0;
1200 }
1201 static int
1202 stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure,
1203                                 struct timeval *now) {
1204   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1205   const char *k;
1206   int klen;
1207   void *vij;
1208   interim_journal_t *ij;
1209   syncset_t *syncset = closure;
1210
1211   if((mask & EVENTER_ASYNCH) == EVENTER_ASYNCH) {
1212     eventer_add(syncset->completion);
1213     eventer_trigger(syncset->completion, EVENTER_READ | EVENTER_WRITE);
1214     free(syncset);
1215     return 0;
1216   }
1217   if(!((mask & EVENTER_ASYNCH_WORK) == EVENTER_ASYNCH_WORK)) return 0;
1218
1219   noitL(ds_deb, "Syncing journal sets...\n");
1220   while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) {
1221     eventer_t ingest;
1222     ij = vij;
1223     noitL(ds_deb, "Syncing journal set [%s,%s,%s]\n",
1224           ij->remote_str, ij->remote_cn, ij->fqdn);
1225     fsync(ij->fd);
1226     close(ij->fd);
1227     ij->fd = -1;
1228     ingest = eventer_alloc();
1229     ingest->mask = EVENTER_ASYNCH;
1230     ingest->callback = stratcon_datastore_asynch_execute;
1231     ingest->closure = ij;
1232     eventer_add_asynch(ij->cpool->jobq, ingest);
1233   }
1234   noit_hash_destroy(syncset->ws, free, NULL);
1235   free(syncset->ws);
1236   return 0;
1237 }
1238 static interim_journal_t *
1239 interim_journal_get(struct sockaddr *remote, const char *remote_cn_in,
1240                     int storagenode_id, const char *fqdn_in) {
1241   void *vhash, *vij;
1242   noit_hash_table *working_set;
1243   interim_journal_t *ij;
1244   struct timeval now;
1245   char jpath[PATH_MAX];
1246   char remote_str[128];
1247   const char *remote_cn = remote_cn_in ? remote_cn_in : "default";
1248   const char *fqdn = fqdn_in ? fqdn_in : "default";
1249
1250   convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote);
1251   if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str));
1252
1253   /* Lookup the working set */
1254   if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1255     working_set = calloc(1, sizeof(*working_set));
1256     noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn),
1257                     working_set);
1258   }
1259   else
1260     working_set = vhash;
1261
1262   /* Lookup the interim journal within the working set */
1263   if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) {
1264     ij = calloc(1, sizeof(*ij));
1265     gettimeofday(&now, NULL);
1266     snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x",
1267              basejpath, remote_str, remote_cn, storagenode_id,
1268              (unsigned int)now.tv_sec, (unsigned int)now.tv_usec);
1269     ij->remote_str = strdup(remote_str);
1270     ij->remote_cn = strdup(remote_cn);
1271     ij->fqdn = fqdn_in ? strdup(fqdn_in) : NULL;
1272     ij->storagenode_id = storagenode_id;
1273     ij->filename = strdup(jpath);
1274     ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1275                                          ij->fqdn);
1276     ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1277     if(ij->fd < 0 && errno == ENOENT) {
1278       if(mkdir_for_file(ij->filename, 0750)) {
1279         noitL(noit_error, "Failed to create dir for '%s': %s\n",
1280               ij->filename, strerror(errno));
1281         exit(-1);
1282       }
1283       ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1284     }
1285     if(ij->fd < 0) {
1286       noitL(noit_error, "Failed to open interim journal '%s': %s\n",
1287             ij->filename, strerror(errno));
1288       exit(-1);
1289     }
1290     noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij);
1291   }
1292   else
1293     ij = vij;
1294
1295   return ij;
1296 }
1297 static int
1298 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1299                           int *sid_out, int *storagenode_id_out,
1300                           const char **remote_cn_out,
1301                           const char **fqdn_out, const char **dsn_out) {
1302   /* only called from the main thread -- no safety issues */
1303   void *vuuidinfo, *vinfo;
1304   uuid_info *uuidinfo;
1305   storagenode_info *info = NULL;
1306   char *fqdn = NULL;
1307   char *dsn = NULL;
1308   char *new_remote_cn = NULL;
1309   int storagenode_id = 0, sid = 0;
1310   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1311                          &vuuidinfo)) {
1312     int row_count = 0;
1313     char *tmpint;
1314     ds_single_detail *d;
1315     conn_q *cq;
1316
1317     /* We can't do a database lookup without the remote_cn */
1318     if(!remote_cn) {
1319       if(stratcon_datastore_get_enabled()) {
1320         /* We have an authoritatively maintained cache, we don't do lookups */
1321         return -1;
1322       }
1323       else
1324         remote_cn = "[[null]]";
1325     }
1326
1327     d = calloc(1, sizeof(*d));
1328     cq = get_conn_q_for_metanode();
1329     if(stratcon_database_connect(cq) == 0) {
1330       /* Blocking call to service the cache miss */
1331       GET_QUERY(check_map);
1332       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1333       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1334       PG_EXEC(check_map);
1335       row_count = PQntuples(d->res);
1336       if(row_count != 1) {
1337         PQclear(d->res);
1338         goto bad_row;
1339       }
1340       PG_GET_STR_COL(tmpint, 0, "sid");
1341       if(!tmpint) {
1342         row_count = 0;
1343         PQclear(d->res);
1344         goto bad_row;
1345       }
1346       sid = atoi(tmpint);
1347       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1348       if(tmpint) storagenode_id = atoi(tmpint);
1349       PG_GET_STR_COL(fqdn, 0, "fqdn");
1350       PG_GET_STR_COL(dsn, 0, "dsn");
1351       PG_GET_STR_COL(new_remote_cn, 0, "remote_cn");
1352       PQclear(d->res);
1353     }
1354    bad_row:
1355     free_params((ds_single_detail *)d);
1356     free(d);
1357     release_conn_q(cq);
1358     if(row_count != 1) {
1359       return -1;
1360     }
1361     /* Place in cache */
1362     if(fqdn) fqdn = strdup(fqdn);
1363     uuidinfo = calloc(1, sizeof(*uuidinfo));
1364     uuidinfo->sid = sid;
1365     uuidinfo->uuid_str = strdup(uuid_str);
1366     uuidinfo->storagenode_id = storagenode_id;
1367     uuidinfo->remote_cn = new_remote_cn ? strdup(new_remote_cn) : strdup(remote_cn);
1368     noit_hash_store(&uuid_to_info_cache,
1369                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1370     /* Also, we may have just witnessed a new storage node, store it */
1371     if(storagenode_id) {
1372       int needs_free = 0;
1373       info = calloc(1, sizeof(*info));
1374       info->storagenode_id = storagenode_id;
1375       info->dsn = dsn ? strdup(dsn) : NULL;
1376       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1377       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1378       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1379                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1380         /* hack to save memory -- we *never* remove from these caches,
1381            so we can use the same fqdn value in the above cache for the key
1382            in the cache below -- (no strdup) */
1383         noit_hash_store(&storagenode_to_info_cache,
1384                         (void *)&info->storagenode_id, sizeof(int), info);
1385       }
1386       else needs_free = 1;
1387       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1388       if(needs_free) {
1389         if(info->dsn) free(info->dsn);
1390         if(info->fqdn) free(info->fqdn);
1391         free(info);
1392       }
1393     }
1394   }
1395   else
1396     uuidinfo = vuuidinfo;
1397
1398   if(uuidinfo && uuidinfo->storagenode_id) {
1399     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1400       /* we don't have dsn and we actually want it */
1401       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1402       if(noit_hash_retrieve(&storagenode_to_info_cache,
1403                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1404                             &vinfo))
1405         info = vinfo;
1406       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1407     }
1408   }
1409
1410   if(fqdn_out) *fqdn_out = fqdn ? fqdn : (info ? info->fqdn : NULL);
1411   if(dsn_out) *dsn_out = dsn ? dsn : (info ? info->dsn : NULL);
1412   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1413   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1414   if(sid_out) *sid_out = uuidinfo->sid;
1415   return 0;
1416 }
1417 static int
1418 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1419   char uuid_str[UUID_STR_LEN+1];
1420   int sid = 0;
1421   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1422   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1423   return sid;
1424 }
1425 static void
1426 stratcon_datastore_journal(struct sockaddr *remote,
1427                            const char *remote_cn, const char *line) {
1428   interim_journal_t *ij = NULL;
1429   char uuid_str[UUID_STR_LEN+1], *cp;
1430   const char *fqdn = NULL, *dsn = NULL;
1431   int storagenode_id = 0;
1432   uuid_t checkid;
1433   if(!line) return;
1434   /* if it is a UUID based thing, find the storage node */
1435   switch(*line) {
1436     case 'C':
1437     case 'S':
1438     case 'M':
1439       if(line[1] == '\t' && (cp = strchr(line+2, '\t')) != NULL) {
1440         strlcpy(uuid_str, cp + 1, sizeof(uuid_str));
1441         if(!uuid_parse(uuid_str, checkid)) {
1442           storage_node_quick_lookup(uuid_str, remote_cn, NULL,
1443                                     &storagenode_id, NULL, &fqdn, &dsn);
1444           ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn);
1445         }
1446       }
1447       break;
1448     case 'n':
1449       ij = interim_journal_get(remote,remote_cn,0,NULL);
1450       break;
1451     default:
1452       break;
1453   }
1454   if(!ij) {
1455     noitL(ingest_err, "%d\t%s\n", storagenode_id, line);
1456   }
1457   else {
1458     int len;
1459     len = write(ij->fd, line, strlen(line));
1460     if(len < 0) {
1461       noitL(noit_error, "write to %s failed: %s\n",
1462             ij->filename, strerror(errno));
1463     }
1464   }
1465   return;
1466 }
1467 static noit_hash_table *
1468 stratcon_datastore_journal_remove(struct sockaddr *remote,
1469                                   const char *remote_cn) {
1470   void *vhash = NULL;
1471   if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1472     /* pluck it out */
1473     noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL);
1474   }
1475   else {
1476     noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n",
1477           remote_cn);
1478     abort();
1479   }
1480   return vhash;
1481 }
1482 void
1483 stratcon_datastore_push(stratcon_datastore_op_t op,
1484                         struct sockaddr *remote,
1485                         const char *remote_cn, void *operand,
1486                         eventer_t completion) {
1487   conn_pool *cpool;
1488   syncset_t *syncset;
1489   eventer_t e;
1490   ds_rt_detail *rtdetail;
1491   struct datastore_onlooker_list *nnode;
1492
1493   for(nnode = onlookers; nnode; nnode = nnode->next)
1494     nnode->dispatch(op,remote,remote_cn,operand);
1495
1496   switch(op) {
1497     case DS_OP_INSERT:
1498       stratcon_datastore_journal(remote, remote_cn, (const char *)operand);
1499       break;
1500     case DS_OP_CHKPT:
1501       e = eventer_alloc();
1502       syncset = calloc(1, sizeof(*syncset));
1503       e->mask = EVENTER_ASYNCH;
1504       e->callback = stratcon_datastore_journal_sync;
1505       syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn);
1506       syncset->completion = completion;
1507       e->closure = syncset;
1508       eventer_add(e);
1509       break;
1510     case DS_OP_FIND_COMPLETE:
1511       cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
1512       rtdetail = calloc(1, sizeof(*rtdetail));
1513       rtdetail->rt = operand;
1514       rtdetail->completion_event = completion;
1515       e = eventer_alloc();
1516       e->mask = EVENTER_ASYNCH;
1517       e->callback = stratcon_datastore_asynch_lookup;
1518       e->closure = rtdetail;
1519       eventer_add_asynch(cpool->jobq, e);
1520       break;
1521   }
1522 }
1523
1524 int
1525 stratcon_datastore_saveconfig(void *unused) {
1526   int rv = -1;
1527   char *buff;
1528   ds_single_detail _d = { 0 }, *d = &_d;
1529   conn_q *cq;
1530   char ipv4_str[32];
1531   struct in_addr r, l;
1532
1533   r.s_addr = htonl((4 << 24) || (2 << 16) || (2 << 8) || 1);
1534   memset(&l, 0, sizeof(l));
1535   noit_getip_ipv4(r, &l);
1536   /* Ignore the error.. what are we going to do anyway */
1537   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
1538     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));
1539
1540   cq = get_conn_q_for_metanode();
1541
1542   if(stratcon_database_connect(cq) == 0) {
1543     char time_as_str[20];
1544     size_t len;
1545     buff = noit_conf_xml_in_mem(&len);
1546     if(!buff) goto bad_row;
1547
1548     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1549     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str));
1550     DECLARE_PARAM_STR("", 0);
1551     DECLARE_PARAM_STR("stratcond", 9);
1552     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1553     DECLARE_PARAM_STR(buff, len);
1554     free(buff);
1555
1556     GET_QUERY(config_insert);
1557     PG_EXEC(config_insert);
1558     PQclear(d->res);
1559     rv = 0;
1560
1561     bad_row:
1562       free_params(d);
1563   }
1564   release_conn_q(cq);
1565   return rv;
1566 }
1567
1568 void
1569 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
1570                                                struct sockaddr *,
1571                                                const char *, void *)) {
1572   struct datastore_onlooker_list *nnode;
1573   nnode = calloc(1, sizeof(*nnode));
1574   nnode->dispatch = f;
1575   nnode->next = onlookers;
1576   while(noit_atomic_casptr((volatile void **)&onlookers,
1577                            nnode, nnode->next) != (void *)nnode->next)
1578     nnode->next = onlookers;
1579 }
1580 static void
1581 stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn,
1582                                          char *id_str, char *file) {
1583   char path[PATH_MAX];
1584   interim_journal_t *ij;
1585   eventer_t ingest;
1586
1587   snprintf(path, sizeof(path), "%s/%s/%s/%s/%s",
1588            basejpath, remote_str, remote_cn, id_str, file);
1589   ij = calloc(1, sizeof(*ij));
1590   ij->fd = open(path, O_RDONLY);
1591   if(ij->fd < 0) {
1592     noitL(noit_error, "cannot open journal '%s': %s\n",
1593           path, strerror(errno));
1594     free(ij);
1595     return;
1596   }
1597   close(ij->fd);
1598   ij->fd = -1;
1599   ij->filename = strdup(path);
1600   ij->remote_str = strdup(remote_str);
1601   ij->remote_cn = strdup(remote_cn);
1602   ij->storagenode_id = atoi(id_str);
1603   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1604                                        ij->fqdn);
1605   noitL(noit_error, "ingesting old payload: %s\n", ij->filename);
1606   ingest = eventer_alloc();
1607   ingest->mask = EVENTER_ASYNCH;
1608   ingest->callback = stratcon_datastore_asynch_execute;
1609   ingest->closure = ij;
1610   eventer_add_asynch(ij->cpool->jobq, ingest);
1611 }
1612 static void
1613 stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) {
1614   char path[PATH_MAX];
1615   DIR *root;
1616   struct dirent *de, *entry;
1617   int i = 0, cnt = 0;
1618   char **entries;
1619   int size = 0;
1620
1621   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
1622            first ? "/" : "", first ? first : "",
1623            second ? "/" : "", second ? second : "",
1624            third ? "/" : "", third ? third : "");
1625 #ifdef _PC_NAME_MAX
1626   size = pathconf(path, _PC_NAME_MAX);
1627 #endif
1628   size = MIN(size, PATH_MAX + 128);
1629   de = alloca(size);
1630   root = opendir(path);
1631   if(!root) return;
1632   while(portable_readdir_r(root, de, &entry) != 0 && entry != NULL) cnt++;
1633   closedir(root);
1634   root = opendir(path);
1635   if(!root) return;
1636   entries = malloc(sizeof(*entries) * cnt);
1637   while(portable_readdir_r(root, de, &entry) != 0 && entry != NULL) {
1638     if(i < cnt) {
1639       entries[i++] = strdup(entry->d_name);
1640     }
1641   }
1642   closedir(root);
1643   cnt = i; /* could have changed, directories are fickle */
1644   qsort(entries, i, sizeof(*entries),
1645         (int (*)(const void *, const void *))strcasecmp);
1646   for(i=0; i<cnt; i++) {
1647     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
1648     noitL(ds_deb, "Processing L%d entry '%s'\n",
1649           third ? 4 : second ? 3 : first ? 2 : 1, entries[i]);
1650     if(!first)
1651       stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL);
1652     else if(!second)
1653       stratcon_datastore_sweep_journals_int(first, entries[i], NULL);
1654     else if(!third)
1655       stratcon_datastore_sweep_journals_int(first, second, entries[i]);
1656     else if(strlen(entries[i]) == 16)
1657       stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]);
1658   }
1659 }
1660 static void
1661 stratcon_datastore_sweep_journals() {
1662   stratcon_datastore_sweep_journals_int(NULL,NULL,NULL);
1663 }
1664
1665 int
1666 stratcon_datastore_ingest_all_storagenode_info() {
1667   int i, cnt = 0;
1668   ds_single_detail _d = { 0 }, *d = &_d;
1669   conn_q *cq;
1670   cq = get_conn_q_for_metanode();
1671
1672   while(stratcon_database_connect(cq)) {
1673     noitL(noit_error, "Error connecting to database\n");
1674     sleep(1);
1675   }
1676
1677   GET_QUERY(all_storage);
1678   PG_EXEC(all_storage);
1679   cnt = PQntuples(d->res);
1680   for(i=0; i<cnt; i++) {
1681     void *vinfo;
1682     char *tmpint, *fqdn, *dsn;
1683     int storagenode_id;
1684     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1685     storagenode_id = atoi(tmpint);
1686     PG_GET_STR_COL(fqdn, i, "fqdn");
1687     PG_GET_STR_COL(dsn, i, "dsn");
1688     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1689     storagenode_id = tmpint ? atoi(tmpint) : 0;
1690
1691     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1692                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1693       storagenode_info *info;
1694       info = calloc(1, sizeof(*info));
1695       info->storagenode_id = storagenode_id;
1696       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1697       info->dsn = dsn ? strdup(dsn) : NULL;
1698       noit_hash_store(&storagenode_to_info_cache,
1699                       (void *)&info->storagenode_id, sizeof(int), info);
1700       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1701             info->storagenode_id,
1702             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1703     }
1704   }
1705   PQclear(d->res);
1706  bad_row:
1707   free_params(d);
1708
1709   release_conn_q(cq);
1710   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1711   return cnt;
1712 }
1713 int
1714 stratcon_datastore_ingest_all_check_info() {
1715   int i, cnt, loaded = 0;
1716   ds_single_detail _d = { 0 }, *d = &_d;
1717   conn_q *cq;
1718   cq = get_conn_q_for_metanode();
1719
1720   while(stratcon_database_connect(cq)) {
1721     noitL(noit_error, "Error connecting to database\n");
1722     sleep(1);
1723   }
1724
1725   GET_QUERY(check_mapall);
1726   PG_EXEC(check_mapall);
1727   cnt = PQntuples(d->res);
1728   for(i=0; i<cnt; i++) {
1729     void *vinfo;
1730     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1731     int sid, storagenode_id;
1732     uuid_info *uuidinfo;
1733     PG_GET_STR_COL(uuid_str, i, "id");
1734     if(!uuid_str) continue;
1735     PG_GET_STR_COL(tmpint, i, "sid");
1736     if(!tmpint) continue;
1737     sid = atoi(tmpint);
1738     PG_GET_STR_COL(fqdn, i, "fqdn");
1739     PG_GET_STR_COL(dsn, i, "dsn");
1740     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1741     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1742     storagenode_id = tmpint ? atoi(tmpint) : 0;
1743
1744     uuidinfo = calloc(1, sizeof(*uuidinfo));
1745     uuidinfo->uuid_str = strdup(uuid_str);
1746     uuidinfo->remote_cn = strdup(remote_cn);
1747     uuidinfo->storagenode_id = storagenode_id;
1748     uuidinfo->sid = sid;
1749     noit_hash_store(&uuid_to_info_cache,
1750                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1751     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1752           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1753     loaded++;
1754     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1755                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1756       storagenode_info *info;
1757       info = calloc(1, sizeof(*info));
1758       info->storagenode_id = storagenode_id;
1759       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1760       info->dsn = dsn ? strdup(dsn) : NULL;
1761       noit_hash_store(&storagenode_to_info_cache,
1762                       (void *)&info->storagenode_id, sizeof(int), info);
1763       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1764             info->storagenode_id,
1765             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1766     }
1767   }
1768   PQclear(d->res);
1769  bad_row:
1770   free_params(d);
1771
1772   release_conn_q(cq);
1773   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1774   return loaded;
1775 }
1776
1777 static int
1778 rest_get_noit_config(noit_http_rest_closure_t *restc,
1779                      int npats, char **pats) {
1780   noit_http_session_ctx *ctx = restc->http_ctx;
1781   ds_single_detail *d;
1782   int row_count = 0;
1783   const char *xml = NULL;
1784   conn_q *cq = NULL;
1785
1786   if(npats != 0) {
1787     noit_http_response_server_error(ctx, "text/xml");
1788     noit_http_response_end(ctx);
1789     return 0;
1790   }
1791   d = calloc(1, sizeof(*d));
1792   GET_QUERY(config_get);
1793   cq = get_conn_q_for_metanode();
1794   if(!cq) {
1795     noit_http_response_server_error(ctx, "text/xml");
1796     goto bad_row;
1797   }
1798
1799   DECLARE_PARAM_STR(restc->remote_cn,
1800                     restc->remote_cn ? strlen(restc->remote_cn) : 0);
1801   PG_EXEC(config_get);
1802   row_count = PQntuples(d->res);
1803   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1804
1805   if(xml == NULL) {
1806     char buff[1024];
1807     snprintf(buff, sizeof(buff), "<error><remote_cn>%s</remote_cn>"
1808                                  "<row_count>%d</row_count></error>\n",
1809              restc->remote_cn, row_count);
1810     noit_http_response_append(ctx, buff, strlen(buff));
1811     noit_http_response_not_found(ctx, "text/xml");
1812   }
1813   else {
1814     noit_http_response_append(ctx, xml, strlen(xml));
1815     noit_http_response_ok(ctx, "text/xml");
1816   }
1817  bad_row:
1818   free_params((ds_single_detail *)d);
1819   d->nparams = 0;
1820   if(cq) release_conn_q(cq);
1821
1822   noit_http_response_end(ctx);
1823   return 0;
1824 }
1825
1826 void
1827 stratcon_datastore_init() {
1828   pthread_mutex_init(&ds_conns_lock, NULL);
1829   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1830   ds_err = noit_log_stream_find("error/datastore");
1831   ds_deb = noit_log_stream_find("debug/datastore");
1832   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
1833   ingest_err = noit_log_stream_find("error/ingest");
1834   if(!ds_err) ds_err = noit_error;
1835   if(!ingest_err) ingest_err = noit_error;
1836   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1837                            &basejpath)) {
1838     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1839     exit(-1);
1840   }
1841   stratcon_datastore_ingest_all_check_info();
1842   stratcon_datastore_ingest_all_storagenode_info();
1843   stratcon_datastore_sweep_journals();
1844
1845   assert(noit_http_rest_register_auth(
1846     "GET", "/noits/", "^config$", rest_get_noit_config,
1847              noit_http_rest_client_cert_auth
1848   ) == 0);
1849 }
1850
Note: See TracBrowser for help on using the browser.