root/src/stratcon_datastore.c

Revision 9773c9f2c8f4413f348fed53ebb19681bfdfd074, 57.7 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 years ago)

if I don't know the remote_cn, it should return it to me should I pass in null

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "utils/noit_str.h"
38 #include "utils/noit_mkdir.h"
39 #include "utils/noit_getip.h"
40 #include "stratcon_datastore.h"
41 #include "stratcon_realtime_http.h"
42 #include "stratcon_iep.h"
43 #include "noit_conf.h"
44 #include "noit_check.h"
45 #include "noit_rest.h"
46 #include <unistd.h>
47 #include <fcntl.h>
48 #include <netinet/in.h>
49 #include <sys/un.h>
50 #include <dirent.h>
51 #include <arpa/inet.h>
52 #include <sys/mman.h>
53 #include <libpq-fe.h>
54 #include <zlib.h>
55 #include <assert.h>
56 #include <errno.h>
57
/*
 * DECL_STMT(codename, confname) declares two file-scope statics:
 *   codename       - the statement text, NULL until it is loaded
 *   codename_conf  - the config path
 *                    "/stratcon/database/statements/<confname>"
 * GET_QUERY(codename) loads the text from that path on first use.
 */
#define DECL_STMT(codename,confname) \
static char *codename = NULL; \
static const char *codename##_conf = "/stratcon/database/statements/" #confname

/* One declaration per configurable SQL statement. */
DECL_STMT(storage_post_connect, storagepostconnect);
DECL_STMT(metanode_post_connect, metanodepostconnect);
DECL_STMT(find_storage, findstoragenode);
DECL_STMT(all_storage, allstoragenodes);
DECL_STMT(check_map, mapchecktostoragenode);
DECL_STMT(check_mapall, mapallchecks);
DECL_STMT(check_loadall, allchecks);
DECL_STMT(check_find, findcheck);
DECL_STMT(check_insert, check);
DECL_STMT(status_insert, status);
DECL_STMT(metric_insert_numeric, metric_numeric);
DECL_STMT(metric_insert_text, metric_text);
DECL_STMT(config_insert, config);
DECL_STMT(config_get, findconfig);
76
/* Log streams used in this file; all start out NULL and are not assigned
 * in the portion of the file shown here. */
static noit_log_stream_t ds_err = NULL;
static noit_log_stream_t ds_deb = NULL;
static noit_log_stream_t ds_pool_deb = NULL;
static noit_log_stream_t ingest_err = NULL;

/* Datastore on/off flag (defaults to 1) with its public getter/setter. */
static int ds_system_enabled = 1;
int stratcon_datastore_get_enabled() { return ds_system_enabled; }
void stratcon_datastore_set_enabled(int n) { ds_system_enabled = n; }
85
/* Singly linked list of dispatch callbacks; `onlookers` is the list head
 * and starts out empty. */
static struct datastore_onlooker_list {
  void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *,
                   const char *, void *);
  struct datastore_onlooker_list *next;
} *onlookers = NULL;
91
/*
 * GET_QUERY(a): make sure statement `a` (declared with DECL_STMT) is loaded.
 * When `a` is still NULL it is fetched from the config path a_conf; if that
 * fetch fails control jumps to the enclosing function's `bad_row` label, so
 * every user of this macro must define that label.
 */
#define GET_QUERY(a) do { \
  if(a == NULL) \
    if(!noit_conf_get_string(NULL, a ## _conf, &(a))) \
      goto bad_row; \
} while(0)
97
struct conn_q;

/* A pool of database connections sharing one queue name. */
typedef struct {
  char            *queue_name; /* the key fqdn+remote_sn */
  eventer_jobq_t  *jobq;
  struct conn_q   *head;          /* idle connections, most recently released first */
  pthread_mutex_t  lock;          /* guards head and the counters below */
  pthread_cond_t   cv;            /* signalled whenever a connection is released */
  int              ttl;           /* idle entries with last_use + ttl < now get purged */
  int              in_pool;       /* number of entries on the head list */
  int              outstanding;   /* number of connections currently checked out */
  int              max_allocated; /* cap on in_pool + outstanding */
  int              max_in_pool;   /* cap on idle entries kept on release */
} conn_pool;
/* One database connection and the identity it was created for. */
typedef struct conn_q {
  time_t           last_use;   /* stamped with time(NULL) on release */
  char            *dsn;        /* Pg connect string */
  char            *remote_str; /* the IP of the noit*/
  char            *remote_cn;  /* the Cert CN of the noit */
  char            *fqdn;       /* the fqdn of the storage node */
  conn_pool       *pool;       /* owning pool */
  struct conn_q   *next;       /* link on the pool's idle list */
  /* Postgres specific stuff */
  PGconn          *dbh;
} conn_q;
123
124
/* Capacity of the per-statement parameter arrays. */
#define MAX_PARAMS 8
/*
 * Fields shared by every "detail" struct.  Code in this file casts
 * ds_rt_detail / ds_line_detail pointers to ds_single_detail (see the
 * free_params call sites), so POSTGRES_PARTS must remain the first member
 * group of each of them.
 *   res/rv        - last result and its PQresultStatus
 *   whence        - timestamp parsed from the record being processed
 *   nparams       - number of parameter slots in use
 *   param*        - libpq parameter arrays; paramAllocd marks values that
 *                   free_params must free
 */
#define POSTGRES_PARTS \
  PGresult *res; \
  int rv; \
  time_t whence; \
  int nparams; \
  int metric_type; \
  char *paramValues[MAX_PARAMS]; \
  int paramLengths[MAX_PARAMS]; \
  int paramFormats[MAX_PARAMS]; \
  int paramAllocd[MAX_PARAMS];

/* Bare statement state with nothing else attached. */
typedef struct ds_single_detail {
  POSTGRES_PARTS
} ds_single_detail;
/* Statement state for a realtime-tracker lookup. */
typedef struct {
  /* Postgres specific stuff */
  POSTGRES_PARTS
  struct realtime_tracker *rt;
  conn_q *cq; /* connection on which to perform this job */
  eventer_t completion_event; /* This event should be registered if non NULL */
} ds_rt_detail;
/* Statement state for one journal line; chained through `next`. */
typedef struct ds_line_detail {
  /* Postgres specific stuff */
  POSTGRES_PARTS
  char *data;       /* the raw line; its first byte selects the record type */
  int problematic;
  struct ds_line_detail *next;
} ds_line_detail;
154
/* A hash table paired with a completion event.
 * NOTE(review): presumably a working set being synced - confirm against
 * the users further down the file. */
typedef struct {
  noit_hash_table *ws;
  eventer_t completion;
} syncset_t;

/* Bookkeeping for one interim journal: the remote identity it belongs to,
 * the storage node it targets, and its file (descriptor and name). */
typedef struct {
  char *remote_str;
  char *remote_cn;
  char *fqdn;
  int storagenode_id;
  int fd;
  char *filename;
  conn_pool *cpool;
} interim_journal_t;
169
/* Forward declarations for helpers defined later in this file. */

/* Returns 0 once cq holds a usable connection, non-zero otherwise. */
static int stratcon_database_connect(conn_q *cq);
/* Callers in this file treat a return of 0 as "no sid available". */
static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
/* Callers in this file treat a non-zero return as "lookup failed". */
static int storage_node_quick_lookup(const char *uuid_str,
                                     const char *remote_cn,
                                     int *sid_out, int *storagenode_id_out,
                                     const char **remote_cn_out,
                                     const char **fqdn_out,
                                     const char **dsn_out);
178
179 static void
180 free_params(ds_single_detail *d) {
181   int i;
182   for(i=0; i<d->nparams; i++)
183     if(d->paramAllocd[i] && d->paramValues[i])
184       free(d->paramValues[i]);
185 }
186
/* NOTE(review): presumably the base directory for journals - it is not
 * assigned in the portion of the file shown here. */
char *basejpath = NULL;
/* ds_conns maps a queue name to its conn_pool; guarded by ds_conns_lock. */
pthread_mutex_t ds_conns_lock;
noit_hash_table ds_conns;
noit_hash_table working_sets;

/* the fqdn cache needs to be thread safe */
/* Cached identity of a check. */
typedef struct {
  char *uuid_str;
  char *remote_cn;
  int storagenode_id;
  int sid;
} uuid_info;
/* Cached description of a storage node: where it is and how to connect. */
typedef struct {
  int storagenode_id;
  char *fqdn;
  char *dsn;
} storagenode_info;
noit_hash_table uuid_to_info_cache;
/* storagenode_to_info_cache is read under storagenode_to_info_cache_lock. */
pthread_mutex_t storagenode_to_info_cache_lock;
noit_hash_table storagenode_to_info_cache;
207
/*
 * Render the address in `remote` as text into `buff`.
 *
 *   buff   - destination; must have room for at least one byte
 *   blen   - capacity of buff in bytes
 *   remote - AF_INET / AF_INET6 / AF_UNIX address, or NULL
 *
 * Returns the length of the string left in buff.  A NULL remote, an
 * unsupported address family, or a failed conversion leaves buff empty
 * and returns 0.  Output longer than blen-1 is truncated.
 */
int
convert_sockaddr_to_buff(char *buff, int blen, struct sockaddr *remote) {
  char name[128] = "";
  buff[0] = '\0';
  if(remote) {
    switch(remote->sa_family) {
      case AF_INET:
        /* The size handed to inet_ntop is the capacity of the destination
         * buffer, not the size of the sockaddr being converted. */
        if(!inet_ntop(AF_INET, &((struct sockaddr_in *)remote)->sin_addr,
                      name, sizeof(name)))
          name[0] = '\0';
        break;
      case AF_INET6:
        /* Using sizeof(struct sockaddr_in6) here made any address whose
         * text form exceeds 27 characters fail to convert. */
        if(!inet_ntop(AF_INET6, &((struct sockaddr_in6 *)remote)->sin6_addr,
                      name, sizeof(name)))
          name[0] = '\0';
        break;
      case AF_UNIX:
        snprintf(name, sizeof(name), "%s", ((struct sockaddr_un *)remote)->sun_path);
        break;
      default: return 0;
    }
  }
  /* Bounded, always-terminated copy into the caller's buffer. */
  if(blen > 0) snprintf(buff, (size_t)blen, "%s", name);
  return strlen(buff);
}
235
236 /* Thread-safe connection pools */
237
238 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
239 static void
240 release_conn_q_forceable(conn_q *cq, int forcefree) {
241   int putback = 0;
242   cq->last_use = time(NULL);
243   pthread_mutex_lock(&cq->pool->lock);
244   cq->pool->outstanding--;
245   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
246     putback = 1;
247     cq->next = cq->pool->head;
248     cq->pool->head = cq;
249     cq->pool->in_pool++;
250   }
251   pthread_mutex_unlock(&cq->pool->lock);
252   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(),
253         putback ? "to pool" : "and destroy", cq->pool->queue_name);
254   pthread_cond_signal(&cq->pool->cv);
255   if(putback) return;
256
257   /* Not put back, release it */
258   if(cq->dbh) PQfinish(cq->dbh);
259   if(cq->remote_str) free(cq->remote_str);
260   if(cq->remote_cn) free(cq->remote_cn);
261   if(cq->fqdn) free(cq->fqdn);
262   if(cq->dsn) free(cq->dsn);
263   free(cq);
264 }
265 static void
266 ttl_purge_conn_pool(conn_pool *pool) {
267   int old_cnt, new_cnt;
268   time_t now = time(NULL);
269   conn_q *cq, *prev = NULL, *iter;
270   /* because we always replace on the head and update the last_use time when
271      doing so, we know they are ordered LRU on the end.  So, once we hit an
272      old one, we know all the others are old too.
273    */
274   if(!pool->head) return; /* hack short circuit for no locks */
275   pthread_mutex_lock(&pool->lock);
276   old_cnt = pool->in_pool;
277   cq = pool->head;
278   while(cq) {
279     if(cq->last_use + cq->pool->ttl < now) {
280       if(prev) prev->next = NULL;
281       else pool->head = NULL;
282       break;
283     }
284     prev = cq;
285     cq = cq->next;
286   }
287   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
288   /* Fix accounting */
289   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
290   new_cnt = pool->in_pool;
291   pthread_mutex_unlock(&pool->lock);
292
293   /* Force release these without holding the lock */
294   while(cq) {
295     prev = cq;
296     cq = cq->next;
297     release_conn_q_forceable(cq, 1);
298   }
299   if(old_cnt != new_cnt)
300     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
301           pool->queue_name);
302 }
303 static void
304 release_conn_q(conn_q *cq) {
305   ttl_purge_conn_pool(cq->pool);
306   release_conn_q_forceable(cq, 0);
307 }
/*
 * Find, or create on first use, the connection pool for a
 * (remote_str, fqdn, remote_cn) combination.
 *
 * The pool key is "datastore_<remote_str>_<fqdn>_<remote_cn>", where a
 * NULL/empty remote_str becomes "0.0.0.0" and a NULL fqdn or remote_cn
 * becomes "default".  Lookups and inserts on ds_conns happen under
 * ds_conns_lock; the lock is released between the lookup and the insert, so
 * two threads can race to create the same pool and the loser discards its
 * copy.
 *
 * Returns the pool registered in ds_conns under that key.
 */
