root/src/stratcon_datastore.c

Revision e20fb1d8dba51180495faf50e6b01af016c27bbb, 57.7 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

refs #284

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "utils/noit_str.h"
38 #include "utils/noit_mkdir.h"
39 #include "utils/noit_getip.h"
40 #include "stratcon_datastore.h"
41 #include "stratcon_realtime_http.h"
42 #include "stratcon_iep.h"
43 #include "noit_conf.h"
44 #include "noit_check.h"
45 #include "noit_rest.h"
46 #include <unistd.h>
47 #include <fcntl.h>
48 #include <netinet/in.h>
49 #include <sys/un.h>
50 #include <dirent.h>
51 #include <arpa/inet.h>
52 #include <sys/mman.h>
53 #include <libpq-fe.h>
54 #include <zlib.h>
55 #include <assert.h>
56 #include <errno.h>
57
/*
 * DECL_STMT(codename, confname) defines two file-scope statics:
 *   codename        - the SQL text of the statement; NULL until GET_QUERY
 *                     loads it from the configuration on first use.
 *   codename##_conf - the configuration path
 *                     "/stratcon/database/statements/<confname>".
 */
58 #define DECL_STMT(codename,confname) \
59 static char *codename = NULL; \
60 static const char *codename##_conf = "/stratcon/database/statements/" #confname
61
/* One DECL_STMT per SQL statement this module may run. */
62 DECL_STMT(storage_post_connect, storagepostconnect);
63 DECL_STMT(metanode_post_connect, metanodepostconnect);
64 DECL_STMT(find_storage, findstoragenode);
65 DECL_STMT(all_storage, allstoragenodes);
66 DECL_STMT(check_map, mapchecktostoragenode);
67 DECL_STMT(check_mapall, mapallchecks);
68 DECL_STMT(check_loadall, allchecks);
69 DECL_STMT(check_find, findcheck);
70 DECL_STMT(check_insert, check);
71 DECL_STMT(status_insert, status);
72 DECL_STMT(metric_insert_numeric, metric_numeric);
73 DECL_STMT(metric_insert_text, metric_text);
74 DECL_STMT(config_insert, config);
75 DECL_STMT(config_get, findconfig);
76
/*
 * Log streams for this module, NULL until assigned.  ds_err receives the
 * PG_EXEC / PG_TM_EXEC query errors and ds_pool_deb the connection-pool
 * tracing.  Where they are assigned is not visible in this chunk.
 */
77 static noit_log_stream_t ds_err = NULL;
78 static noit_log_stream_t ds_deb = NULL;
79 static noit_log_stream_t ds_pool_deb = NULL;
80 static noit_log_stream_t ingest_err = NULL;
81
/* Datastore on/off switch; starts enabled (1). */
static int ds_system_enabled = 1;

/* Returns the value last stored by stratcon_datastore_set_enabled (1 by default). */
int stratcon_datastore_get_enabled(void) { return ds_system_enabled; }

/* Stores n verbatim as the datastore enabled flag (0 disables). */
void stratcon_datastore_set_enabled(int n) { ds_system_enabled = n; }
85
/*
 * Singly-linked list of observer callbacks, headed by `onlookers`.  Each
 * entry holds a dispatch function taking an operation, a peer address, a
 * string and an opaque pointer.  Registration and dispatch happen outside
 * this chunk.
 */
86 static struct datastore_onlooker_list {
87   void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *,
88                    const char *, void *);
89   struct datastore_onlooker_list *next;
90 } *onlookers = NULL;
91
/*
 * GET_QUERY(a): lazily load the SQL text of statement `a` (see DECL_STMT)
 * from its `a##_conf` configuration path the first time it is needed.
 * If the configuration lookup fails, control jumps to the caller's
 * `bad_row` label, so every user of this macro must define one.
 * NOTE(review): the NULL check and assignment are not locked, and
 * stratcon_datastore_check_loadall runs in several threads at once, so two
 * threads may both load the same statement (one copy is then lost).
 * Confirm whether that is acceptable.
 */