static conn_pool *
get_conn_pool_for_remote(const char *remote_str,
                         const char *remote_cn, const char *fqdn) {
  void *vcpool;
  conn_pool *cpool = NULL;
  char queue_name[256] = "datastore_";
  snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
           (remote_str && *remote_str) ? remote_str : "0.0.0.0",
           fqdn ? fqdn : "default",
           remote_cn ? remote_cn : "default");
  pthread_mutex_lock(&ds_conns_lock);
  if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
                        strlen(queue_name), &vcpool))
    cpool = vcpool;
  pthread_mutex_unlock(&ds_conns_lock);
  if(!cpool) {
    /* Build a candidate pool; vcpool doubles as "the pool that won". */
    vcpool = cpool = calloc(1, sizeof(*cpool));
    cpool->queue_name = strdup(queue_name);
    pthread_mutex_init(&cpool->lock, NULL);
    pthread_cond_init(&cpool->cv, NULL);
    cpool->in_pool = 0;
    cpool->outstanding = 0;
    cpool->max_in_pool = 1;
    cpool->max_allocated = 1;
    pthread_mutex_lock(&ds_conns_lock);
    /* A failed store is taken to mean the key already exists: fetch the
     * entry that is there so vcpool points at the winner. */
    if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
                        cpool)) {
      noit_hash_retrieve(&ds_conns, (const char *)queue_name,
                         strlen(queue_name), &vcpool);
    }
    pthread_mutex_unlock(&ds_conns_lock);
    if(vcpool != cpool) {
      /* someone beat us to it */
      free(cpool->queue_name);
      pthread_mutex_destroy(&cpool->lock);
      pthread_cond_destroy(&cpool->cv);
      free(cpool);
    }
    else {
      int i;
      /* Our job to setup the pool */
      /* NOTE(review): the pool is already visible in ds_conns at this point
       * while jobq is still NULL - confirm no reader can use it this early. */
      cpool->jobq = calloc(1, sizeof(*cpool->jobq));
      eventer_jobq_init(cpool->jobq, queue_name);
      cpool->jobq->backq = eventer_default_backq();
      /* Add one thread */
      for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
        eventer_jobq_increase_concurrency(cpool->jobq);
    }
    cpool = vcpool;
  }
  return cpool;
}
360 static conn_q *
361 get_conn_q_for_remote(const char *remote_str,
362                       const char *remote_cn, const char *fqdn,
363                       const char *dsn) {
364   conn_pool *cpool;
365   conn_q *cq;
366   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
367   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(),
368         cpool->queue_name);
369   pthread_mutex_lock(&cpool->lock);
370  again:
371   if(cpool->head) {
372     assert(cpool->in_pool > 0);
373     cq = cpool->head;
374     cpool->head = cq->next;
375     cpool->in_pool--;
376     cpool->outstanding++;
377     cq->next = NULL;
378     pthread_mutex_unlock(&cpool->lock);
379     return cq;
380   }
381   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
382     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
383           (void *)pthread_self(), cpool->queue_name);
384     pthread_cond_wait(&cpool->cv, &cpool->lock);
385     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
386           (void *)pthread_self(), cpool->queue_name);
387     goto again;
388   }
389   else {
390     cpool->outstanding++;
391     pthread_mutex_unlock(&cpool->lock);
392   }
393  
394   cq = calloc(1, sizeof(*cq));
395   cq->pool = cpool;
396   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
397   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
398   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
399   cq->dsn = dsn ? strdup(dsn) : NULL;
400   return cq;
401 }
/* Check out a connection with no remote, cn, fqdn or dsn; a conn_q without
 * an fqdn is treated as the metanode by the connect/post-connect code. */