92 #define GET_QUERY(a) do { \
93   if(a == NULL) \
94     if(!noit_conf_get_string(NULL, a ## _conf, &(a))) \
95       goto bad_row; \
96 } while(0)
97
/* Forward declaration: conn_pool (below) links to its list of idle conn_q. */
98 struct conn_q;
99
/*
 * A pool of database connections for one (remote, fqdn, cn) key, stored in
 * ds_conns by get_conn_pool_for_remote.  The pool helpers update the
 * counters while holding `lock`, and signal `cv` whenever a connection is
 * released.
 */
100 typedef struct {
101   char            *queue_name; /* hash key: "datastore_<remote>_<fqdn>_<cn>" */
102   eventer_jobq_t  *jobq;       /* job queue created along with the pool */
103   struct conn_q   *head;       /* idle connections, most recently released first */
104   pthread_mutex_t  lock;
105   pthread_cond_t   cv;
106   int              ttl;           /* idle seconds before purge; NOTE(review): not set in this chunk (stays 0 from calloc) */
107   int              in_pool;       /* number of connections on `head` */
108   int              outstanding;   /* connections currently handed out */
109   int              max_allocated; /* cap on in_pool + outstanding */
110   int              max_in_pool;   /* cap on idle connections kept */
111 } conn_pool;
/* One database connection plus the identity it was opened for. */
112 typedef struct conn_q {
113   time_t           last_use;   /* set on every release */
114   char            *dsn;        /* Pg connect string; NULL -> metanode dbconfig */
115   char            *remote_str; /* the IP of the noit*/
116   char            *remote_cn;  /* the Cert CN of the noit */
117   char            *fqdn;       /* the fqdn of the storage node; NULL for the metanode */
118   conn_pool       *pool;       /* owning pool */
119   struct conn_q   *next;       /* link in pool->head while idle */
120   /* Postgres specific stuff */
121   PGconn          *dbh;        /* NULL until stratcon_database_connect succeeds */
122 } conn_q;
123
124
/*
 * libpq state shared by every "detail" struct: the last result and its
 * status, the parsed row timestamp, the metric type character, and up to
 * MAX_PARAMS text parameters.  paramAllocd marks the values that
 * free_params() must free.
 */
125 #define MAX_PARAMS 8
126 #define POSTGRES_PARTS \
127   PGresult *res; \
128   int rv; \
129   time_t whence; \
130   int nparams; \
131   int metric_type; \
132   char *paramValues[MAX_PARAMS]; \
133   int paramLengths[MAX_PARAMS]; \
134   int paramFormats[MAX_PARAMS]; \
135   int paramAllocd[MAX_PARAMS];
136
/*
 * Every detail type starts with POSTGRES_PARTS, so it can be cast to
 * ds_single_detail * (as the free_params call sites do).
 */
137 typedef struct ds_single_detail {
138   POSTGRES_PARTS
139 } ds_single_detail;
/* Realtime lookup request: the trackers in `rt` are resolved by stratcon_datastore_find. */
140 typedef struct {
141   /* Postgres specific stuff */
142   POSTGRES_PARTS
143   struct realtime_tracker *rt;
144   conn_q *cq; /* connection on which to perform this job */
145   eventer_t completion_event; /* This event should be registered if non NULL */
146 } ds_rt_detail;
/* One journal line (`data`) to parse and insert; chained via `next`. */
147 typedef struct ds_line_detail {
148   /* Postgres specific stuff */
149   POSTGRES_PARTS
150   char *data;
151   int problematic; /* NOTE(review): presumably flags a line that failed to insert -- not used in this chunk */
152   struct ds_line_detail *next;
153 } ds_line_detail;
154
/* Not referenced in this chunk; presumably used by the journaling code later in the file. */
155 typedef struct {
156   noit_hash_table *ws;
157   eventer_t completion;
158 } syncset_t;
159
/* Not referenced in this chunk; presumably describes one on-disk interim journal. */
160 typedef struct {
161   char *remote_str;
162   char *remote_cn;
163   char *fqdn;
164   int storagenode_id;
165   int fd;
166   char *filename;
167   conn_pool *cpool;
168 } interim_journal_t;
169
/* Forward declarations for helpers defined later in the file. */
/* Returns 0 once cq->dbh is a usable connection, -1 otherwise. */
170 static int stratcon_database_connect(conn_q *cq);
/* Callers in this chunk treat a return of 0 as "unknown check". */
171 static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
/* Callers in this chunk treat a non-zero return as "could not resolve" and skip the check. */
172 static int storage_node_quick_lookup(const char *uuid_str,
173                                      const char *remote_cn,
174                                      int *sid_out, int *storagenode_id_out,
175                                      const char **remote_cn_out,
176                                      const char **fqdn_out,
177                                      const char **dsn_out);
178
179 static void
180 free_params(ds_single_detail *d) {
181   int i;
182   for(i=0; i<d->nparams; i++)
183     if(d->paramAllocd[i] && d->paramValues[i])
184       free(d->paramValues[i]);
185 }
186
/* Not referenced in this chunk. */
187 char *basejpath = NULL;
/* Guards ds_conns. */
188 pthread_mutex_t ds_conns_lock;
/* queue_name -> conn_pool *, filled by get_conn_pool_for_remote. */
189 noit_hash_table ds_conns;
/* Not referenced in this chunk. */
190 noit_hash_table working_sets;
191
192 /* the fqdn cache needs to be thread safe */
/* Cached mapping of a check uuid (and noit cn) to its sid and storage node. */
193 typedef struct {
194   char *uuid_str;
195   char *remote_cn;
196   int storagenode_id;
197   int sid;
198 } uuid_info;
/* A storage node's id, fqdn and libpq connect string. */
199 typedef struct {
200   int storagenode_id;
201   char *fqdn;
202   char *dsn;
203 } storagenode_info;
/* NOTE(review): no lock for this cache is visible in this chunk -- confirm how it is guarded. */
204 noit_hash_table uuid_to_info_cache;
/* Guards storagenode_to_info_cache (see stratcon_datastore_asynch_drive_iep). */
205 pthread_mutex_t storagenode_to_info_cache_lock;
/* Values are storagenode_info *. */
206 noit_hash_table storagenode_to_info_cache;
207
208 /* Thread-safe connection pools */
209
210 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
211 static void
212 release_conn_q_forceable(conn_q *cq, int forcefree) {
213   int putback = 0;
214   cq->last_use = time(NULL);
215   pthread_mutex_lock(&cq->pool->lock);
216   cq->pool->outstanding--;
217   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
218     putback = 1;
219     cq->next = cq->pool->head;
220     cq->pool->head = cq;
221     cq->pool->in_pool++;
222   }
223   pthread_mutex_unlock(&cq->pool->lock);
224   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(),
225         putback ? "to pool" : "and destroy", cq->pool->queue_name);
226   pthread_cond_signal(&cq->pool->cv);
227   if(putback) return;
228
229   /* Not put back, release it */
230   if(cq->dbh) PQfinish(cq->dbh);
231   if(cq->remote_str) free(cq->remote_str);
232   if(cq->remote_cn) free(cq->remote_cn);
233   if(cq->fqdn) free(cq->fqdn);
234   if(cq->dsn) free(cq->dsn);
235   free(cq);
236 }
237 static void
238 ttl_purge_conn_pool(conn_pool *pool) {
239   int old_cnt, new_cnt;
240   time_t now = time(NULL);
241   conn_q *cq, *prev = NULL, *iter;
242   /* because we always replace on the head and update the last_use time when
243      doing so, we know they are ordered LRU on the end.  So, once we hit an
244      old one, we know all the others are old too.
245    */
246   if(!pool->head) return; /* hack short circuit for no locks */
247   pthread_mutex_lock(&pool->lock);
248   old_cnt = pool->in_pool;
249   cq = pool->head;
250   while(cq) {
251     if(cq->last_use + cq->pool->ttl < now) {
252       if(prev) prev->next = NULL;
253       else pool->head = NULL;
254       break;
255     }
256     prev = cq;
257     cq = cq->next;
258   }
259   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
260   /* Fix accounting */
261   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
262   new_cnt = pool->in_pool;
263   pthread_mutex_unlock(&pool->lock);
264
265   /* Force release these without holding the lock */
266   while(cq) {
267     cq = cq->next;
268     release_conn_q_forceable(cq, 1);
269   }
270   if(old_cnt != new_cnt)
271     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
272           pool->queue_name);
273 }
274 static void
275 release_conn_q(conn_q *cq) {
276   ttl_purge_conn_pool(cq->pool);
277   release_conn_q_forceable(cq, 0);
278 }
279 static conn_pool *
280 get_conn_pool_for_remote(const char *remote_str,
281                          const char *remote_cn, const char *fqdn) {
282   void *vcpool;
283   conn_pool *cpool = NULL;
284   char queue_name[256] = "datastore_";
285   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
286            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
287            fqdn ? fqdn : "default",
288            remote_cn ? remote_cn : "default");
289   pthread_mutex_lock(&ds_conns_lock);
290   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
291                         strlen(queue_name), &vcpool))
292     cpool = vcpool;
293   pthread_mutex_unlock(&ds_conns_lock);
294   if(!cpool) {
295     vcpool = cpool = calloc(1, sizeof(*cpool));
296     cpool->queue_name = strdup(queue_name);
297     pthread_mutex_init(&cpool->lock, NULL);
298     pthread_cond_init(&cpool->cv, NULL);
299     cpool->in_pool = 0;
300     cpool->outstanding = 0;
301     cpool->max_in_pool = 1;
302     cpool->max_allocated = 1;
303     pthread_mutex_lock(&ds_conns_lock);
304     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
305                         cpool)) {
306       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
307                          strlen(queue_name), &vcpool);
308     }
309     pthread_mutex_unlock(&ds_conns_lock);
310     if(vcpool != cpool) {
311       /* someone beat us to it */
312       free(cpool->queue_name);
313       pthread_mutex_destroy(&cpool->lock);
314       pthread_cond_destroy(&cpool->cv);
315       free(cpool);
316     }
317     else {
318       int i;
319       /* Our job to setup the pool */
320       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
321       eventer_jobq_init(cpool->jobq, queue_name);
322       cpool->jobq->backq = eventer_default_backq();
323       /* Add one thread */
324       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
325         eventer_jobq_increase_concurrency(cpool->jobq);
326     }
327     cpool = vcpool;
328   }
329   return cpool;
330 }
331 static conn_q *
332 get_conn_q_for_remote(const char *remote_str,
333                       const char *remote_cn, const char *fqdn,
334                       const char *dsn) {
335   conn_pool *cpool;
336   conn_q *cq;
337   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
338   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(),
339         cpool->queue_name);
340   pthread_mutex_lock(&cpool->lock);
341  again:
342   if(cpool->head) {
343     assert(cpool->in_pool > 0);
344     cq = cpool->head;
345     cpool->head = cq->next;
346     cpool->in_pool--;
347     cpool->outstanding++;
348     cq->next = NULL;
349     pthread_mutex_unlock(&cpool->lock);
350     return cq;
351   }
352   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
353     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
354           (void *)pthread_self(), cpool->queue_name);
355     pthread_cond_wait(&cpool->cv, &cpool->lock);
356     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
357           (void *)pthread_self(), cpool->queue_name);
358     goto again;
359   }
360   else {
361     cpool->outstanding++;
362     pthread_mutex_unlock(&cpool->lock);
363   }
364  
365   cq = calloc(1, sizeof(*cq));
366   cq->pool = cpool;
367   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
368   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
369   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
370   cq->dsn = dsn ? strdup(dsn) : NULL;
371   return cq;
372 }
373 static conn_q *
374 get_conn_q_for_metanode() {
375   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
376 }
377
/*
 * Result of executing one datastore row or request.  In this chunk only
 * DS_EXEC_SUCCESS and DS_EXEC_ROW_FAILED are returned;
 * DS_EXEC_TXN_FAILED is presumably used by transaction-level code later in
 * the file -- confirm.
 */
378 typedef enum {
379   DS_EXEC_SUCCESS = 0,
380   DS_EXEC_ROW_FAILED = 1,
381   DS_EXEC_TXN_FAILED = 2,
382 } execute_outcome_t;
383
/*
 * DECLARE_PARAM_STR(str, len): append a text-format parameter to the detail
 * `d` in the caller's scope.
 * - The value is copied with noit__strndup, so `str` need not be
 *   NUL-terminated.  The copy is marked in paramAllocd for free_params().
 * - The literal "[[null]]" becomes a SQL NULL (NULL value, length 0, not
 *   owned).
 * - `len` is evaluated twice.
 * NOTE(review): there is no bound check against MAX_PARAMS, and the
 * strndup result is not NULL-checked before strcmp.  Callers must stay
 * within MAX_PARAMS.
 */
384 #define DECLARE_PARAM_STR(str, len) do { \
385   d->paramValues[d->nparams] = noit__strndup(str, len); \
386   d->paramLengths[d->nparams] = len; \
387   d->paramFormats[d->nparams] = 0; \
388   d->paramAllocd[d->nparams] = 1; \
389   if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \
390     free(d->paramValues[d->nparams]); \
391     d->paramValues[d->nparams] = NULL; \
392     d->paramLengths[d->nparams] = 0; \
393     d->paramAllocd[d->nparams] = 0; \
394   } \
395   d->nparams++; \
396 } while(0)
/* DECLARE_PARAM_INT(i): append integer `i` as its decimal text form. */
397 #define DECLARE_PARAM_INT(i) do { \
398   int buffer__len; \
399   char buffer__[32]; \
400   snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
401   buffer__len = strlen(buffer__); \
402   DECLARE_PARAM_STR(buffer__, buffer__len); \
403 } while(0)
404
/*
 * PG_GET_STR_COL(dest, row, name): set `dest` to the text of column `name`
 * in row `row` of d->res.  dest is NULL when the column does not exist or
 * the value is SQL NULL.  The string points into d->res (libpq
 * PQgetvalue), so copy it before PQclear(d->res).
 */