static conn_q *
get_conn_q_for_metanode() {
  return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
}
406
/* Result codes returned by the statement-execution functions. */
typedef enum {
  DS_EXEC_SUCCESS = 0,    /* the row was handled */
  DS_EXEC_ROW_FAILED = 1, /* this row could not be parsed or executed */
  DS_EXEC_TXN_FAILED = 2, /* not returned by the code in this part of the file */
} execute_outcome_t;
412
/*
 * DECLARE_PARAM_STR(str, len): append a text parameter to the detail `d`
 * that must be in scope.  The value is a private copy of the first `len`
 * bytes of `str` and is flagged as owned so free_params() releases it.
 * The literal value "[[null]]" is the in-band way to ask for SQL NULL: the
 * copy is dropped and the slot holds a NULL pointer instead.
 * `len` is evaluated more than once, and nothing here checks nparams
 * against MAX_PARAMS - callers must stay within the limit.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  d->paramValues[d->nparams] = noit__strndup(str, len); \
  d->paramLengths[d->nparams] = len; \
  d->paramFormats[d->nparams] = 0; \
  d->paramAllocd[d->nparams] = 1; \
  if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \
    free(d->paramValues[d->nparams]); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  d->nparams++; \
} while(0)
/*
 * DECLARE_PARAM_INT(i): format the int `i` in decimal and append it as a
 * text parameter through DECLARE_PARAM_STR.
 */
#define DECLARE_PARAM_INT(i) do { \
  int buffer__len; \
  char buffer__[32]; \
  snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
  buffer__len = strlen(buffer__); \
  DECLARE_PARAM_STR(buffer__, buffer__len); \
} while(0)
433
/*
 * PG_GET_STR_COL(dest, row, name): point `dest` at the text of column
 * `name` in row `row` of d->res.  `dest` is set to NULL when the column
 * does not exist or the value is SQL NULL.  The pointer refers to storage
 * inside d->res, so it must not be used after that result is cleared.
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int colnum = PQfnumber(d->res, name); \
  dest = NULL; \
  if (colnum >= 0) \
    dest = PQgetisnull(d->res, row, colnum) \
         ? NULL : PQgetvalue(d->res, row, colnum); \
} while(0)
441
/*
 * PG_EXEC(cmd): run statement `cmd` on cq->dbh with the parameters
 * collected in `d`, leaving the result in d->res and its status in d->rv.
 * On any status other than COMMAND_OK / TUPLES_OK the first line of the
 * error is logged, the result is cleared, and control jumps to the
 * enclosing `bad_row` label.  On success the caller owns d->res and must
 * PQclear() it.  Requires `d` and `cq` in scope.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "[%s] stratcon datasource bad (%d): %.*s\n", \
          cq->fqdn ? cq->fqdn : "metanode", d->rv, \
          (int)(pgerr_end - pgerr), pgerr); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)

/*
 * PG_TM_EXEC(cmd, whence): like PG_EXEC, except that `cmd` is first passed
 * through strftime() using the UTC broken-down form of `whence`, so the
 * statement text may contain strftime conversion specifiers.  The expanded
 * statement is limited to the 4096-byte local buffer.  `whence` is
 * evaluated twice (once for the expansion, once for the error log).
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t __w = whence; \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&__w, &tbuf); \
  strftime(cmdbuf, sizeof(cmdbuf), cmd, tm); \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    noitL(ds_err, "stratcon datasource bad (%d): %.*s time: %llu\n", \
          d->rv, (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)whence); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
482
483 static void *
484 stratcon_datastore_check_loadall(void *vsn) {
485   storagenode_info *sn = vsn;
486   ds_single_detail *d;
487   int i, row_count = 0, good = 0;
488   char buff[1024];
489   conn_q *cq = NULL;
490
491   d = calloc(1, sizeof(*d));
492   GET_QUERY(check_loadall);
493   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
494   i = 0;
495   while(stratcon_database_connect(cq)) {
496     if(i++ > 4) {
497       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
498       release_conn_q(cq);
499       return (void *)(vpsized_int)good;
500     }
501     sleep(1);
502   }
503   PG_EXEC(check_loadall);
504   row_count = PQntuples(d->res);
505  
506   for(i=0; i<row_count; i++) {
507     int rv;
508     int8_t family;
509     struct sockaddr *sin;
510     struct sockaddr_in sin4 = { .sin_family = AF_INET };
511     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
512     char *remote, *id, *target, *module, *name;
513     PG_GET_STR_COL(remote, i, "remote_address");
514     PG_GET_STR_COL(id, i, "id");
515     PG_GET_STR_COL(target, i, "target");
516     PG_GET_STR_COL(module, i, "module");
517     PG_GET_STR_COL(name, i, "name");
518     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
519
520     family = AF_INET;
521     sin = (struct sockaddr *)&sin4;
522     rv = inet_pton(family, remote, &sin4.sin_addr);
523     if(rv != 1) {
524       family = AF_INET6;
525       sin = (struct sockaddr *)&sin6;
526       rv = inet_pton(family, remote, &sin6.sin6_addr);
527       if(rv != 1) {
528         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
529         sin = NULL;
530       }
531     }
532
533     /* stratcon_iep_line_processor takes an allocated operand and frees it */
534     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
535     good++;
536   }
537   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
538         good, row_count, sn->fqdn);
539  bad_row:
540   free_params((ds_single_detail *)d);
541   free(d);
542   if(cq) release_conn_q(cq);
543   return (void *)(vpsized_int)good;
544 }
545 static int
546 stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure,
547                                     struct timeval *now) {
548   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
549   pthread_t *jobs = NULL;
550   int nodes, i = 0, tcnt = 0;
551   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
552   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
553
554   pthread_mutex_lock(&storagenode_to_info_cache_lock);
555   nodes = storagenode_to_info_cache.size;
556   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
557   sns = calloc(MAX(1,nodes), sizeof(*sns));
558   if(nodes == 0) sns[nodes++] = &self;
559   else {
560     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
561     const char *k;
562     void *v;
563     int klen;
564     while(noit_hash_next(&storagenode_to_info_cache,
565                          &iter, &k, &klen, &v)) {
566       sns[i++] = (storagenode_info *)v;
567     }
568   }
569   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
570
571   for(i=0; i<nodes; i++) {
572     if(pthread_create(&jobs[i], NULL,
573                       stratcon_datastore_check_loadall, sns[i]) != 0) {
574       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
575     }
576   }
577   for(i=0; i<nodes; i++) {
578     void *good;
579     pthread_join(jobs[i], &good);
580     tcnt += (int)(vpsized_int)good;
581   }
582   noitL(noit_error, "Loaded all %d check states.\n", tcnt);
583   return 0;
584 }
/*
 * Schedule the preload of remembered checks into the IEP.  The work runs
 * asynchronously, as stratcon_datastore_asynch_drive_iep, on the job queue
 * of the default pool (the one obtained with all-NULL remote/cn/fqdn).
 */