405 #define PG_GET_STR_COL(dest, row, name) do { \
406   int colnum = PQfnumber(d->res, name); \
407   dest = NULL; \
408   if (colnum >= 0) \
409     dest = PQgetisnull(d->res, row, colnum) \
410          ? NULL : PQgetvalue(d->res, row, colnum); \
411 } while(0)
412
/*
 * PG_EXEC(cmd): run statement `cmd` with d's parameters on cq->dbh.  The
 * result goes to d->res and its status to d->rv.
 * - Any status other than PGRES_COMMAND_OK / PGRES_TUPLES_OK: the first
 *   line of the server error is logged to ds_err, the result is PQclear'd
 *   and control jumps to the caller's `bad_row` label.
 * - On success the caller owns d->res and must PQclear it.
 */
413 #define PG_EXEC(cmd) do { \
414   d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
415                         (const char * const *)d->paramValues, \
416                         d->paramLengths, d->paramFormats, 0); \
417   d->rv = PQresultStatus(d->res); \
418   if(d->rv != PGRES_COMMAND_OK && \
419      d->rv != PGRES_TUPLES_OK) { \
420     const char *pgerr = PQresultErrorMessage(d->res); \
421     const char *pgerr_end = strchr(pgerr, '\n'); \
422     if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
423     noitL(ds_err, "[%s] stratcon_datasource.c:%d bad (%d): %.*s\n", \
424           cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
425           (int)(pgerr_end - pgerr), pgerr); \
426     PQclear(d->res); \
427     goto bad_row; \
428   } \
429 } while(0)
430
/*
 * PG_TM_EXEC(cmd, whence): like PG_EXEC, but the statement text `cmd` is
 * first expanded with strftime() using the UTC broken-down time of
 * `whence`, so it may contain strftime conversions.  `whence` is evaluated
 * exactly once.
 *
 * - If the time cannot be converted (gmtime_r fails), or the expanded text
 *   does not fit in the 4096-byte buffer (strftime returns 0), nothing is
 *   executed: the failure is logged and control jumps to `bad_row`.
 * - Query errors are handled as in PG_EXEC (logged with the node fqdn,
 *   result cleared, goto bad_row).
 * - On success the caller owns d->res and must PQclear it.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t pg_tm_when_ = (whence); \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&pg_tm_when_, &tbuf); \
  if(tm == NULL || strftime(cmdbuf, sizeof(cmdbuf), cmd, tm) == 0) { \
    noitL(ds_err, "[%s] stratcon_datasource.c:%d cannot expand statement" \
          " for time: %llu\n", cq->fqdn ? cq->fqdn : "metanode", \
          __LINE__, (long long unsigned)pg_tm_when_); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "[%s] stratcon_datasource.c:%d bad (%d): %.*s time: %llu\n", \
          cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \
          (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)pg_tm_when_); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
453
454 static void *
455 stratcon_datastore_check_loadall(void *vsn) {
456   storagenode_info *sn = vsn;
457   ds_single_detail *d;
458   int i, row_count = 0, good = 0;
459   char buff[1024];
460   conn_q *cq = NULL;
461
462   d = calloc(1, sizeof(*d));
463   GET_QUERY(check_loadall);
464   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
465   i = 0;
466   while(stratcon_database_connect(cq)) {
467     if(i++ > 4) {
468       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
469       release_conn_q(cq);
470       return (void *)(vpsized_int)good;
471     }
472     sleep(1);
473   }
474   PG_EXEC(check_loadall);
475   row_count = PQntuples(d->res);
476  
477   for(i=0; i<row_count; i++) {
478     int rv;
479     int8_t family;
480     struct sockaddr *sin;
481     struct sockaddr_in sin4 = { .sin_family = AF_INET };
482     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
483     char *remote, *id, *target, *module, *name;
484     PG_GET_STR_COL(remote, i, "remote_address");
485     PG_GET_STR_COL(id, i, "id");
486     PG_GET_STR_COL(target, i, "target");
487     PG_GET_STR_COL(module, i, "module");
488     PG_GET_STR_COL(name, i, "name");
489     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
490
491     family = AF_INET;
492     sin = (struct sockaddr *)&sin4;
493     rv = inet_pton(family, remote, &sin4.sin_addr);
494     if(rv != 1) {
495       family = AF_INET6;
496       sin = (struct sockaddr *)&sin6;
497       rv = inet_pton(family, remote, &sin6.sin6_addr);
498       if(rv != 1) {
499         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
500         sin = NULL;
501       }
502     }
503
504     /* stratcon_iep_line_processor takes an allocated operand and frees it */
505     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
506     good++;
507   }
508   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
509         good, row_count, sn->fqdn);
510  bad_row:
511   free_params((ds_single_detail *)d);
512   free(d);
513   if(cq) release_conn_q(cq);
514   return (void *)(vpsized_int)good;
515 }
516 static int
517 stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure,
518                                     struct timeval *now) {
519   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
520   pthread_t *jobs = NULL;
521   int nodes, i = 0, tcnt = 0;
522   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
523   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
524
525   pthread_mutex_lock(&storagenode_to_info_cache_lock);
526   nodes = storagenode_to_info_cache.size;
527   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
528   sns = calloc(MAX(1,nodes), sizeof(*sns));
529   if(nodes == 0) sns[nodes++] = &self;
530   else {
531     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
532     const char *k;
533     void *v;
534     int klen;
535     while(noit_hash_next(&storagenode_to_info_cache,
536                          &iter, &k, &klen, &v)) {
537       sns[i++] = (storagenode_info *)v;
538     }
539   }
540   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
541
542   for(i=0; i<nodes; i++) {
543     if(pthread_create(&jobs[i], NULL,
544                       stratcon_datastore_check_loadall, sns[i]) != 0) {
545       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
546     }
547   }
548   for(i=0; i<nodes; i++) {
549     void *good;
550     pthread_join(jobs[i], &good);
551     tcnt += (int)(vpsized_int)good;
552   }
553   noitL(noit_error, "Loaded all %d check states.\n", tcnt);
554   return 0;
555 }
556 void
557 stratcon_datastore_iep_check_preload() {
558   eventer_t e;
559   conn_pool *cpool;
560
561   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
562   e = eventer_alloc();
563   e->mask = EVENTER_ASYNCH;
564   e->callback = stratcon_datastore_asynch_drive_iep;
565   e->closure = NULL;
566   eventer_add_asynch(cpool->jobq, e);
567 }
568 execute_outcome_t
569 stratcon_datastore_find(ds_rt_detail *d) {
570   conn_q *cq;
571   char *val;
572   int row_count;
573   struct realtime_tracker *node;
574
575   for(node = d->rt; node; node = node->next) {
576     char uuid_str[UUID_STR_LEN+1];
577     const char *fqdn, *dsn, *remote_cn;
578     char remote_ip[32];
579     int storagenode_id;
580
581     uuid_unparse_lower(node->checkid, uuid_str);
582     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
583                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
584       continue;
585
586     noitL(noit_debug, "stratcon_datastore_find <- (%d, %s) @ %s\n",
587           node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)");
588
589     /* We might be able to find the IP from our config if someone has
590      * specified the expected cn in the noit definition.
591      */
592     if(stratcon_find_noit_ip_by_cn(remote_cn,
593                                    remote_ip, sizeof(remote_ip)) == 0) {
594       node->noit = strdup(remote_ip);
595       noitL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit);
596       continue;
597     }
598
599     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
600     stratcon_database_connect(cq);
601
602     GET_QUERY(check_find);
603     DECLARE_PARAM_INT(node->sid);
604     PG_EXEC(check_find);
605     row_count = PQntuples(d->res);
606     if(row_count != 1) {
607       noitL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid);
608       PQclear(d->res);
609       goto bad_row;
610     }
611
612     /* Get the remote_address (which noit owns this) */
613     PG_GET_STR_COL(val, 0, "remote_address");
614     if(!val) {
615       noitL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn);
616       PQclear(d->res);
617       goto bad_row;
618     }
619     node->noit = strdup(val);
620     noitL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit);
621    bad_row:
622     free_params((ds_single_detail *)d);
623     d->nparams = 0;
624     release_conn_q(cq);
625   }
626   return DS_EXEC_SUCCESS;
627 }
/*
 * Parse (at most once) and execute a single journal line on a storage node.
 *
 * cq        - connection the statement runs on (used by the PG_* macros).
 * r         - remote address string of the sending noit; NULL is treated
 *             as "".
 * remote_cn - cert CN of the sending noit.  It is passed to uuid_to_sid(),
 *             and for 'n' lines it is bound through strlen(), so it must be
 *             non-NULL there.  NOTE(review): confirm callers guarantee this.
 * d         - line detail.  d->data is a tab-separated line whose first
 *             character selects the record type:
 *             'n' noitd config, 'C' check, 'M' metric, 'S' status.
 *
 * Parameters are parsed into d only while d->nparams is 0, so an
 * already-parsed detail can be executed again without re-parsing.  This is
 * presumably for transaction retries -- confirm with callers.
 *
 * Returns DS_EXEC_SUCCESS, or DS_EXEC_ROW_FAILED on a parse error, an
 * unknown record or metric type, a missing statement, or a failed query.
 * NOTE(review): a parse failure part-way through leaves the parameters
 * declared so far in d (nparams > 0), so a later call skips parsing.
 */