void
stratcon_datastore_iep_check_preload() {
  eventer_t e;
  conn_pool *cpool;

  cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
  e = eventer_alloc();
  e->mask = EVENTER_ASYNCH;
  e->callback = stratcon_datastore_asynch_drive_iep;
  e->closure = NULL;
  eventer_add_asynch(cpool->jobq, e);
}
597 execute_outcome_t
598 stratcon_datastore_find(ds_rt_detail *d) {
599   conn_q *cq;
600   char *val;
601   int row_count;
602   struct realtime_tracker *node;
603
604   for(node = d->rt; node; node = node->next) {
605     char uuid_str[UUID_STR_LEN+1];
606     const char *fqdn, *dsn, *remote_cn;
607     int storagenode_id;
608
609     uuid_unparse_lower(node->checkid, uuid_str);
610     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
611                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
612       continue;
613
614     noitL(noit_error, "stratcon_datastore_find <- (%d, %s)\n", node->sid, remote_cn);
615     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
616     stratcon_database_connect(cq);
617
618     GET_QUERY(check_find);
619     DECLARE_PARAM_INT(node->sid);
620     PG_EXEC(check_find);
621     row_count = PQntuples(d->res);
622     if(row_count != 1) {
623       PQclear(d->res);
624       goto bad_row;
625     }
626
627     /* Get the check uuid */
628     PG_GET_STR_COL(val, 0, "id");
629     if(!val) {
630       PQclear(d->res);
631       goto bad_row;
632     }
633     if(uuid_parse(val, node->checkid)) {
634       PQclear(d->res);
635       goto bad_row;
636     }
637  
638     /* Get the remote_address (which noit owns this) */
639     PG_GET_STR_COL(val, 0, "remote_address");
640     if(!val) {
641       PQclear(d->res);
642       goto bad_row;
643     }
644     node->noit = strdup(val);
645  
646    bad_row:
647     free_params((ds_single_detail *)d);
648     d->nparams = 0;
649     release_conn_q(cq);
650   }
651   return DS_EXEC_SUCCESS;
652 }
653 execute_outcome_t
654 stratcon_datastore_execute(conn_q *cq, const char *r, const char *remote_cn,
655                            ds_line_detail *d) {
656   int type, len, sid;
657   char *final_buff;
658   uLong final_len, actual_final_len;
659   char *token;
660   char raddr_blank[1] = "";
661   const char *raddr;
662
663   type = d->data[0];
664   raddr = r ? r : raddr_blank;
665
666   /* Parse the log line, but only if we haven't already */
667   if(!d->nparams) {
668     char *scp, *ecp;
669
670     scp = d->data;
671 #define PROCESS_NEXT_FIELD(t,l) do { \
672   if(!*scp) goto bad_row; \
673   ecp = strchr(scp, '\t'); \
674   if(!ecp) goto bad_row; \
675   token = scp; \
676   len = (ecp-scp); \
677   scp = ecp + 1; \
678 } while(0)
679 #define PROCESS_LAST_FIELD(t,l) do { \
680   if(!*scp) ecp = scp; \
681   else { \
682     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
683     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
684   } \
685   t = scp; \
686   l = (ecp-scp); \
687 } while(0)
688
689     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
690     switch(type) {
691       /* See noit_check_log.c for log description */
692       case 'n':
693         DECLARE_PARAM_STR(raddr, strlen(raddr));
694         DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
695         DECLARE_PARAM_STR("noitd",5); /* node_type */
696         PROCESS_NEXT_FIELD(token,len);
697         d->whence = (time_t)strtoul(token, NULL, 10);
698         DECLARE_PARAM_STR(token,len); /* timestamp */
699
700         /* This is the expected uncompressed len */
701         PROCESS_NEXT_FIELD(token,len);
702         final_len = atoi(token);
703         final_buff = malloc(final_len);
704         if(!final_buff) goto bad_row;
705  
706         /* The last token is b64 endoded and compressed.
707          * we need to decode it, declare it and then free it.
708          */
709         PROCESS_LAST_FIELD(token, len);
710         /* We can in-place decode this */
711         len = noit_b64_decode((char *)token, len,
712                               (unsigned char *)token, len);
713         if(len <= 0) {
714           noitL(noit_error, "noitd config base64 decoding error.\n");
715           free(final_buff);
716           goto bad_row;
717         }
718         actual_final_len = final_len;
719         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
720                               (unsigned char *)token, len)) {
721           noitL(noit_error, "noitd config decompression failure.\n");
722           free(final_buff);
723           goto bad_row;
724         }
725         if(final_len != actual_final_len) {
726           noitL(noit_error, "noitd config decompression error.\n");
727           free(final_buff);
728           goto bad_row;
729         }
730         DECLARE_PARAM_STR(final_buff, final_len);
731         free(final_buff);
732         break;
733       case 'C':
734         DECLARE_PARAM_STR(raddr, strlen(raddr));
735         PROCESS_NEXT_FIELD(token,len);
736         DECLARE_PARAM_STR(token,len); /* timestamp */
737         d->whence = (time_t)strtoul(token, NULL, 10);
738         PROCESS_NEXT_FIELD(token, len);
739         sid = uuid_to_sid(token, remote_cn);
740         if(sid == 0) goto bad_row;
741         DECLARE_PARAM_INT(sid); /* sid */
742         DECLARE_PARAM_STR(token,len); /* uuid */
743         PROCESS_NEXT_FIELD(token, len);
744         DECLARE_PARAM_STR(token,len); /* target */
745         PROCESS_NEXT_FIELD(token, len);
746         DECLARE_PARAM_STR(token,len); /* module */
747         PROCESS_LAST_FIELD(token, len);
748         DECLARE_PARAM_STR(token,len); /* name */
749         break;
750       case 'M':
751         PROCESS_NEXT_FIELD(token,len);
752         DECLARE_PARAM_STR(token,len); /* timestamp */
753         d->whence = (time_t)strtoul(token, NULL, 10);
754         PROCESS_NEXT_FIELD(token, len);
755         sid = uuid_to_sid(token, remote_cn);
756         if(sid == 0) goto bad_row;
757         DECLARE_PARAM_INT(sid); /* sid */
758         PROCESS_NEXT_FIELD(token, len);
759         DECLARE_PARAM_STR(token,len); /* name */
760         PROCESS_NEXT_FIELD(token,len);
761         d->metric_type = *token;
762         PROCESS_LAST_FIELD(token,len);
763         DECLARE_PARAM_STR(token,len); /* value */
764         break;
765       case 'S':
766         PROCESS_NEXT_FIELD(token,len);
767         DECLARE_PARAM_STR(token,len); /* timestamp */
768         d->whence = (time_t)strtoul(token, NULL, 10);
769         PROCESS_NEXT_FIELD(token, len);
770         sid = uuid_to_sid(token, remote_cn);
771         if(sid == 0) goto bad_row;
772         DECLARE_PARAM_INT(sid); /* sid */
773         PROCESS_NEXT_FIELD(token, len);
774         DECLARE_PARAM_STR(token,len); /* state */
775         PROCESS_NEXT_FIELD(token, len);
776         DECLARE_PARAM_STR(token,len); /* availability */
777         PROCESS_NEXT_FIELD(token, len);
778         DECLARE_PARAM_STR(token,len); /* duration */
779         PROCESS_LAST_FIELD(token,len);
780         DECLARE_PARAM_STR(token,len); /* status */
781         break;
782       default:
783         goto bad_row;
784     }
785
786   }
787
788   /* Now execute the query */
789   switch(type) {
790     case 'n':
791       GET_QUERY(config_insert);
792       PG_EXEC(config_insert);
793       PQclear(d->res);
794       break;
795     case 'C':
796       GET_QUERY(check_insert);
797       PG_TM_EXEC(check_insert, d->whence);
798       PQclear(d->res);
799       break;
800     case 'S':
801       GET_QUERY(status_insert);
802       PG_TM_EXEC(status_insert, d->whence);
803       PQclear(d->res);
804       break;
805     case 'M':
806       switch(d->metric_type) {
807         case METRIC_INT32:
808         case METRIC_UINT32:
809         case METRIC_INT64:
810         case METRIC_UINT64:
811         case METRIC_DOUBLE:
812           GET_QUERY(metric_insert_numeric);
813           PG_TM_EXEC(metric_insert_numeric, d->whence);
814           PQclear(d->res);
815           break;
816         case METRIC_STRING:
817           GET_QUERY(metric_insert_text);
818           PG_TM_EXEC(metric_insert_text, d->whence);
819           PQclear(d->res);
820           break;
821         default:
822           goto bad_row;
823       }
824       break;
825     default:
826       /* should never get here */
827       goto bad_row;
828   }
829   return DS_EXEC_SUCCESS;
830  bad_row:
831   return DS_EXEC_ROW_FAILED;
832 }
833 static int
834 stratcon_database_post_connect(conn_q *cq) {
835   int rv = 0;
836   ds_single_detail _d = { 0 }, *d = &_d;
837   if(cq->fqdn) {
838     char *remote_str, *remote_cn;
839     /* This is the silly way we get null's in through our declare_param_str */
840     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
841     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
842     /* This is a storage node, it gets the storage node post_connect */
843     GET_QUERY(storage_post_connect);
844     rv = -1; /* now we're serious */
845     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
846     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
847     PG_EXEC(storage_post_connect);
848     PQclear(d->res);
849     rv = 0;
850   }
851   else {
852     /* Metanode post_connect */
853     GET_QUERY(metanode_post_connect);
854     rv = -1; /* now we're serious */
855     PG_EXEC(metanode_post_connect);
856     PQclear(d->res);
857     rv = 0;
858   }
859  bad_row:
860   free_params(d);
861   if(rv == -1) {
862     /* Post-connect intentions are serious and fatal */
863     PQfinish(cq->dbh);
864     cq->dbh = NULL;
865   }
866   return rv;
867 }
868 static int
869 stratcon_database_connect(conn_q *cq) {
870   char *dsn, dsn_meta[512];
871   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
872   const char *k, *v;
873   int klen;
874   noit_hash_table *t;
875
876   dsn_meta[0] = '\0';
877   if(!cq->dsn) {
878     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
879     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
880       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
881       strlcat(dsn_meta, k, sizeof(dsn_meta));
882       strlcat(dsn_meta, "=", sizeof(dsn_meta));
883       strlcat(dsn_meta, v, sizeof(dsn_meta));
884     }
885     noit_hash_destroy(t, free, free);
886     free(t);
887     dsn = dsn_meta;
888   }
889   else {
890     char options[32];
891     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta));
892     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user",
893                                options, sizeof(options))) {
894       strlcat(dsn_meta, " ", sizeof(dsn_meta));
895       strlcat(dsn_meta, "user", sizeof(dsn_meta));
896       strlcat(dsn_meta, "=", sizeof(dsn_meta));
897       strlcat(dsn_meta, options, sizeof(dsn_meta));
898     }
899     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password",
900                                options, sizeof(options))) {
901       strlcat(dsn_meta, " ", sizeof(dsn_meta));
902       strlcat(dsn_meta, "password", sizeof(dsn_meta));
903       strlcat(dsn_meta, "=", sizeof(dsn_meta));
904       strlcat(dsn_meta, options, sizeof(dsn_meta));
905     }
906     dsn = dsn_meta;
907   }
908
909   if(cq->dbh) {
910     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
911     PQreset(cq->dbh);
912     if(PQstatus(cq->dbh) != CONNECTION_OK) {
913       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
914             dsn, PQerrorMessage(cq->dbh));
915       return -1;
916     }
917     if(stratcon_database_post_connect(cq)) return -1;
918     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
919     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
920           dsn, PQerrorMessage(cq->dbh));
921     return -1;
922   }
923
924   cq->dbh = PQconnectdb(dsn);
925   if(!cq->dbh) return -1;
926   if(PQstatus(cq->dbh) != CONNECTION_OK) {
927     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
928           dsn, PQerrorMessage(cq->dbh));
929     return -1;
930   }
931   if(stratcon_database_post_connect(cq)) return -1;
932   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
933   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
934         dsn, PQerrorMessage(cq->dbh));
935   return -1;
936 }
937 static int
938 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
939                                 const char *name) {
940   int rv = -1;
941   PGresult *res;
942   char cmd[128];
943   strlcpy(cmd, p, sizeof(cmd));
944   strlcat(cmd, name, sizeof(cmd));
945   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
946   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
947   PQclear(res);
948   return rv;
949 }
950 static int
951 stratcon_datastore_do(conn_q *cq, const char *cmd) {
952   PGresult *res;
953   int rv = -1;
954   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
955   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
956   PQclear(res);
957   return rv;
958 }
/*
 * Transaction helpers.  They expect the enclosing function to provide a
 * `full_monty` label plus `cq`, `current` and `last_sp` variables.
 *
 * BUSTED closes the connection, clears the handle and jumps to full_monty.
 * The savepoint macros run the matching savepoint statement (BUSTED on
 * failure) and then update last_sp: SAVEPOINT records `current`, the other
 * two clear it.
 */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)