628 execute_outcome_t
629 stratcon_datastore_execute(conn_q *cq, const char *r, const char *remote_cn,
630                            ds_line_detail *d) {
631   int type, len, sid;
632   char *final_buff;
633   uLong final_len, actual_final_len;
634   char *token;
635   char raddr_blank[1] = "";
636   const char *raddr;
637
638   type = d->data[0];
639   raddr = r ? r : raddr_blank;
640
641   /* Parse the log line, but only if we haven't already */
642   if(!d->nparams) {
643     char *scp, *ecp;
644
645     scp = d->data;
    /* Take the next tab-terminated field into token/len and advance scp.
     * NB: the macro arguments are ignored; it always writes `token` and
     * `len`.  An empty remainder or a missing tab jumps to bad_row. */
646 #define PROCESS_NEXT_FIELD(t,l) do { \
647   if(!*scp) goto bad_row; \
648   ecp = strchr(scp, '\t'); \
649   if(!ecp) goto bad_row; \
650   token = scp; \
651   len = (ecp-scp); \
652   scp = ecp + 1; \
653 } while(0)
    /* Take the rest of the line as the final field in t/l, dropping one
     * trailing '\n' if present. */
654 #define PROCESS_LAST_FIELD(t,l) do { \
655   if(!*scp) ecp = scp; \
656   else { \
657     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
658     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
659   } \
660   t = scp; \
661   l = (ecp-scp); \
662 } while(0)
663
664     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
665     switch(type) {
666       /* See noit_check_log.c for log description */
667       case 'n':
668         DECLARE_PARAM_STR(raddr, strlen(raddr));
669         DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
670         DECLARE_PARAM_STR("noitd",5); /* node_type */
671         PROCESS_NEXT_FIELD(token,len);
672         d->whence = (time_t)strtoul(token, NULL, 10);
673         DECLARE_PARAM_STR(token,len); /* timestamp */
674
675         /* This is the expected uncompressed len */
676         PROCESS_NEXT_FIELD(token,len);
            /* NOTE(review): atoi() does not validate this field. */
677         final_len = atoi(token);
678         final_buff = malloc(final_len);
679         if(!final_buff) goto bad_row;
680  
681         /* The last token is b64 encoded and compressed.
682          * we need to decode it, declare it and then free it.
683          */
684         PROCESS_LAST_FIELD(token, len);
685         /* We can in-place decode this */
686         len = noit_b64_decode((char *)token, len,
687                               (unsigned char *)token, len);
688         if(len <= 0) {
689           noitL(noit_error, "noitd config base64 decoding error.\n");
690           free(final_buff);
691           goto bad_row;
692         }
693         actual_final_len = final_len;
694         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
695                               (unsigned char *)token, len)) {
696           noitL(noit_error, "noitd config decompression failure.\n");
697           free(final_buff);
698           goto bad_row;
699         }
            /* The decompressed size must match the advertised length exactly. */
700         if(final_len != actual_final_len) {
701           noitL(noit_error, "noitd config decompression error.\n");
702           free(final_buff);
703           goto bad_row;
704         }
            /* DECLARE_PARAM_STR copies the buffer, so it can be freed right away. */
705         DECLARE_PARAM_STR(final_buff, final_len);
706         free(final_buff);
707         break;
708       case 'C':
709         DECLARE_PARAM_STR(raddr, strlen(raddr));
710         PROCESS_NEXT_FIELD(token,len);
711         DECLARE_PARAM_STR(token,len); /* timestamp */
712         d->whence = (time_t)strtoul(token, NULL, 10);
713         PROCESS_NEXT_FIELD(token, len);
714         sid = uuid_to_sid(token, remote_cn);
715         if(sid == 0) goto bad_row;
716         DECLARE_PARAM_INT(sid); /* sid */
717         DECLARE_PARAM_STR(token,len); /* uuid */
718         PROCESS_NEXT_FIELD(token, len);
719         DECLARE_PARAM_STR(token,len); /* target */
720         PROCESS_NEXT_FIELD(token, len);
721         DECLARE_PARAM_STR(token,len); /* module */
722         PROCESS_LAST_FIELD(token, len);
723         DECLARE_PARAM_STR(token,len); /* name */
724         break;
725       case 'M':
726         PROCESS_NEXT_FIELD(token,len);
727         DECLARE_PARAM_STR(token,len); /* timestamp */
728         d->whence = (time_t)strtoul(token, NULL, 10);
729         PROCESS_NEXT_FIELD(token, len);
730         sid = uuid_to_sid(token, remote_cn);
731         if(sid == 0) goto bad_row;
732         DECLARE_PARAM_INT(sid); /* sid */
733         PROCESS_NEXT_FIELD(token, len);
734         DECLARE_PARAM_STR(token,len); /* name */
735         PROCESS_NEXT_FIELD(token,len);
            /* Only the first character of the type field is kept; it is
             * matched against METRIC_* below to choose the insert statement. */
736         d->metric_type = *token;
737         PROCESS_LAST_FIELD(token,len);
738         DECLARE_PARAM_STR(token,len); /* value */
739         break;
740       case 'S':
741         PROCESS_NEXT_FIELD(token,len);
742         DECLARE_PARAM_STR(token,len); /* timestamp */
743         d->whence = (time_t)strtoul(token, NULL, 10);
744         PROCESS_NEXT_FIELD(token, len);
745         sid = uuid_to_sid(token, remote_cn);
746         if(sid == 0) goto bad_row;
747         DECLARE_PARAM_INT(sid); /* sid */
748         PROCESS_NEXT_FIELD(token, len);
749         DECLARE_PARAM_STR(token,len); /* state */
750         PROCESS_NEXT_FIELD(token, len);
751         DECLARE_PARAM_STR(token,len); /* availability */
752         PROCESS_NEXT_FIELD(token, len);
753         DECLARE_PARAM_STR(token,len); /* duration */
754         PROCESS_LAST_FIELD(token,len);
755         DECLARE_PARAM_STR(token,len); /* status */
756         break;
757       default:
758         goto bad_row;
759     }
760
761   }
762
763   /* Now execute the query */
    /* 'C', 'S' and 'M' statements are strftime-expanded with d->whence by
     * PG_TM_EXEC; 'n' runs its statement text unchanged via PG_EXEC. */
764   switch(type) {
765     case 'n':
766       GET_QUERY(config_insert);
767       PG_EXEC(config_insert);
768       PQclear(d->res);
769       break;
770     case 'C':
771       GET_QUERY(check_insert);
772       PG_TM_EXEC(check_insert, d->whence);
773       PQclear(d->res);
774       break;
775     case 'S':
776       GET_QUERY(status_insert);
777       PG_TM_EXEC(status_insert, d->whence);
778       PQclear(d->res);
779       break;
780     case 'M':
781       switch(d->metric_type) {
782         case METRIC_INT32:
783         case METRIC_UINT32:
784         case METRIC_INT64:
785         case METRIC_UINT64:
786         case METRIC_DOUBLE:
787           GET_QUERY(metric_insert_numeric);
788           PG_TM_EXEC(metric_insert_numeric, d->whence);
789           PQclear(d->res);
790           break;
791         case METRIC_STRING:
792           GET_QUERY(metric_insert_text);
793           PG_TM_EXEC(metric_insert_text, d->whence);
794           PQclear(d->res);
795           break;
796         default:
797           goto bad_row;
798       }
799       break;
800     default:
801       /* should never get here */
802       goto bad_row;
803   }
804   return DS_EXEC_SUCCESS;
805  bad_row:
806   return DS_EXEC_ROW_FAILED;
807 }
/*
 * Run the configured post-connect statement on a freshly (re)established
 * connection.
 * - Storage nodes (cq->fqdn set) run storagepostconnect, bound to the
 *   noit's remote_str and remote_cn; "[[null]]" becomes SQL NULL.
 * - The metanode runs metanodepostconnect.
 *
 * If the statement is not configured, GET_QUERY jumps to bad_row while rv
 * is still 0, so that is not an error.  Once the statement exists, any
 * failure is fatal for the connection: it is closed (cq->dbh = NULL) and
 * -1 is returned.  Returns 0 otherwise.
 */