#define SAVEPOINT_OP_OR_BUST(verb, name, marker) do { \
  if(stratcon_datastore_savepoint_op(cq, verb, name)) BUSTED(cq); \
  last_sp = (marker); \
} while(0)
#define SAVEPOINT(name) \
  SAVEPOINT_OP_OR_BUST("SAVEPOINT ", name, current)
#define ROLLBACK_TO_SAVEPOINT(name) \
  SAVEPOINT_OP_OR_BUST("ROLLBACK TO SAVEPOINT ", name, NULL)
#define RELEASE_SAVEPOINT(name) \
  SAVEPOINT_OP_OR_BUST("RELEASE SAVEPOINT ", name, NULL)
978 int
979 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
980                                  struct timeval *now) {
981   ds_rt_detail *dsjd = closure;
982   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
983   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
984
985   assert(dsjd->rt);
986   stratcon_datastore_find(dsjd);
987   if(dsjd->completion_event)
988     eventer_add(dsjd->completion_event);
989
990   free_params((ds_single_detail *)dsjd);
991   free(dsjd);
992   return 0;
993 }
994 static const char *
995 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
996   void *vinfo;
997   const char *dsn = NULL, *fqdn = NULL;
998   int found = 0;
999   storagenode_info *info = NULL;
1000   pthread_mutex_lock(&storagenode_to_info_cache_lock);
1001   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
1002                         &vinfo)) {
1003     found = 1;
1004     info = vinfo;
1005   }
1006   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1007   if(found) {
1008     if(fqdn_out) *fqdn_out = info->fqdn;
1009     return info->dsn;
1010   }
1011
1012   if(!found && can_use_db) {
1013     ds_single_detail *d;
1014     conn_q *cq;
1015     int row_count;
1016     /* Look it up and store it */
1017     d = calloc(1, sizeof(*d));
1018     cq = get_conn_q_for_metanode();
1019     GET_QUERY(find_storage);
1020     DECLARE_PARAM_INT(id);
1021     PG_EXEC(find_storage);
1022     row_count = PQntuples(d->res);
1023     if(row_count) {
1024       PG_GET_STR_COL(dsn, 0, "dsn");
1025       PG_GET_STR_COL(fqdn, 0, "fqdn");
1026     }
1027     PQclear(d->res);
1028    bad_row:
1029     free_params(d);
1030     free(d);
1031     release_conn_q(cq);
1032   }
1033   if(fqdn) {
1034     info = calloc(1, sizeof(*info));
1035     info->fqdn = strdup(fqdn);
1036     if(fqdn_out) *fqdn_out = info->fqdn;
1037     info->dsn = dsn ? strdup(dsn) : NULL;
1038     info->storagenode_id = id;
1039     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1040     noit_hash_store(&storagenode_to_info_cache,
1041                     (void *)&info->storagenode_id, sizeof(int), info);
1042     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1043   }
1044   return info ? info->dsn : NULL;
1045 }
1046 static ds_line_detail *
1047 build_insert_batch(interim_journal_t *ij) {
1048   int rv;
1049   off_t len;
1050   const char *buff, *cp, *lcp;
1051   struct stat st;
1052   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1053
1054   if(ij->fd < 0) {
1055     ij->fd = open(ij->filename, O_RDONLY);
1056     if(ij->fd < 0) {
1057       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
1058             ij->filename, strerror(errno));
1059       assert(ij->fd >= 0);
1060     }
1061   }
1062   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1063   if(rv == -1) {
1064       noitL(noit_error, "Cannot stat interim journal '%s': %s\n",
1065             ij->filename, strerror(errno));
1066     assert(rv != -1);
1067   }
1068   len = st.st_size;
1069   buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1070   if(buff == (void *)-1) {
1071     noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd,
1072           ij->filename, strerror(errno));
1073     assert(buff != (void *)-1);
1074   }
1075   lcp = buff;
1076   while(lcp < (buff + len) &&
1077         NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1078     next = calloc(1, sizeof(*next));
1079     next->data = malloc(cp - lcp + 1);
1080     memcpy(next->data, lcp, cp - lcp);
1081     next->data[cp - lcp] = '\0';
1082     if(!head) head = next;
1083     if(last) last->next = next;
1084     last = next;
1085     lcp = cp + 1;
1086   }
1087   munmap((void *)buff, len);
1088   close(ij->fd);
1089   return head;
1090 }
1091 static void
1092 interim_journal_remove(interim_journal_t *ij) {
1093   unlink(ij->filename);
1094   if(ij->filename) free(ij->filename);
1095   if(ij->remote_str) free(ij->remote_str);
1096   if(ij->remote_cn) free(ij->remote_cn);
1097   if(ij->fqdn) free(ij->fqdn);
1098 }
1099 int
1100 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
1101                                   struct timeval *now) {
1102   int i, total, success, sp_total, sp_success;
1103   interim_journal_t *ij;
1104   ds_line_detail *head = NULL, *current, *last_sp;
1105   const char *dsn;
1106   conn_q *cq;
1107   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1108   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1109
1110   ij = closure;
1111   dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1112   if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1113   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1114                              ij->fqdn, dsn);
1115   noitL(ds_deb, "stratcon_datastore_asynch_execute[%s,%s,%s]\n",
1116         ij->remote_str, ij->remote_cn, ij->fqdn);
1117  full_monty:
1118   /* Make sure we have a connection */
1119   i = 1;
1120   while(stratcon_database_connect(cq)) {
1121     noitL(noit_error, "Error connecting to database: %s\n",
1122           ij->fqdn ? ij->fqdn : "(null)");
1123     sleep(i);
1124     i *= 2;
1125     i = MIN(i, 16);
1126   }
1127
1128   if(head == NULL) head = build_insert_batch(ij);
1129   noitL(ds_deb, "Starting batch from %s/%s to %s\n",
1130         ij->remote_str ? ij->remote_str : "(null)",
1131         ij->remote_cn ? ij->remote_cn : "(null)",
1132         ij->fqdn ? ij->fqdn : "(null)");
1133   current = head;
1134   last_sp = NULL;
1135   total = success = sp_total = sp_success = 0;
1136   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
1137   while(current) {
1138     execute_outcome_t rv;
1139     if(current->data) {
1140       if(!last_sp) {
1141         SAVEPOINT("batch");
1142         sp_success = success;
1143         sp_total = total;
1144       }
1145  
1146       if(current->problematic) {
1147         RELEASE_SAVEPOINT("batch");
1148         current = current->next;
1149         total++;
1150         continue;
1151       }
1152       rv = stratcon_datastore_execute(cq, cq->remote_str, cq->remote_cn,
1153                                       current);
1154       switch(rv) {
1155         case DS_EXEC_SUCCESS:
1156           total++;
1157           success++;
1158           current = current->next;
1159           break;
1160         case DS_EXEC_ROW_FAILED:
1161           /* rollback to savepoint, mark this record as bad and start again */
1162           noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1163           current->problematic = 1;
1164           current = last_sp;
1165           success = sp_success;
1166           total = sp_total;
1167           ROLLBACK_TO_SAVEPOINT("batch");
1168           break;
1169         case DS_EXEC_TXN_FAILED:
1170           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1171           BUSTED(cq);
1172       }
1173     }
1174   }
1175   if(last_sp) RELEASE_SAVEPOINT("batch");
1176   if(stratcon_datastore_do(cq, "COMMIT")) {
1177     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1178     BUSTED(cq);
1179   }
1180   /* Cleanup the mess */
1181   while(head) {
1182     ds_line_detail *tofree;
1183     tofree = head;
1184     head = head->next;
1185     if(tofree->data) free(tofree->data);
1186     free_params((ds_single_detail *)tofree);
1187     free(tofree);
1188   }
1189   noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1190         ij->remote_str ? ij->remote_str : "(null)",
1191         ij->remote_cn ? ij->remote_cn : "(null)",
1192         ij->fqdn ? ij->fqdn : "(null)", success, total);
1193   interim_journal_remove(ij);
1194   release_conn_q(cq);
1195   return 0;
1196 }
1197 static int
1198 stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure,
1199                                 struct timeval *now) {
1200   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1201   const char *k;
1202   int klen;
1203   void *vij;
1204   interim_journal_t *ij;
1205   syncset_t *syncset = closure;
1206
1207   if((mask & EVENTER_ASYNCH) == EVENTER_ASYNCH) {
1208     eventer_add(syncset->completion);
1209     eventer_trigger(syncset->completion, EVENTER_READ | EVENTER_WRITE);
1210     free(syncset);
1211     return 0;
1212   }
1213   if(!((mask & EVENTER_ASYNCH_WORK) == EVENTER_ASYNCH_WORK)) return 0;
1214
1215   noitL(ds_deb, "Syncing journal sets...\n");
1216   while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) {
1217     eventer_t ingest;
1218     ij = vij;
1219     noitL(ds_deb, "Syncing journal set [%s,%s,%s]\n",
1220           ij->remote_str, ij->remote_cn, ij->fqdn);
1221     fsync(ij->fd);
1222     close(ij->fd);
1223     ij->fd = -1;
1224     ingest = eventer_alloc();
1225     ingest->mask = EVENTER_ASYNCH;
1226     ingest->callback = stratcon_datastore_asynch_execute;
1227     ingest->closure = ij;
1228     eventer_add_asynch(ij->cpool->jobq, ingest);
1229   }
1230   noit_hash_destroy(syncset->ws, free, NULL);
1231   free(syncset->ws);
1232   return 0;
1233 }
1234 static interim_journal_t *
1235 interim_journal_get(struct sockaddr *remote, const char *remote_cn_in,
1236                     int storagenode_id, const char *fqdn_in) {
1237   void *vhash, *vij;
1238   noit_hash_table *working_set;
1239   interim_journal_t *ij;
1240   struct timeval now;
1241   char jpath[PATH_MAX];
1242   char remote_str[128];
1243   const char *remote_cn = remote_cn_in ? remote_cn_in : "default";
1244   const char *fqdn = fqdn_in ? fqdn_in : "default";
1245
1246   convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote);
1247   if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str));
1248
1249   /* Lookup the working set */
1250   if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1251     working_set = calloc(1, sizeof(*working_set));
1252     noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn),
1253                     working_set);
1254   }
1255   else
1256     working_set = vhash;
1257
1258   /* Lookup the interim journal within the working set */
1259   if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) {
1260     ij = calloc(1, sizeof(*ij));
1261     gettimeofday(&now, NULL);
1262     snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x",
1263              basejpath, remote_str, remote_cn, storagenode_id,
1264              (unsigned int)now.tv_sec, (unsigned int)now.tv_usec);
1265     ij->remote_str = strdup(remote_str);
1266     ij->remote_cn = strdup(remote_cn);
1267     ij->fqdn = fqdn_in ? strdup(fqdn_in) : NULL;
1268     ij->storagenode_id = storagenode_id;
1269     ij->filename = strdup(jpath);
1270     ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1271                                          ij->fqdn);
1272     ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1273     if(ij->fd < 0 && errno == ENOENT) {
1274       if(mkdir_for_file(ij->filename, 0750)) {
1275         noitL(noit_error, "Failed to create dir for '%s': %s\n",
1276               ij->filename, strerror(errno));
1277         exit(-1);
1278       }
1279       ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1280     }
1281     if(ij->fd < 0) {
1282       noitL(noit_error, "Failed to open interim journal '%s': %s\n",
1283             ij->filename, strerror(errno));
1284       exit(-1);
1285     }
1286     noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij);
1287   }
1288   else
1289     ij = vij;
1290
1291   return ij;
1292 }
1293 static int
1294 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1295                           int *sid_out, int *storagenode_id_out,
1296                           const char **remote_cn_out,
1297                           const char **fqdn_out, const char **dsn_out) {
1298   /* only called from the main thread -- no safety issues */
1299   void *vuuidinfo, *vinfo;
1300   uuid_info *uuidinfo;
1301   storagenode_info *info = NULL;
1302   char *fqdn = NULL;
1303   char *dsn = NULL;
1304   char *new_remote_cn = NULL;
1305   int storagenode_id = 0, sid = 0;
1306   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1307                          &vuuidinfo)) {
1308     int row_count = 0;
1309     char *tmpint;
1310     ds_single_detail *d;
1311     conn_q *cq;
1312
1313     /* We can't do a database lookup without the remote_cn */
1314     if(!remote_cn) {
1315       if(stratcon_datastore_get_enabled()) {
1316         /* We have an authoritatively maintained cache, we don't do lookups */
1317         return -1;
1318       }
1319       else
1320         remote_cn = "[[null]]";
1321     }
1322
1323     d = calloc(1, sizeof(*d));
1324     cq = get_conn_q_for_metanode();
1325     if(stratcon_database_connect(cq) == 0) {
1326       /* Blocking call to service the cache miss */
1327       GET_QUERY(check_map);
1328       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1329       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1330       PG_EXEC(check_map);
1331       row_count = PQntuples(d->res);
1332       if(row_count != 1) {
1333         PQclear(d->res);
1334         goto bad_row;
1335       }
1336       PG_GET_STR_COL(tmpint, 0, "sid");
1337       if(!tmpint) {
1338         row_count = 0;
1339         PQclear(d->res);
1340         goto bad_row;
1341       }
1342       sid = atoi(tmpint);
1343       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1344       if(tmpint) storagenode_id = atoi(tmpint);
1345       PG_GET_STR_COL(fqdn, 0, "fqdn");
1346       PG_GET_STR_COL(dsn, 0, "dsn");
1347       PG_GET_STR_COL(new_remote_cn, 0, "remote_cn");
1348       PQclear(d->res);
1349     }
1350    bad_row:
1351     free_params((ds_single_detail *)d);
1352     free(d);
1353     release_conn_q(cq);
1354     if(row_count != 1) {
1355       return -1;
1356     }
1357     /* Place in cache */
1358     if(fqdn) fqdn = strdup(fqdn);
1359     uuidinfo = calloc(1, sizeof(*uuidinfo));
1360     uuidinfo->sid = sid;
1361     uuidinfo->uuid_str = strdup(uuid_str);
1362     uuidinfo->storagenode_id = storagenode_id;
1363     uuidinfo->remote_cn = strdup(new_remote_cn);
1364     noit_hash_store(&uuid_to_info_cache,
1365                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1366     /* Also, we may have just witnessed a new storage node, store it */
1367     if(storagenode_id) {
1368       int needs_free = 0;
1369       info = calloc(1, sizeof(*info));
1370       info->storagenode_id = storagenode_id;
1371       info->dsn = dsn ? strdup(dsn) : NULL;
1372       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1373       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1374       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1375                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1376         /* hack to save memory -- we *never* remove from these caches,
1377            so we can use the same fqdn value in the above cache for the key
1378            in the cache below -- (no strdup) */
1379         noit_hash_store(&storagenode_to_info_cache,
1380                         (void *)&info->storagenode_id, sizeof(int), info);
1381       }
1382       else needs_free = 1;
1383       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1384       if(needs_free) {
1385         if(info->dsn) free(info->dsn);
1386         if(info->fqdn) free(info->fqdn);
1387         free(info);
1388       }
1389     }
1390   }
1391   else
1392     uuidinfo = vuuidinfo;
1393
1394   if(uuidinfo && uuidinfo->storagenode_id) {
1395     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1396       /* we don't have dsn and we actually want it */
1397       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1398       if(noit_hash_retrieve(&storagenode_to_info_cache,
1399                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1400                             &vinfo))
1401         info = vinfo;
1402       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1403     }
1404   }
1405
1406   if(fqdn_out) *fqdn_out = fqdn ? fqdn : (info ? info->fqdn : NULL);
1407   if(dsn_out) *dsn_out = dsn ? dsn : (info ? info->dsn : NULL);
1408   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1409   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1410   if(sid_out) *sid_out = uuidinfo->sid;
1411   return 0;
1412 }
1413 static int
1414 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1415   char uuid_str[UUID_STR_LEN+1];
1416   int sid = 0;
1417   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1418   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1419   return sid;
1420 }
1421 static void
1422 stratcon_datastore_journal(struct sockaddr *remote,
1423                            const char *remote_cn, const char *line) {
1424   interim_journal_t *ij = NULL;
1425   char uuid_str[UUID_STR_LEN+1], *cp;
1426   const char *fqdn = NULL, *dsn = NULL;
1427   int storagenode_id = 0;
1428   uuid_t checkid;
1429   if(!line) return;
1430   /* if it is a UUID based thing, find the storage node */
1431   switch(*line) {
1432     case 'C':
1433     case 'S':
1434     case 'M':
1435       if(line[1] == '\t' && (cp = strchr(line+2, '\t')) != NULL) {
1436         strlcpy(uuid_str, cp + 1, sizeof(uuid_str));
1437         if(!uuid_parse(uuid_str, checkid)) {
1438           storage_node_quick_lookup(uuid_str, remote_cn, NULL,
1439                                     &storagenode_id, NULL, &fqdn, &dsn);
1440           ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn);
1441         }
1442       }
1443       break;
1444     case 'n':
1445       ij = interim_journal_get(remote,remote_cn,0,NULL);
1446       break;
1447     default:
1448       break;
1449   }
1450   if(!ij) {
1451     noitL(ingest_err, "%d\t%s\n", storagenode_id, line);
1452   }
1453   else {
1454     int len;
1455     len = write(ij->fd, line, strlen(line));
1456     if(len < 0) {
1457       noitL(noit_error, "write to %s failed: %s\n",
1458             ij->filename, strerror(errno));
1459     }
1460   }
1461   return;
1462 }
1463 static noit_hash_table *
1464 stratcon_datastore_journal_remove(struct sockaddr *remote,
1465                                   const char *remote_cn) {
1466   void *vhash = NULL;
1467   if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1468     /* pluck it out */
1469     noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL);
1470   }
1471   else {
1472     noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n",
1473           remote_cn);
1474     abort();
1475   }
1476   return vhash;
1477 }
1478 void
1479 stratcon_datastore_push(stratcon_datastore_op_t op,
1480                         struct sockaddr *remote,
1481                         const char *remote_cn, void *operand,
1482                         eventer_t completion) {
1483   conn_pool *cpool;
1484   syncset_t *syncset;
1485   eventer_t e;
1486   ds_rt_detail *rtdetail;
1487   struct datastore_onlooker_list *nnode;
1488
1489   for(nnode = onlookers; nnode; nnode = nnode->next)
1490     nnode->dispatch(op,remote,remote_cn,operand);
1491
1492   switch(op) {
1493     case DS_OP_INSERT:
1494       stratcon_datastore_journal(remote, remote_cn, (const char *)operand);
1495       break;
1496     case DS_OP_CHKPT:
1497       e = eventer_alloc();
1498       syncset = calloc(1, sizeof(*syncset));
1499       e->mask = EVENTER_ASYNCH;
1500       e->callback = stratcon_datastore_journal_sync;
1501       syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn);
1502       syncset->completion = completion;
1503       e->closure = syncset;
1504       eventer_add(e);
1505       break;
1506     case DS_OP_FIND_COMPLETE:
1507       cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
1508       rtdetail = calloc(1, sizeof(*rtdetail));
1509       rtdetail->rt = operand;
1510       rtdetail->completion_event = completion;
1511       e = eventer_alloc();
1512       e->mask = EVENTER_ASYNCH;
1513       e->callback = stratcon_datastore_asynch_lookup;
1514       e->closure = rtdetail;
1515       eventer_add_asynch(cpool->jobq, e);
1516       break;
1517   }
1518 }
1519
1520 int
1521 stratcon_datastore_saveconfig(void *unused) {
1522   int rv = -1;
1523   char *buff;
1524   ds_single_detail _d = { 0 }, *d = &_d;
1525   conn_q *cq;
1526   char ipv4_str[32];
1527   struct in_addr r, l;
1528
1529   r.s_addr = htonl((4 << 24) || (2 << 16) || (2 << 8) || 1);
1530   memset(&l, 0, sizeof(l));
1531   noit_getip_ipv4(r, &l);
1532   /* Ignore the error.. what are we going to do anyway */
1533   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL)
1534     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str));
1535
1536   cq = get_conn_q_for_metanode();
1537
1538   if(stratcon_database_connect(cq) == 0) {
1539     char time_as_str[20];
1540     size_t len;
1541     buff = noit_conf_xml_in_mem(&len);
1542     if(!buff) goto bad_row;
1543
1544     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1545     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str));
1546     DECLARE_PARAM_STR("", 0);
1547     DECLARE_PARAM_STR("stratcond", 9);
1548     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1549     DECLARE_PARAM_STR(buff, len);
1550     free(buff);
1551
1552     GET_QUERY(config_insert);
1553     PG_EXEC(config_insert);
1554     PQclear(d->res);
1555     rv = 0;
1556
1557     bad_row:
1558       free_params(d);
1559   }
1560   release_conn_q(cq);
1561   return rv;
1562 }
1563
1564 void
1565 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
1566                                                struct sockaddr *,
1567                                                const char *, void *)) {
1568   struct datastore_onlooker_list *nnode;
1569   nnode = calloc(1, sizeof(*nnode));
1570   nnode->dispatch = f;
1571   nnode->next = onlookers;
1572   while(noit_atomic_casptr((volatile void **)&onlookers,
1573                            nnode, nnode->next) != (void *)nnode->next)
1574     nnode->next = onlookers;
1575 }
1576 static void
1577 stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn,
1578                                          char *id_str, char *file) {
1579   char path[PATH_MAX];
1580   interim_journal_t *ij;
1581   eventer_t ingest;
1582
1583   snprintf(path, sizeof(path), "%s/%s/%s/%s/%s",
1584            basejpath, remote_str, remote_cn, id_str, file);
1585   ij = calloc(1, sizeof(*ij));
1586   ij->fd = open(path, O_RDONLY);
1587   if(ij->fd < 0) {
1588     noitL(noit_error, "cannot open journal '%s': %s\n",
1589           path, strerror(errno));
1590     free(ij);
1591     return;
1592   }
1593   close(ij->fd);
1594   ij->fd = -1;
1595   ij->filename = strdup(path);
1596   ij->remote_str = strdup(remote_str);
1597   ij->remote_cn = strdup(remote_cn);
1598   ij->storagenode_id = atoi(id_str);
1599   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1600                                        ij->fqdn);
1601   noitL(noit_error, "ingesting old payload: %s\n", ij->filename);
1602   ingest = eventer_alloc();
1603   ingest->mask = EVENTER_ASYNCH;
1604   ingest->callback = stratcon_datastore_asynch_execute;
1605   ingest->closure = ij;
1606   eventer_add_asynch(ij->cpool->jobq, ingest);
1607 }
1608 static void
1609 stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) {
1610   char path[PATH_MAX];
1611   DIR *root;
1612   struct dirent *de, *entry;
1613   int i = 0, cnt = 0;
1614   char **entries;
1615   int size = 0;
1616
1617   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
1618            first ? "/" : "", first ? first : "",
1619            second ? "/" : "", second ? second : "",
1620            third ? "/" : "", third ? third : "");
1621 #ifdef _PC_NAME_MAX
1622   size = pathconf(path, _PC_NAME_MAX);
1623 #endif
1624   size = MIN(size, PATH_MAX + 128);
1625   de = alloca(size);
1626   root = opendir(path);
1627   if(!root) return;
1628   while(portable_readdir_r(root, de, &entry) != 0 && entry != NULL) cnt++;
1629   closedir(root);
1630   root = opendir(path);
1631   if(!root) return;
1632   entries = malloc(sizeof(*entries) * cnt);
1633   while(portable_readdir_r(root, de, &entry) != 0 && entry != NULL) {
1634     if(i < cnt) {
1635       entries[i++] = strdup(entry->d_name);
1636     }
1637   }
1638   closedir(root);
1639   cnt = i; /* could have changed, directories are fickle */
1640   qsort(entries, i, sizeof(*entries),
1641         (int (*)(const void *, const void *))strcasecmp);
1642   for(i=0; i<cnt; i++) {
1643     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
1644     noitL(ds_deb, "Processing L%d entry '%s'\n",
1645           third ? 4 : second ? 3 : first ? 2 : 1, entries[i]);
1646     if(!first)
1647       stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL);
1648     else if(!second)
1649       stratcon_datastore_sweep_journals_int(first, entries[i], NULL);
1650     else if(!third)
1651       stratcon_datastore_sweep_journals_int(first, second, entries[i]);
1652     else if(strlen(entries[i]) == 16)
1653       stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]);
1654   }
1655 }
/*
 * Sweep the whole journal tree, starting at basejpath itself (no path
 * components fixed yet).
 */
static void
stratcon_datastore_sweep_journals() {
  char *no_component = NULL;

  stratcon_datastore_sweep_journals_int(no_component, no_component,
                                        no_component);
}
1660
1661 int
1662 stratcon_datastore_ingest_all_storagenode_info() {
1663   int i, cnt = 0;
1664   ds_single_detail _d = { 0 }, *d = &_d;
1665   conn_q *cq;
1666   cq = get_conn_q_for_metanode();
1667
1668   while(stratcon_database_connect(cq)) {
1669     noitL(noit_error, "Error connecting to database\n");
1670     sleep(1);
1671   }
1672
1673   GET_QUERY(all_storage);
1674   PG_EXEC(all_storage);
1675   cnt = PQntuples(d->res);
1676   for(i=0; i<cnt; i++) {
1677     void *vinfo;
1678     char *tmpint, *fqdn, *dsn;
1679     int storagenode_id;
1680     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1681     storagenode_id = atoi(tmpint);
1682     PG_GET_STR_COL(fqdn, i, "fqdn");
1683     PG_GET_STR_COL(dsn, i, "dsn");
1684     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1685     storagenode_id = tmpint ? atoi(tmpint) : 0;
1686
1687     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1688                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1689       storagenode_info *info;
1690       info = calloc(1, sizeof(*info));
1691       info->storagenode_id = storagenode_id;
1692       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1693       info->dsn = dsn ? strdup(dsn) : NULL;
1694       noit_hash_store(&storagenode_to_info_cache,
1695                       (void *)&info->storagenode_id, sizeof(int), info);
1696       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1697             info->storagenode_id,
1698             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1699     }
1700   }
1701   PQclear(d->res);
1702  bad_row:
1703   free_params(d);
1704
1705   release_conn_q(cq);
1706   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1707   return cnt;
1708 }
1709 int
1710 stratcon_datastore_ingest_all_check_info() {
1711   int i, cnt, loaded = 0;
1712   ds_single_detail _d = { 0 }, *d = &_d;
1713   conn_q *cq;
1714   cq = get_conn_q_for_metanode();
1715
1716   while(stratcon_database_connect(cq)) {
1717     noitL(noit_error, "Error connecting to database\n");
1718     sleep(1);
1719   }
1720
1721   GET_QUERY(check_mapall);
1722   PG_EXEC(check_mapall);
1723   cnt = PQntuples(d->res);
1724   for(i=0; i<cnt; i++) {
1725     void *vinfo;
1726     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1727     int sid, storagenode_id;
1728     uuid_info *uuidinfo;
1729     PG_GET_STR_COL(uuid_str, i, "id");
1730     if(!uuid_str) continue;
1731     PG_GET_STR_COL(tmpint, i, "sid");
1732     if(!tmpint) continue;
1733     sid = atoi(tmpint);
1734     PG_GET_STR_COL(fqdn, i, "fqdn");
1735     PG_GET_STR_COL(dsn, i, "dsn");
1736     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1737     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1738     storagenode_id = tmpint ? atoi(tmpint) : 0;
1739
1740     uuidinfo = calloc(1, sizeof(*uuidinfo));
1741     uuidinfo->uuid_str = strdup(uuid_str);
1742     uuidinfo->remote_cn = strdup(remote_cn);
1743     uuidinfo->storagenode_id = storagenode_id;
1744     uuidinfo->sid = sid;
1745     noit_hash_store(&uuid_to_info_cache,
1746                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1747     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1748           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1749     loaded++;
1750     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1751                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1752       storagenode_info *info;
1753       info = calloc(1, sizeof(*info));
1754       info->storagenode_id = storagenode_id;
1755       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1756       info->dsn = dsn ? strdup(dsn) : NULL;
1757       noit_hash_store(&storagenode_to_info_cache,
1758                       (void *)&info->storagenode_id, sizeof(int), info);
1759       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1760             info->storagenode_id,
1761             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1762     }
1763   }
1764   PQclear(d->res);
1765  bad_row:
1766   free_params(d);
1767
1768   release_conn_q(cq);
1769   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1770   return loaded;
1771 }
1772
1773 static int
1774 rest_get_noit_config(noit_http_rest_closure_t *restc,
1775                      int npats, char **pats) {
1776   noit_http_session_ctx *ctx = restc->http_ctx;
1777   ds_single_detail *d;
1778   int row_count = 0;
1779   const char *xml = NULL;
1780   conn_q *cq = NULL;
1781
1782   if(npats != 0) {
1783     noit_http_response_server_error(ctx, "text/xml");
1784     noit_http_response_end(ctx);
1785     return 0;
1786   }
1787   d = calloc(1, sizeof(*d));
1788   GET_QUERY(config_get);
1789   cq = get_conn_q_for_metanode();
1790   if(!cq) {
1791     noit_http_response_server_error(ctx, "text/xml");
1792     goto bad_row;
1793   }
1794
1795   DECLARE_PARAM_STR(restc->remote_cn,
1796                     restc->remote_cn ? strlen(restc->remote_cn) : 0);
1797   PG_EXEC(config_get);
1798   row_count = PQntuples(d->res);
1799   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config");
1800
1801   if(xml == NULL) {
1802     char buff[1024];
1803     snprintf(buff, sizeof(buff), "<error><remote_cn>%s</remote_cn>"
1804                                  "<row_count>%d</row_count></error>\n",
1805              restc->remote_cn, row_count);
1806     noit_http_response_append(ctx, buff, strlen(buff));
1807     noit_http_response_not_found(ctx, "text/xml");
1808   }
1809   else {
1810     noit_http_response_append(ctx, xml, strlen(xml));
1811     noit_http_response_ok(ctx, "text/xml");
1812   }
1813  bad_row:
1814   free_params((ds_single_detail *)d);
1815   d->nparams = 0;
1816   if(cq) release_conn_q(cq);
1817
1818   noit_http_response_end(ctx);
1819   return 0;
1820 }
1821
1822 void
1823 stratcon_datastore_init() {
1824   pthread_mutex_init(&ds_conns_lock, NULL);
1825   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1826   ds_err = noit_log_stream_find("error/datastore");
1827   ds_deb = noit_log_stream_find("debug/datastore");
1828   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
1829   ingest_err = noit_log_stream_find("error/ingest");
1830   if(!ds_err) ds_err = noit_error;
1831   if(!ingest_err) ingest_err = noit_error;
1832   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1833                            &basejpath)) {
1834     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1835     exit(-1);
1836   }
1837   stratcon_datastore_ingest_all_check_info();
1838   stratcon_datastore_ingest_all_storagenode_info();
1839   stratcon_datastore_sweep_journals();
1840
1841   assert(noit_http_rest_register_auth(
1842     "GET", "/noits/", "^config$", rest_get_noit_config,
1843              noit_http_rest_client_cert_auth
1844   ) == 0);
1845 }
1846
Note: See TracBrowser for help on using the browser.