808 static int
809 stratcon_database_post_connect(conn_q *cq) {
810   int rv = 0;
811   ds_single_detail _d = { 0 }, *d = &_d;
812   if(cq->fqdn) {
813     char *remote_str, *remote_cn;
814     /* This is the silly way we get null's in through our declare_param_str */
815     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
816     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
817     /* This is a storage node, it gets the storage node post_connect */
818     GET_QUERY(storage_post_connect);
819     rv = -1; /* now we're serious */
820     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
821     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
822     PG_EXEC(storage_post_connect);
823     PQclear(d->res);
824     rv = 0;
825   }
826   else {
827     /* Metanode post_connect */
828     GET_QUERY(metanode_post_connect);
829     rv = -1; /* now we're serious */
830     PG_EXEC(metanode_post_connect);
831     PQclear(d->res);
832     rv = 0;
833   }
834  bad_row:
835   free_params(d);
836   if(rv == -1) {
837     /* Post-connect intentions are serious and fatal */
838     PQfinish(cq->dbh);
839     cq->dbh = NULL;
840   }
841   return rv;
842 }
843 static int
844 stratcon_database_connect(conn_q *cq) {
845   char *dsn, dsn_meta[512];
846   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
847   const char *k, *v;
848   int klen;
849   noit_hash_table *t;
850
851   dsn_meta[0] = '\0';
852   if(!cq->dsn) {
853     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
854     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
855       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
856       strlcat(dsn_meta, k, sizeof(dsn_meta));
857       strlcat(dsn_meta, "=", sizeof(dsn_meta));
858       strlcat(dsn_meta, v, sizeof(dsn_meta));
859     }
860     noit_hash_destroy(t, free, free);
861     free(t);
862     dsn = dsn_meta;
863   }
864   else {
865     char options[32];
866     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta));
867     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user",
868                                options, sizeof(options))) {
869       strlcat(dsn_meta, " ", sizeof(dsn_meta));
870       strlcat(dsn_meta, "user", sizeof(dsn_meta));
871       strlcat(dsn_meta, "=", sizeof(dsn_meta));
872       strlcat(dsn_meta, options, sizeof(dsn_meta));
873     }
874     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password",
875                                options, sizeof(options))) {
876       strlcat(dsn_meta, " ", sizeof(dsn_meta));
877       strlcat(dsn_meta, "password", sizeof(dsn_meta));
878       strlcat(dsn_meta, "=", sizeof(dsn_meta));
879       strlcat(dsn_meta, options, sizeof(dsn_meta));
880     }
881     dsn = dsn_meta;
882   }
883
884   if(cq->dbh) {
885     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
886     PQreset(cq->dbh);
887     if(PQstatus(cq->dbh) != CONNECTION_OK) {
888       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
889             dsn, PQerrorMessage(cq->dbh));
890       return -1;
891     }
892     if(stratcon_database_post_connect(cq)) return -1;
893     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
894     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
895           dsn, PQerrorMessage(cq->dbh));
896     return -1;
897   }
898
899   cq->dbh = PQconnectdb(dsn);
900   if(!cq->dbh) return -1;
901   if(PQstatus(cq->dbh) != CONNECTION_OK) {
902     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
903           dsn, PQerrorMessage(cq->dbh));
904     return -1;
905   }
906   if(stratcon_database_post_connect(cq)) return -1;
907   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
908   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
909         dsn, PQerrorMessage(cq->dbh));
910   return -1;
911 }
912 static int
913 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
914                                 const char *name) {
915   int rv = -1;
916   PGresult *res;
917   char cmd[128];
918   strlcpy(cmd, p, sizeof(cmd));
919   strlcat(cmd, name, sizeof(cmd));
920   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
921   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
922   PQclear(res);
923   return rv;
924 }
925 static int
926 stratcon_datastore_do(conn_q *cq, const char *cmd) {
927   PGresult *res;
928   int rv = -1;
929   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
930   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
931   PQclear(res);
932   return rv;
933 }
/*
 * Transaction-control helpers used by stratcon_datastore_asynch_execute().
 * They rely on the caller's scope: a conn_q `cq`, ds_line_detail pointers
 * `current` and `last_sp`, and a `full_monty` label to jump back to.
 */
/* BUSTED: drop the connection (PQfinish, dbh = NULL) and jump to full_monty,
 * where the caller reconnects and replays the batch. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)
/* SAVEPOINT: issue "SAVEPOINT <name>" and remember `current` as the replay
 * point in `last_sp`; a failed command is BUSTED. */
#define SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
  last_sp = current; \
} while(0)
/* ROLLBACK_TO_SAVEPOINT: issue "ROLLBACK TO SAVEPOINT <name>" and clear the
 * replay point; a failed command is BUSTED. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
/* RELEASE_SAVEPOINT: issue "RELEASE SAVEPOINT <name>" and clear the replay
 * point; a failed command is BUSTED. */
#define RELEASE_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
953 int
954 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
955                                  struct timeval *now) {
956   ds_rt_detail *dsjd = closure;
957   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
958   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
959
960   assert(dsjd->rt);
961   stratcon_datastore_find(dsjd);
962   if(dsjd->completion_event)
963     eventer_add(dsjd->completion_event);
964
965   free_params((ds_single_detail *)dsjd);
966   free(dsjd);
967   return 0;
968 }
969 static const char *
970 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
971   void *vinfo;
972   char *dsn = NULL, *fqdn = NULL;
973   int found = 0;
974   storagenode_info *info = NULL;
975   pthread_mutex_lock(&storagenode_to_info_cache_lock);
976   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
977                         &vinfo)) {
978     found = 1;
979     info = vinfo;
980   }
981   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
982   if(found) {
983     if(fqdn_out) *fqdn_out = info->fqdn;
984     return info->dsn;
985   }
986
987   if(!found && can_use_db) {
988     ds_single_detail *d;
989     conn_q *cq;
990     int row_count;
991     /* Look it up and store it */
992     d = calloc(1, sizeof(*d));
993     cq = get_conn_q_for_metanode();
994     GET_QUERY(find_storage);
995     DECLARE_PARAM_INT(id);
996     PG_EXEC(find_storage);
997     row_count = PQntuples(d->res);
998     if(row_count) {
999       PG_GET_STR_COL(dsn, 0, "dsn");
1000       PG_GET_STR_COL(fqdn, 0, "fqdn");
1001       fqdn = fqdn ? strdup(fqdn) : NULL;
1002       dsn = dsn ? strdup(dsn) : NULL;
1003     }
1004     PQclear(d->res);
1005    bad_row:
1006     free_params(d);
1007     free(d);
1008     release_conn_q(cq);
1009   }
1010   if(fqdn) {
1011     info = calloc(1, sizeof(*info));
1012     info->fqdn = fqdn;
1013     if(fqdn_out) *fqdn_out = info->fqdn;
1014     info->dsn = dsn;
1015     info->storagenode_id = id;
1016     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1017     noit_hash_store(&storagenode_to_info_cache,
1018                     (void *)&info->storagenode_id, sizeof(int), info);
1019     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1020   }
1021   return info ? info->dsn : NULL;
1022 }
1023 static ds_line_detail *
1024 build_insert_batch(interim_journal_t *ij) {
1025   int rv;
1026   off_t len;
1027   const char *buff, *cp, *lcp;
1028   struct stat st;
1029   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1030
1031   if(ij->fd < 0) {
1032     ij->fd = open(ij->filename, O_RDONLY);
1033     if(ij->fd < 0) {
1034       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
1035             ij->filename, strerror(errno));
1036       assert(ij->fd >= 0);
1037     }
1038   }
1039   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1040   if(rv == -1) {
1041       noitL(noit_error, "Cannot stat interim journal '%s': %s\n",
1042             ij->filename, strerror(errno));
1043     assert(rv != -1);
1044   }
1045   len = st.st_size;
1046   if(len > 0) {
1047     buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1048     if(buff == (void *)-1) {
1049       noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1050             ij->filename, strerror(errno));
1051       assert(buff != (void *)-1);
1052     }
1053     lcp = buff;
1054     while(lcp < (buff + len) &&
1055           NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1056       next = calloc(1, sizeof(*next));
1057       next->data = malloc(cp - lcp + 1);
1058       memcpy(next->data, lcp, cp - lcp);
1059       next->data[cp - lcp] = '\0';
1060       if(!head) head = next;
1061       if(last) last->next = next;
1062       last = next;
1063       lcp = cp + 1;
1064     }
1065     munmap((void *)buff, len);
1066   }
1067   close(ij->fd);
1068   return head;
1069 }
1070 static void
1071 interim_journal_remove(interim_journal_t *ij) {
1072   unlink(ij->filename);
1073   if(ij->filename) free(ij->filename);
1074   if(ij->remote_str) free(ij->remote_str);
1075   if(ij->remote_cn) free(ij->remote_cn);
1076   if(ij->fqdn) free(ij->fqdn);
1077 }
1078 int
1079 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
1080                                   struct timeval *now) {
1081   int i, total, success, sp_total, sp_success;
1082   interim_journal_t *ij;
1083   ds_line_detail *head = NULL, *current, *last_sp;
1084   const char *dsn;
1085   conn_q *cq;
1086   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1087   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1088
1089   ij = closure;
1090   dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1091   if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1092   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1093                              ij->fqdn, dsn);
1094   noitL(ds_deb, "stratcon_datastore_asynch_execute[%s,%s,%s]\n",
1095         ij->remote_str, ij->remote_cn, ij->fqdn);
1096  full_monty:
1097   /* Make sure we have a connection */
1098   i = 1;
1099   while(stratcon_database_connect(cq)) {
1100     noitL(noit_error, "Error connecting to database: %s\n",
1101           ij->fqdn ? ij->fqdn : "(null)");
1102     sleep(i);
1103     i *= 2;
1104     i = MIN(i, 16);
1105   }
1106
1107   if(head == NULL) head = build_insert_batch(ij);
1108   noitL(ds_deb, "Starting batch from %s/%s to %s\n",
1109         ij->remote_str ? ij->remote_str : "(null)",
1110         ij->remote_cn ? ij->remote_cn : "(null)",
1111         ij->fqdn ? ij->fqdn : "(null)");
1112   current = head;
1113   last_sp = NULL;
1114   total = success = sp_total = sp_success = 0;
1115   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
1116   while(current) {
1117     execute_outcome_t rv;
1118     if(current->data) {
1119       if(!last_sp) {
1120         SAVEPOINT("batch");
1121         sp_success = success;
1122         sp_total = total;
1123       }
1124  
1125       if(current->problematic) {
1126         RELEASE_SAVEPOINT("batch");
1127         current = current->next;
1128         total++;
1129         continue;
1130       }
1131       rv = stratcon_datastore_execute(cq, cq->remote_str, cq->remote_cn,
1132                                       current);
1133       switch(rv) {
1134         case DS_EXEC_SUCCESS:
1135           total++;
1136           success++;
1137           current = current->next;
1138           break;
1139         case DS_EXEC_ROW_FAILED:
1140           /* rollback to savepoint, mark this record as bad and start again */
1141           noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1142           current->problematic = 1;
1143           current = last_sp;
1144           success = sp_success;
1145           total = sp_total;
1146           ROLLBACK_TO_SAVEPOINT("batch");
1147           break;
1148         case DS_EXEC_TXN_FAILED:
1149           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1150           BUSTED(cq);
1151       }
1152     }
1153   }
1154   if(last_sp) RELEASE_SAVEPOINT("batch");
1155   if(stratcon_datastore_do(cq, "COMMIT")) {
1156     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1157     BUSTED(cq);
1158   }
1159   /* Cleanup the mess */
1160   while(head) {
1161     ds_line_detail *tofree;
1162     tofree = head;
1163     head = head->next;
1164     if(tofree->data) free(tofree->data);
1165     free_params((ds_single_detail *)tofree);
1166     free(tofree);
1167   }
1168   noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1169         ij->remote_str ? ij->remote_str : "(null)",
1170         ij->remote_cn ? ij->remote_cn : "(null)",
1171         ij->fqdn ? ij->fqdn : "(null)", success, total);
1172   interim_journal_remove(ij);
1173   release_conn_q(cq);
1174   return 0;
1175 }
1176 static int
1177 stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure,
1178                                 struct timeval *now) {
1179   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1180   const char *k;
1181   int klen;
1182   void *vij;
1183   interim_journal_t *ij;
1184   syncset_t *syncset = closure;
1185
1186   if((mask & EVENTER_ASYNCH) == EVENTER_ASYNCH) {
1187     eventer_add(syncset->completion);
1188     eventer_trigger(syncset->completion, EVENTER_READ | EVENTER_WRITE);
1189     free(syncset);
1190     return 0;
1191   }
1192   if(!((mask & EVENTER_ASYNCH_WORK) == EVENTER_ASYNCH_WORK)) return 0;
1193
1194   noitL(ds_deb, "Syncing journal sets...\n");
1195   while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) {
1196     eventer_t ingest;
1197     ij = vij;
1198     noitL(ds_deb, "Syncing journal set [%s,%s,%s]\n",
1199           ij->remote_str, ij->remote_cn, ij->fqdn);
1200     fsync(ij->fd);
1201     close(ij->fd);
1202     ij->fd = -1;
1203     ingest = eventer_alloc();
1204     ingest->mask = EVENTER_ASYNCH;
1205     ingest->callback = stratcon_datastore_asynch_execute;
1206     ingest->closure = ij;
1207     eventer_add_asynch(ij->cpool->jobq, ingest);
1208   }
1209   noit_hash_destroy(syncset->ws, free, NULL);
1210   free(syncset->ws);
1211   return 0;
1212 }
1213 static interim_journal_t *
1214 interim_journal_get(struct sockaddr *remote, const char *remote_cn_in,
1215                     int storagenode_id, const char *fqdn_in) {
1216   void *vhash, *vij;
1217   noit_hash_table *working_set;
1218   interim_journal_t *ij;
1219   struct timeval now;
1220   char jpath[PATH_MAX];
1221   char remote_str[128];
1222   const char *remote_cn = remote_cn_in ? remote_cn_in : "default";
1223   const char *fqdn = fqdn_in ? fqdn_in : "default";
1224
1225   noit_convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote);
1226   if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str));
1227
1228   /* Lookup the working set */
1229   if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1230     working_set = calloc(1, sizeof(*working_set));
1231     noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn),
1232                     working_set);
1233   }
1234   else
1235     working_set = vhash;
1236
1237   /* Lookup the interim journal within the working set */
1238   if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) {
1239     ij = calloc(1, sizeof(*ij));
1240     gettimeofday(&now, NULL);
1241     snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x",
1242              basejpath, remote_str, remote_cn, storagenode_id,
1243              (unsigned int)now.tv_sec, (unsigned int)now.tv_usec);
1244     ij->remote_str = strdup(remote_str);
1245     ij->remote_cn = strdup(remote_cn);
1246     ij->fqdn = fqdn_in ? strdup(fqdn_in) : NULL;
1247     ij->storagenode_id = storagenode_id;
1248     ij->filename = strdup(jpath);
1249     ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1250                                          ij->fqdn);
1251     ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1252     if(ij->fd < 0 && errno == ENOENT) {
1253       if(mkdir_for_file(ij->filename, 0750)) {
1254         noitL(noit_error, "Failed to create dir for '%s': %s\n",
1255               ij->filename, strerror(errno));
1256         exit(-1);
1257       }
1258       ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1259     }
1260     if(ij->fd < 0) {
1261       noitL(noit_error, "Failed to open interim journal '%s': %s\n",
1262             ij->filename, strerror(errno));
1263       exit(-1);
1264     }
1265     noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij);
1266   }
1267   else
1268     ij = vij;
1269
1270   return ij;
1271 }
1272 static int
1273 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1274                           int *sid_out, int *storagenode_id_out,
1275                           const char **remote_cn_out,
1276                           const char **fqdn_out, const char **dsn_out) {
1277   /* only called from the main thread -- no safety issues */
1278   void *vuuidinfo, *vinfo;
1279   uuid_info *uuidinfo;
1280   storagenode_info *info = NULL;
1281   char *fqdn = NULL;
1282   char *dsn = NULL;
1283   char *new_remote_cn = NULL;
1284   int storagenode_id = 0, sid = 0;
1285   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1286                          &vuuidinfo)) {
1287     int row_count = 0;
1288     char *tmpint;
1289     ds_single_detail *d;
1290     conn_q *cq;
1291
1292     /* We can't do a database lookup without the remote_cn */
1293     if(!remote_cn) {
1294       if(stratcon_datastore_get_enabled()) {
1295         /* We have an authoritatively maintained cache, we don't do lookups */
1296         return -1;
1297       }
1298       else
1299         remote_cn = "[[null]]";
1300     }
1301
1302     d = calloc(1, sizeof(*d));
1303     cq = get_conn_q_for_metanode();
1304     if(stratcon_database_connect(cq) == 0) {
1305       /* Blocking call to service the cache miss */
1306       GET_QUERY(check_map);
1307       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1308       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1309       PG_EXEC(check_map);
1310       row_count = PQntuples(d->res);
1311       if(row_count != 1) {
1312         PQclear(d->res);
1313         goto bad_row;
1314       }
1315       PG_GET_STR_COL(tmpint, 0, "sid");
1316       if(!tmpint) {
1317         row_count = 0;
1318         PQclear(d->res);
1319         goto bad_row;
1320       }
1321       sid = atoi(tmpint);
1322       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1323       if(tmpint) storagenode_id = atoi(tmpint);
1324       PG_GET_STR_COL(fqdn, 0, "fqdn");
1325       PG_GET_STR_COL(dsn, 0, "dsn");
1326       PG_GET_STR_COL(new_remote_cn, 0, "remote_cn");
1327       fqdn = fqdn ? strdup(fqdn) : NULL;
1328       dsn = dsn ? strdup(dsn) : NULL;
1329       new_remote_cn = new_remote_cn ? strdup(new_remote_cn) : NULL;
1330       PQclear(d->res);
1331     }
1332    bad_row:
1333     free_params((ds_single_detail *)d);
1334     free(d);
1335     release_conn_q(cq);
1336     if(row_count != 1) {
1337       return -1;
1338     }
1339     /* Place in cache */
1340     uuidinfo = calloc(1, sizeof(*uuidinfo));
1341     uuidinfo->sid = sid;
1342     uuidinfo->uuid_str = strdup(uuid_str);
1343     uuidinfo->storagenode_id = storagenode_id;
1344     uuidinfo->remote_cn = new_remote_cn ? strdup(new_remote_cn) : strdup(remote_cn);
1345     noit_hash_store(&uuid_to_info_cache,
1346                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1347     /* Also, we may have just witnessed a new storage node, store it */
1348     if(storagenode_id) {
1349       int needs_free = 0;
1350       info = calloc(1, sizeof(*info));
1351       info->storagenode_id = storagenode_id;
1352       info->dsn = dsn ? strdup(dsn) : NULL;
1353       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1354       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1355       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1356                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1357         /* hack to save memory -- we *never* remove from these caches,
1358            so we can use the same fqdn value in the above cache for the key
1359            in the cache below -- (no strdup) */
1360         noit_hash_store(&storagenode_to_info_cache,
1361                         (void *)&info->storagenode_id, sizeof(int), info);
1362       }
1363       else needs_free = 1;
1364       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1365       if(needs_free) {
1366         if(info->dsn) free(info->dsn);
1367         if(info->fqdn) free(info->fqdn);
1368         free(info);
1369       }
1370     }
1371   }
1372   else
1373     uuidinfo = vuuidinfo;
1374
1375   if(uuidinfo && uuidinfo->storagenode_id) {
1376     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1377       /* we don't have dsn and we actually want it */
1378       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1379       if(noit_hash_retrieve(&storagenode_to_info_cache,
1380                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1381                             &vinfo))
1382         info = vinfo;
1383       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1384     }
1385   }
1386
1387   if(fqdn_out) *fqdn_out = info ? info->fqdn : NULL;
1388   if(dsn_out) *dsn_out = info ? info->dsn : NULL;
1389   assert(uuidinfo);
1390   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1391   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1392   if(sid_out) *sid_out = uuidinfo->sid;
1393   if(fqdn) free(fqdn);
1394   if(dsn) free(dsn);
1395   if(new_remote_cn) free(new_remote_cn);
1396   return 0;
1397 }
1398 static int
1399 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1400   char uuid_str[UUID_STR_LEN+1];
1401   int sid = 0;
1402   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1403   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1404   return sid;
1405 }
1406 static void
1407 stratcon_datastore_journal(struct sockaddr *remote,
1408                            const char *remote_cn, const char *line) {
1409   interim_journal_t *ij = NULL;
1410   char uuid_str[UUID_STR_LEN+1], *cp;
1411   const char *fqdn = NULL, *dsn = NULL;
1412   int storagenode_id = 0;
1413   uuid_t checkid;
1414   if(!line) return;
1415   /* if it is a UUID based thing, find the storage node */
1416   switch(*line) {
1417     case 'C':
1418     case 'S':
1419     case 'M':
1420       if(line[1] == '\t' && (cp = strchr(line+2, '\t')) != NULL) {
1421         strlcpy(uuid_str, cp + 1, sizeof(uuid_str));
1422         if(!uuid_parse(uuid_str, checkid)) {
1423           storage_node_quick_lookup(uuid_str, remote_cn, NULL,
1424                                     &storagenode_id, NULL, &fqdn, &dsn);
1425           ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn);
1426         }
1427       }
1428       break;
1429     case 'n':
1430       ij = interim_journal_get(remote,remote_cn,0,NULL);
1431       break;
1432     default:
1433       break;
1434   }
1435   if(!ij) {
1436     noitL(ingest_err, "%d\t%s\n", storagenode_id, line);
1437   }
1438   else {
1439     int len;
1440     len = write(ij->fd, line, strlen(line));
1441     if(len < 0) {
1442       noitL(noit_error, "write to %s failed: %s\n",
1443             ij->filename, strerror(errno));
1444     }
1445   }
1446   return;
1447 }
1448 static noit_hash_table *
1449 stratcon_datastore_journal_remove(struct sockaddr *remote,
1450                                   const char *remote_cn) {
1451   void *vhash = NULL;
1452   if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1453     /* pluck it out */
1454     noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL);
1455   }
1456   else {
1457     noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n",
1458           remote_cn);
1459     abort();
1460   }
1461   return vhash;
1462 }
1463 void
1464 stratcon_datastore_push(stratcon_datastore_op_t op,
1465                         struct sockaddr *remote,
1466                         const char *remote_cn, void *operand,
1467                         eventer_t completion) {
1468   conn_pool *cpool;
1469   syncset_t *syncset;
1470   eventer_t e;
1471   ds_rt_detail *rtdetail;
1472   struct datastore_onlooker_list *nnode;
1473
1474   for(nnode = onlookers; nnode; nnode = nnode->next)
1475     nnode->dispatch(op,remote,remote_cn,operand);
1476
1477   switch(op) {
1478     case DS_OP_INSERT:
1479       stratcon_datastore_journal(remote, remote_cn, (const char *)operand);
1480       break;
1481     case DS_OP_CHKPT:
1482       e = eventer_alloc();
1483       syncset = calloc(1, sizeof(*syncset));
1484       e->mask = EVENTER_ASYNCH;
1485       e->callback = stratcon_datastore_journal_sync;
1486       syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn);
1487       syncset->completion = completion;
1488       e->closure = syncset;
1489       eventer_add(e);
1490       break;
1491     case DS_OP_FIND_COMPLETE:
1492       cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
1493       rtdetail = calloc(1, sizeof(*rtdetail));
1494       rtdetail->rt = operand;
1495       rtdetail->completion_event = completion;
1496       e = eventer_alloc();
1497       e->mask = EVENTER_ASYNCH;
1498       e->callback = stratcon_datastore_asynch_lookup;
1499       e->closure = rtdetail;
1500       eventer_add_asynch(cpool->jobq, e);
1501       break;
1502   }
1503 }
1504
1505 int
1506 stratcon_datastore_saveconfig(void *unused) {
1507   int rv = -1;
1508   char *buff;
1509   ds_single_detail _d = { 0 }, *d = &_d;
1510   conn_q *cq;
1511   char ipv4_str[32];
1512   struct in_addr r, l;
1513
1514   r.s_addr = htonl((4 << 24) || (2 << 16) || (2 << 8) || 1);
1515   memset(&l, 0, sizeof(l));
1516   noit_getip_ipv4(r, &l);
1517   /* Ignore the error.. what are we going to do anyway */
1518   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
1519     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));
1520
1521   cq = get_conn_q_for_metanode();
1522
1523   if(stratcon_database_connect(cq) == 0) {
1524     char time_as_str[20];
1525     size_t len;
1526     buff = noit_conf_xml_in_mem(&len);
1527     if(!buff) goto bad_row;
1528
1529     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1530     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str));
1531     DECLARE_PARAM_STR("", 0);
1532     DECLARE_PARAM_STR("stratcond", 9);
1533     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1534     DECLARE_PARAM_STR(buff, len);
1535     free(buff);
1536
1537     GET_QUERY(config_insert);
1538     PG_EXEC(config_insert);
1539     PQclear(d->res);
1540     rv = 0;
1541
1542     bad_row:
1543       free_params(d);
1544   }
1545   release_conn_q(cq);
1546   return rv;
1547 }
1548
1549 void
1550 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
1551                                                struct sockaddr *,
1552                                                const char *, void *)) {
1553   struct datastore_onlooker_list *nnode;
1554   nnode = calloc(1, sizeof(*nnode));
1555   nnode->dispatch = f;
1556   nnode->next = onlookers;
1557   while(noit_atomic_casptr((volatile void **)&onlookers,
1558                            nnode, nnode->next) != (void *)nnode->next)
1559     nnode->next = onlookers;
1560 }
1561 static void
1562 stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn,
1563                                          char *id_str, char *file) {
1564   char path[PATH_MAX];
1565   interim_journal_t *ij;
1566   eventer_t ingest;
1567
1568   snprintf(path, sizeof(path), "%s/%s/%s/%s/%s",
1569            basejpath, remote_str, remote_cn, id_str, file);
1570   ij = calloc(1, sizeof(*ij));
1571   ij->fd = open(path, O_RDONLY);
1572   if(ij->fd < 0) {
1573     noitL(noit_error, "cannot open journal '%s': %s\n",
1574           path, strerror(errno));
1575     free(ij);
1576     return;
1577   }
1578   close(ij->fd);
1579   ij->fd = -1;
1580   ij->filename = strdup(path);
1581   ij->remote_str = strdup(remote_str);
1582   ij->remote_cn = strdup(remote_cn);
1583   ij->storagenode_id = atoi(id_str);
1584   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1585                                        ij->fqdn);
1586   noitL(noit_error, "ingesting old payload: %s\n", ij->filename);
1587   ingest = eventer_alloc();
1588   ingest->mask = EVENTER_ASYNCH;
1589   ingest->callback = stratcon_datastore_asynch_execute;
1590   ingest->closure = ij;
1591   eventer_add_asynch(ij->cpool->jobq, ingest);
1592 }
1593 static void
1594 stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) {
1595   char path[PATH_MAX];
1596   DIR *root;
1597   struct dirent *de, *entry;
1598   int i = 0, cnt = 0;
1599   char **entries;
1600   int size = 0;
1601
1602   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
1603            first ? "/" : "", first ? first : "",
1604            second ? "/" : "", second ? second : "",
1605            third ? "/" : "", third ? third : "");
1606 #ifdef _PC_NAME_MAX
1607   size = pathconf(path, _PC_NAME_MAX);
1608 #endif
1609   size = MIN(size, PATH_MAX + 128);
1610   de = alloca(size);
1611   root = opendir(path);
1612   if(!root) return;
1613   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) cnt++;
1614   closedir(root);
1615   root = opendir(path);
1616   if(!root) return;
1617   entries = malloc(sizeof(*entries) * cnt);
1618   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) {
1619     if(i < cnt) {
1620       entries[i++] = strdup(entry->d_name);
1621     }
1622   }
1623   closedir(root);
1624   cnt = i; /* could have changed, directories are fickle */
1625   qsort(entries, i, sizeof(*entries),
1626         (int (*)(const void *, const void *))strcasecmp);
1627   for(i=0; i<cnt; i++) {
1628     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
1629     noitL(ds_deb, "Processing L%d entry '%s'\n",
1630           third ? 4 : second ? 3 : first ? 2 : 1, entries[i]);
1631     if(!first)
1632       stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL);
1633     else if(!second)
1634       stratcon_datastore_sweep_journals_int(first, entries[i], NULL);
1635     else if(!third)
1636       stratcon_datastore_sweep_journals_int(first, second, entries[i]);
1637     else if(strlen(entries[i]) == 16)
1638       stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]);
1639   }
1640 }
1641 static void
1642 stratcon_datastore_sweep_journals() {
1643   stratcon_datastore_sweep_journals_int(NULL,NULL,NULL);
1644 }
1645
1646 int
1647 stratcon_datastore_ingest_all_storagenode_info() {
1648   int i, cnt = 0;
1649   ds_single_detail _d = { 0 }, *d = &_d;
1650   conn_q *cq;
1651   cq = get_conn_q_for_metanode();
1652
1653   while(stratcon_database_connect(cq)) {
1654     noitL(noit_error, "Error connecting to database\n");
1655     sleep(1);
1656   }
1657
1658   GET_QUERY(all_storage);
1659   PG_EXEC(all_storage);
1660   cnt = PQntuples(d->res);
1661   for(i=0; i<cnt; i++) {
1662     void *vinfo;
1663     char *tmpint, *fqdn, *dsn;
1664     int storagenode_id;
1665     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1666     storagenode_id = atoi(tmpint);
1667     PG_GET_STR_COL(fqdn, i, "fqdn");
1668     PG_GET_STR_COL(dsn, i, "dsn");
1669     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1670     storagenode_id = tmpint ? atoi(tmpint) : 0;
1671
1672     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1673                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1674       storagenode_info *info;
1675       info = calloc(1, sizeof(*info));
1676       info->storagenode_id = storagenode_id;
1677       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1678       info->dsn = dsn ? strdup(dsn) : NULL;
1679       noit_hash_store(&storagenode_to_info_cache,
1680                       (void *)&info->storagenode_id, sizeof(int), info);
1681       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1682             info->storagenode_id,
1683             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1684     }
1685   }
1686   PQclear(d->res);
1687  bad_row:
1688   free_params(d);
1689
1690   release_conn_q(cq);
1691   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1692   return cnt;
1693 }
1694 int
1695 stratcon_datastore_ingest_all_check_info() {
1696   int i, cnt, loaded = 0;
1697   ds_single_detail _d = { 0 }, *d = &_d;
1698   conn_q *cq;
1699   cq = get_conn_q_for_metanode();
1700
1701   while(stratcon_database_connect(cq)) {
1702     noitL(noit_error, "Error connecting to database\n");
1703     sleep(1);
1704   }
1705
1706   GET_QUERY(check_mapall);
1707   PG_EXEC(check_mapall);
1708   cnt = PQntuples(d->res);
1709   for(i=0; i<cnt; i++) {
1710     void *vinfo;
1711     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1712     int sid, storagenode_id;
1713     uuid_info *uuidinfo;
1714     PG_GET_STR_COL(uuid_str, i, "id");
1715     if(!uuid_str) continue;
1716     PG_GET_STR_COL(tmpint, i, "sid");
1717     if(!tmpint) continue;
1718     sid = atoi(tmpint);
1719     PG_GET_STR_COL(fqdn, i, "fqdn");
1720     PG_GET_STR_COL(dsn, i, "dsn");
1721     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1722     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1723     storagenode_id = tmpint ? atoi(tmpint) : 0;
1724
1725     uuidinfo = calloc(1, sizeof(*uuidinfo));
1726     uuidinfo->uuid_str = strdup(uuid_str);
1727     uuidinfo->remote_cn = strdup(remote_cn);
1728     uuidinfo->storagenode_id = storagenode_id;
1729     uuidinfo->sid = sid;
1730     noit_hash_store(&uuid_to_info_cache,
1731                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1732     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1733           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1734     loaded++;
1735     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1736                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1737       storagenode_info *info;
1738       info = calloc(1, sizeof(*info));
1739       info->storagenode_id = storagenode_id;
1740       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1741       info->dsn = dsn ? strdup(dsn) : NULL;
1742       noit_hash_store(&storagenode_to_info_cache,
1743                       (void *)&info->storagenode_id, sizeof(int), info);
1744       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1745             info->storagenode_id,
1746             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1747     }
1748   }
1749   PQclear(d->res);
1750  bad_row:
1751   free_params(d);
1752
1753   release_conn_q(cq);
1754   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1755   return loaded;
1756 }
1757
1758 static int
1759 rest_get_noit_config(noit_http_rest_closure_t *restc,
1760                      int npats, char **pats) {
1761   noit_http_session_ctx *ctx = restc->http_ctx;
1762   ds_single_detail *d;
1763   int row_count = 0;
1764   const char *xml = NULL;
1765   conn_q *cq = NULL;
1766
1767   if(npats != 0) {
1768     noit_http_response_server_error(ctx, "text/xml");
1769     noit_http_response_end(ctx);
1770     return 0;
1771   }
1772   d = calloc(1, sizeof(*d));
1773   GET_QUERY(config_get);
1774   cq = get_conn_q_for_metanode();
1775   if(!cq) {
1776     noit_http_response_server_error(ctx, "text/xml");
1777     goto bad_row;
1778   }
1779
1780   DECLARE_PARAM_STR(restc->remote_cn,
1781                     restc->remote_cn ? strlen(restc->remote_cn) : 0);
1782   PG_EXEC(config_get);
1783   row_count = PQntuples(d->res);
1784   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1785
1786   if(xml == NULL) {
1787     char buff[1024];
1788     snprintf(buff, sizeof(buff), "<error><remote_cn>%s</remote_cn>"
1789                                  "<row_count>%d</row_count></error>\n",
1790              restc->remote_cn, row_count);
1791     noit_http_response_append(ctx, buff, strlen(buff));
1792     noit_http_response_not_found(ctx, "text/xml");
1793   }
1794   else {
1795     noit_http_response_append(ctx, xml, strlen(xml));
1796     noit_http_response_ok(ctx, "text/xml");
1797   }
1798  bad_row:
1799   free_params((ds_single_detail *)d);
1800   d->nparams = 0;
1801   if(cq) release_conn_q(cq);
1802
1803   noit_http_response_end(ctx);
1804   return 0;
1805 }
1806
1807 void
1808 stratcon_datastore_init() {
1809   pthread_mutex_init(&ds_conns_lock, NULL);
1810   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1811   ds_err = noit_log_stream_find("error/datastore");
1812   ds_deb = noit_log_stream_find("debug/datastore");
1813   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
1814   ingest_err = noit_log_stream_find("error/ingest");
1815   if(!ds_err) ds_err = noit_error;
1816   if(!ingest_err) ingest_err = noit_error;
1817   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1818                            &basejpath)) {
1819     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1820     exit(-1);
1821   }
1822   stratcon_datastore_ingest_all_check_info();
1823   stratcon_datastore_ingest_all_storagenode_info();
1824   stratcon_datastore_sweep_journals();
1825
1826   assert(noit_http_rest_register_auth(
1827     "GET", "/noits/", "^config$", rest_get_noit_config,
1828              noit_http_rest_client_cert_auth
1829   ) == 0);
1830 }
1831
Note: See TracBrowser for help on using the browser.