root/src/stratcon_datastore.c

Revision cc568bba655ac2820520a6c1131e25839dfe0379, 47.6 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

name this sensibly

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "utils/noit_str.h"
38 #include "utils/noit_mkdir.h"
39 #include "stratcon_datastore.h"
40 #include "stratcon_realtime_http.h"
41 #include "stratcon_iep.h"
42 #include "noit_conf.h"
43 #include "noit_check.h"
44 #include <unistd.h>
45 #include <fcntl.h>
46 #include <netinet/in.h>
47 #include <sys/un.h>
48 #include <dirent.h>
49 #include <arpa/inet.h>
50 #include <sys/mman.h>
51 #include <libpq-fe.h>
52 #include <zlib.h>
53 #include <assert.h>
54 #include <errno.h>
55
/*
 * DECL_STMT(codename, confname) declares a lazily loaded SQL statement:
 * `codename` holds the statement text (NULL until GET_QUERY loads it) and
 * `codename_conf` is the configuration path
 * "/stratcon/database/statements/<confname>" that GET_QUERY reads it from.
 */
56 #define DECL_STMT(codename,confname) \
57 static char *codename = NULL; \
58 static const char *codename##_conf = "/stratcon/database/statements/" #confname
59
/* Statements used by this module, each loaded on first use. */
60 DECL_STMT(storage_post_connect, storagepostconnect);
61 DECL_STMT(metanode_post_connect, metanodepostconnect);
62 DECL_STMT(find_storage, findstoragenode);
63 DECL_STMT(all_storage, allstoragenodes);
64 DECL_STMT(check_map, mapchecktostoragenode);
65 DECL_STMT(check_mapall, mapallchecks);
66 DECL_STMT(check_loadall, allchecks);
67 DECL_STMT(check_find, findcheck);
68 DECL_STMT(check_insert, check);
69 DECL_STMT(status_insert, status);
70 DECL_STMT(metric_insert_numeric, metric_numeric);
71 DECL_STMT(metric_insert_text, metric_text);
72 DECL_STMT(config_insert, config);
73
/* Log stream for failed datastore queries (written by PG_EXEC/PG_TM_EXEC). */
74 static noit_log_stream_t ds_err = NULL;
/* NOTE(review): not referenced in this part of the file; presumably used for
 * ingestion errors further down - confirm. */
75 static noit_log_stream_t ingest_err = NULL;
76
/* Singly linked list of dispatch callbacks.  Not referenced in this part of
 * the file; presumably they are notified of datastore operations - confirm. */
77 static struct datastore_onlooker_list {
78   void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *,
79                    const char *, void *);
80   struct datastore_onlooker_list *next;
81 } *onlookers = NULL;
82
/*
 * GET_QUERY(stmt): make sure the SQL text for `stmt` is loaded, reading it
 * on first use from the configuration path stored in `stmt_conf`.  If the
 * statement is not configured, control jumps to the caller's `bad_row` label.
 */
#define GET_QUERY(a) do { \
  if((a) == NULL && !noit_conf_get_string(NULL, a ## _conf, &(a))) \
    goto bad_row; \
} while(0)
88
89 struct conn_q;
90
/*
 * A pool of database connections for one (remote, fqdn, cn) triple, together
 * with the job queue that runs datastore work for it.  `lock` guards `head`
 * and the counters; `cv` is signalled whenever a connection is released.
 */
91 typedef struct {
92   char            *queue_name; /* hash key: "datastore_<remote>_<fqdn>_<cn>" */
93   eventer_jobq_t  *jobq;
94   struct conn_q   *head;       /* idle connections, most recently used first */
95   pthread_mutex_t  lock;
96   pthread_cond_t   cv;
97   int              ttl;        /* idle seconds before ttl_purge_conn_pool drops a connection */
98   int              in_pool;    /* number of idle connections on head */
99   int              outstanding; /* connections currently handed out */
100   int              max_allocated; /* cap on in_pool + outstanding */
101   int              max_in_pool;   /* cap on idle connections kept */
102 } conn_pool;
/* One database connection belonging to a conn_pool. */
103 typedef struct conn_q {
104   time_t           last_use;   /* set on every release */
105   char            *dsn;        /* Pg connect string */
106   char            *remote_str; /* the IP of the noit*/
107   char            *remote_cn;  /* the Cert CN of the noit */
108   char            *fqdn;       /* the fqdn of the storage node */
109   conn_pool       *pool;
110   struct conn_q   *next;       /* link in pool->head while idle */
111   /* Postgres specific stuff */
112   PGconn          *dbh;        /* NULL until stratcon_database_connect */
113 } conn_q;
114
115
/* Upper bound on query parameters per statement (sizes the arrays in
 * POSTGRES_PARTS). */
116 #define MAX_PARAMS 8
/*
 * POSTGRES_PARTS: per-query state shared by every detail type below.  It holds:
 *  - the last result and its status;
 *  - the row timestamp and the metric type;
 *  - the parameter arrays filled by DECLARE_PARAM_STR / DECLARE_PARAM_INT.
 *    paramAllocd[i] marks the values that free_params() must free.
 * It is placed first in each struct.  The casts of ds_rt_detail * and
 * ds_line_detail * to ds_single_detail * elsewhere in this file rely on that.
 */
117 #define POSTGRES_PARTS \
118   PGresult *res; \
119   int rv; \
120   time_t whence; \
121   int nparams; \
122   int metric_type; \
123   char *paramValues[MAX_PARAMS]; \
124   int paramLengths[MAX_PARAMS]; \
125   int paramFormats[MAX_PARAMS]; \
126   int paramAllocd[MAX_PARAMS];
127
/* Query state only, for one-off statements. */
128 typedef struct ds_single_detail {
129   POSTGRES_PARTS
130 } ds_single_detail;
/* An asynch lookup job: resolves the realtime tracker list `rt`
 * (see stratcon_datastore_find). */
131 typedef struct {
132   /* Postgres specific stuff */
133   POSTGRES_PARTS
134   struct realtime_tracker *rt;
135   conn_q *cq; /* connection on which to perform this job */
136   eventer_t completion_event; /* This event should be registered if non NULL */
137 } ds_rt_detail;
/* One journal line (`data`, NUL-terminated, newline stripped) in a batch
 * linked through `next` (see build_insert_batch). */
138 typedef struct ds_line_detail {
139   /* Postgres specific stuff */
140   POSTGRES_PARTS
141   char *data;
142   int problematic; /* NOTE(review): not referenced in this part of the file */
143   struct ds_line_detail *next;
144 } ds_line_detail;
145
/* NOTE(review): not referenced in this part of the file. */
146 typedef struct {
147   noit_hash_table *ws;
148   eventer_t completion;
149 } syncset_t;
150
/*
 * A journal file of log lines waiting to be inserted.  It also records the
 * identity used to pick the connection pool: remote, cn, fqdn, and the
 * storage node id used to look up the DSN.  `fd` is mmap()ed by
 * build_insert_batch; `filename` is unlinked by interim_journal_remove.
 */
151 typedef struct {
152   char *remote_str;
153   char *remote_cn;
154   char *fqdn;
155   int storagenode_id;
156   int fd;
157   char *filename;
158   conn_pool *cpool;
159 } interim_journal_t;
160
161 static int stratcon_database_connect(conn_q *cq);
162
163 static void
164 free_params(ds_single_detail *d) {
165   int i;
166   for(i=0; i<d->nparams; i++)
167     if(d->paramAllocd[i] && d->paramValues[i])
168       free(d->paramValues[i]);
169 }
170
/* NOTE(review): not referenced in this part of the file; presumably the base
 * directory for journal files - confirm. */
171 char *basejpath = NULL;
/* Guards ds_conns. */
172 pthread_mutex_t ds_conns_lock;
/* queue name ("datastore_<remote>_<fqdn>_<cn>") -> conn_pool * */
173 noit_hash_table ds_conns;
/* NOTE(review): not referenced in this part of the file. */
174 noit_hash_table working_sets;
175
176 /* the fqdn cache needs to be thread safe: storagenode_to_info_cache is only accessed while holding storagenode_to_info_cache_lock */
/* NOTE(review): uuid_info / uuid_to_info_cache are not referenced in this
 * part of the file. */
177 typedef struct {
178   char *uuid_str;
179   int storagenode_id;
180   int sid;
181 } uuid_info;
/* Cached result of the find_storage lookup, keyed by storagenode_id (see
 * get_dsn_from_storagenode_id); fqdn and dsn are heap copies owned by the
 * cache. */
182 typedef struct {
183   int storagenode_id;
184   char *fqdn;
185   char *dsn;
186 } storagenode_info;
187 noit_hash_table uuid_to_info_cache;
188 pthread_mutex_t storagenode_to_info_cache_lock;
189 noit_hash_table storagenode_to_info_cache;
190
/*
 * Render the address in `remote` as text into buff.
 *   AF_INET / AF_INET6: textual form produced by inet_ntop()
 *   AF_UNIX:            the socket path
 * At most blen bytes are written, and the result is always NUL-terminated.
 * Returns the length of the string written.  The result is "" (and 0 is
 * returned) for a NULL address, an unsupported family, or a failed
 * conversion.  When blen <= 0, buff is not touched and 0 is returned.
 */
int
convert_sockaddr_to_buff(char *buff, int blen, struct sockaddr *remote) {
  char name[128] = "";

  if(blen <= 0) return 0;
  buff[0] = '\0';
  if(remote) {
    switch(remote->sa_family) {
      case AF_INET:
        /* Size the output by our buffer, not by the sockaddr length. */
        if(!inet_ntop(AF_INET, &((struct sockaddr_in *)remote)->sin_addr,
                      name, sizeof(name)))
          name[0] = '\0';
        break;
      case AF_INET6:
        /* A full IPv6 text address (up to 45 chars + NUL) does not fit in
         * sizeof(struct sockaddr_in6) bytes, so use the real buffer size. */
        if(!inet_ntop(AF_INET6, &((struct sockaddr_in6 *)remote)->sin6_addr,
                      name, sizeof(name)))
          name[0] = '\0';
        break;
      case AF_UNIX: {
        const struct sockaddr_un *su = (const struct sockaddr_un *)remote;
        /* sun_path need not be NUL-terminated; bound the read. */
        snprintf(name, sizeof(name), "%.*s", (int)sizeof(su->sun_path),
                 su->sun_path);
        break;
      }
      default:
        return 0;
    }
  }
  snprintf(buff, (size_t)blen, "%s", name);
  return (int)strlen(buff);
}
218
219 /* Thread-safe connection pools */
220
221 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
222 static void
223 release_conn_q_forceable(conn_q *cq, int forcefree) {
224   int putback = 0;
225   cq->last_use = time(NULL);
226   pthread_mutex_lock(&cq->pool->lock);
227   cq->pool->outstanding--;
228   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
229     putback = 1;
230     cq->next = cq->pool->head;
231     cq->pool->head = cq;
232     cq->pool->in_pool++;
233   }
234   pthread_mutex_unlock(&cq->pool->lock);
235   noitL(noit_debug, "[%p] release %s [%s]\n", (void *)pthread_self(),
236         putback ? "to pool" : "and destroy", cq->pool->queue_name);
237   pthread_cond_signal(&cq->pool->cv);
238   if(putback) return;
239
240   /* Not put back, release it */
241   if(cq->dbh) PQfinish(cq->dbh);
242   if(cq->remote_str) free(cq->remote_str);
243   if(cq->remote_cn) free(cq->remote_cn);
244   if(cq->fqdn) free(cq->fqdn);
245   if(cq->dsn) free(cq->dsn);
246   free(cq);
247 }
248 static void
249 ttl_purge_conn_pool(conn_pool *pool) {
250   int old_cnt, new_cnt;
251   time_t now = time(NULL);
252   conn_q *cq, *prev = NULL, *iter;
253   /* because we always replace on the head and update the last_use time when
254      doing so, we know they are ordered LRU on the end.  So, once we hit an
255      old one, we know all the others are old too.
256    */
257   if(!pool->head) return; /* hack short circuit for no locks */
258   pthread_mutex_lock(&pool->lock);
259   old_cnt = pool->in_pool;
260   cq = pool->head;
261   while(cq) {
262     if(cq->last_use + cq->pool->ttl < now) {
263       if(prev) prev->next = NULL;
264       else pool->head = NULL;
265       break;
266     }
267     prev = cq;
268     cq = cq->next;
269   }
270   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
271   /* Fix accounting */
272   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
273   new_cnt = pool->in_pool;
274   pthread_mutex_unlock(&pool->lock);
275
276   /* Force release these without holding the lock */
277   while(cq) {
278     prev = cq;
279     cq = cq->next;
280     release_conn_q_forceable(cq, 1);
281   }
282   if(old_cnt != new_cnt)
283     noitL(noit_debug, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
284           pool->queue_name);
285 }
286 static void
287 release_conn_q(conn_q *cq) {
288   ttl_purge_conn_pool(cq->pool);
289   release_conn_q_forceable(cq, 0);
290 }
291 static conn_pool *
292 get_conn_pool_for_remote(const char *remote_str,
293                          const char *remote_cn, const char *fqdn) {
294   void *vcpool;
295   conn_pool *cpool = NULL;
296   char queue_name[256] = "datastore_";
297   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
298            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
299            fqdn ? fqdn : "default",
300            remote_cn ? remote_cn : "default");
301   pthread_mutex_lock(&ds_conns_lock);
302   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
303                         strlen(queue_name), &vcpool))
304     cpool = vcpool;
305   pthread_mutex_unlock(&ds_conns_lock);
306   if(!cpool) {
307     vcpool = cpool = calloc(1, sizeof(*cpool));
308     cpool->queue_name = strdup(queue_name);
309     pthread_mutex_init(&cpool->lock, NULL);
310     pthread_cond_init(&cpool->cv, NULL);
311     cpool->in_pool = 0;
312     cpool->outstanding = 0;
313     cpool->max_in_pool = 1;
314     cpool->max_allocated = 1;
315     pthread_mutex_lock(&ds_conns_lock);
316     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
317                         cpool)) {
318       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
319                          strlen(queue_name), &vcpool);
320     }
321     pthread_mutex_unlock(&ds_conns_lock);
322     if(vcpool != cpool) {
323       /* someone beat us to it */
324       free(cpool->queue_name);
325       pthread_mutex_destroy(&cpool->lock);
326       pthread_cond_destroy(&cpool->cv);
327       free(cpool);
328     }
329     else {
330       int i;
331       /* Our job to setup the pool */
332       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
333       eventer_jobq_init(cpool->jobq, queue_name);
334       cpool->jobq->backq = eventer_default_backq();
335       /* Add one thread */
336       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
337         eventer_jobq_increase_concurrency(cpool->jobq);
338     }
339     cpool = vcpool;
340   }
341   return cpool;
342 }
343 static conn_q *
344 get_conn_q_for_remote(const char *remote_str,
345                       const char *remote_cn, const char *fqdn,
346                       const char *dsn) {
347   conn_pool *cpool;
348   conn_q *cq;
349   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
350  again:
351   noitL(noit_debug, "[%p] requesting [%s]\n", (void *)pthread_self(),
352         cpool->queue_name);
353   pthread_mutex_lock(&cpool->lock);
354   if(cpool->head) {
355     assert(cpool->in_pool > 0);
356     cq = cpool->head;
357     cpool->head = cq->next;
358     cpool->in_pool--;
359     cpool->outstanding++;
360     cq->next = NULL;
361     pthread_mutex_unlock(&cpool->lock);
362     return cq;
363   }
364   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
365     noitL(noit_debug, "[%p] over-subscribed, waiting [%s]\n",
366           (void *)pthread_self(), cpool->queue_name);
367     pthread_cond_wait(&cpool->cv, &cpool->lock);
368     goto again;
369   }
370   else {
371     cpool->outstanding++;
372     pthread_mutex_unlock(&cpool->lock);
373   }
374  
375   cq = calloc(1, sizeof(*cq));
376   cq->pool = cpool;
377   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
378   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
379   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
380   cq->dsn = dsn ? strdup(dsn) : NULL;
381   return cq;
382 }
383 static conn_q *
384 get_conn_q_for_metanode() {
385   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
386 }
387
/* Result of executing datastore work (see stratcon_datastore_execute and
 * stratcon_datastore_find). */
388 typedef enum {
389   DS_EXEC_SUCCESS = 0,
390   DS_EXEC_ROW_FAILED = 1, /* the row could not be parsed or executed */
391   DS_EXEC_TXN_FAILED = 2, /* NOTE(review): not returned in this part of the file; presumably the whole transaction failed */
392 } execute_outcome_t;
393
/*
 * Local strndup(): copy at most `len` bytes of `src` into a freshly
 * malloc()ed, NUL-terminated string, stopping early at a NUL in `src`.
 * A non-positive `len` yields an empty string.  The caller frees the result.
 */
static char *
__noit__strndup(const char *src, int len) {
  const char *end = src;
  size_t n;
  char *copy;

  while(end - src < len && *end != '\0') end++;
  n = (size_t)(end - src);
  copy = malloc(n + 1);
  memcpy(copy, src, n);
  copy[n] = '\0';
  return copy;
}
/*
 * DECLARE_PARAM_STR(str, len): append a text parameter to `d`, which must be
 * in scope and contain POSTGRES_PARTS.
 *  - At most `len` bytes of `str` are copied (stopping early at a NUL).
 *  - The literal "[[null]]" becomes an SQL NULL parameter.
 *  - `len` is evaluated exactly once.
 *  - Appending beyond MAX_PARAMS trips an assertion instead of overrunning
 *    the parameter arrays.
 *
 * DECLARE_PARAM_INT(i): append the decimal rendering of the int `i`.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  int dps__len = (len); \
  assert(d->nparams < MAX_PARAMS); \
  d->paramValues[d->nparams] = __noit__strndup((str), dps__len); \
  d->paramLengths[d->nparams] = dps__len; \
  d->paramFormats[d->nparams] = 0; \
  d->paramAllocd[d->nparams] = 1; \
  if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \
    free(d->paramValues[d->nparams]); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  d->nparams++; \
} while(0)
#define DECLARE_PARAM_INT(i) do { \
  int buffer__len; \
  char buffer__[32]; \
  snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
  buffer__len = strlen(buffer__); \
  DECLARE_PARAM_STR(buffer__, buffer__len); \
} while(0)
425
/*
 * PG_GET_STR_COL(dest, row, name): set `dest` to the text of column `name`
 * in `row` of d->res.  It is set to NULL when the column does not exist or
 * the value is SQL NULL.  The pointer refers into d->res and is only valid
 * until PQclear(d->res).
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int pgcol__ = PQfnumber(d->res, name); \
  if(pgcol__ >= 0 && !PQgetisnull(d->res, row, pgcol__)) \
    dest = PQgetvalue(d->res, row, pgcol__); \
  else \
    dest = NULL; \
} while(0)
433
/*
 * PG_EXEC(cmd): run `cmd` on cq->dbh with the parameters accumulated in `d`
 * (both must be in scope).  On success d->res holds the result and the
 * caller must PQclear() it.  On any status other than PGRES_COMMAND_OK or
 * PGRES_TUPLES_OK, the error is logged to ds_err, the result is cleared, and
 * control jumps to the caller's `bad_row` label.
 */
434 #define PG_EXEC(cmd) do { \
435   d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
436                         (const char * const *)d->paramValues, \
437                         d->paramLengths, d->paramFormats, 0); \
438   d->rv = PQresultStatus(d->res); \
439   if(d->rv != PGRES_COMMAND_OK && \
440      d->rv != PGRES_TUPLES_OK) { \
441     noitL(ds_err, "stratcon datasource bad (%d): %s\n'%s'\n", \
442           d->rv, PQresultErrorMessage(d->res), cmd); \
443     PQclear(d->res); \
444     goto bad_row; \
445   } \
446 } while(0)
447
/*
 * PG_TM_EXEC(cmd, whence): like PG_EXEC, but `cmd` is first expanded as a
 * strftime() format using `whence` converted to UTC, so the SQL may contain
 * %-date directives.  The row fails instead of executing an undefined
 * command string when either:
 *  - the time cannot be converted, or
 *  - the expansion is empty or does not fit the 4096-byte buffer.
 * `whence` is evaluated exactly once.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t pgtm__w = (whence); \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&pgtm__w, &tbuf); \
  if(!tm || strftime(cmdbuf, sizeof(cmdbuf), cmd, tm) == 0) { \
    noitL(ds_err, "stratcon datasource cannot format '%s' for time %llu\n", \
          cmd, (long long unsigned)pgtm__w); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    noitL(ds_err, "stratcon datasource bad (%d): %s\n'%s' time: %llu\n", \
          d->rv, PQresultErrorMessage(d->res), cmdbuf, \
          (long long unsigned)pgtm__w); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
467
468 static int
469 stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure,
470                                     struct timeval *now) {
471   conn_q *cq = closure;
472   ds_single_detail *d;
473   int i, row_count = 0, good = 0;
474   char buff[1024];
475
476   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
477   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
478
479   stratcon_database_connect(cq);
480   d = calloc(1, sizeof(*d));
481   GET_QUERY(check_loadall);
482   PG_EXEC(check_loadall);
483   row_count = PQntuples(d->res);
484  
485   for(i=0; i<row_count; i++) {
486     int rv;
487     int8_t family;
488     struct sockaddr *sin;
489     struct sockaddr_in sin4 = { .sin_family = AF_INET };
490     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
491     char *remote, *id, *target, *module, *name;
492     PG_GET_STR_COL(remote, i, "remote_address");
493     PG_GET_STR_COL(id, i, "id");
494     PG_GET_STR_COL(target, i, "target");
495     PG_GET_STR_COL(module, i, "module");
496     PG_GET_STR_COL(name, i, "name");
497     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
498
499     family = AF_INET;
500     sin = (struct sockaddr *)&sin4;
501     rv = inet_pton(family, remote, &sin4.sin_addr);
502     if(rv != 1) {
503       family = AF_INET6;
504       sin = (struct sockaddr *)&sin6;
505       rv = inet_pton(family, remote, &sin6.sin6_addr);
506       if(rv != 1) {
507         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
508         sin = NULL;
509       }
510     }
511
512     /* stratcon_iep_line_processor takes an allocated operand and frees it */
513     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
514     good++;
515   }
516   noitL(noit_error, "Staged %d/%d remembered checks into IEP\n", good, row_count);
517  bad_row:
518   free_params((ds_single_detail *)d);
519   free(d);
520   release_conn_q(cq);
521   return 0;
522 }
523 void
524 stratcon_datastore_iep_check_preload() {
525   eventer_t e;
526   conn_q *cq;
527   cq = get_conn_q_for_metanode();
528
529   e = eventer_alloc();
530   e->mask = EVENTER_ASYNCH;
531   e->callback = stratcon_datastore_asynch_drive_iep;
532   e->closure = cq;
533   eventer_add_asynch(cq->pool->jobq, e);
534 }
535 execute_outcome_t
536 stratcon_datastore_find(conn_q *cq, ds_rt_detail *d) {
537   char *val;
538   int row_count;
539   struct realtime_tracker *node;
540
541   GET_QUERY(check_find);
542   for(node = d->rt; node; node = node->next) {
543     DECLARE_PARAM_INT(node->sid);
544     PG_EXEC(check_find);
545     row_count = PQntuples(d->res);
546     if(row_count != 1) {
547       PQclear(d->res);
548       goto bad_row;
549     }
550
551     /* Get the check uuid */
552     PG_GET_STR_COL(val, 0, "id");
553     if(!val) {
554       PQclear(d->res);
555       goto bad_row;
556     }
557     if(uuid_parse(val, node->checkid)) {
558       PQclear(d->res);
559       goto bad_row;
560     }
561  
562     /* Get the remote_address (which noit owns this) */
563     PG_GET_STR_COL(val, 0, "remote_address");
564     if(!val) {
565       PQclear(d->res);
566       goto bad_row;
567     }
568     node->noit = strdup(val);
569  
570    bad_row:
571     free_params((ds_single_detail *)d);
572     d->nparams = 0;
573   }
574   return DS_EXEC_SUCCESS;
575 }
/*
 * Parse one journal line (d->data) into query parameters and run the
 * matching insert statement on cq.
 *
 * The first byte of d->data selects the record type:
 *   'n' - noitd config: remote, "noitd", timestamp, inflated config.  The
 *         last field is base64 of zlib data and is decoded IN PLACE, so
 *         d->data is modified.
 *   'C' - check:  remote, timestamp, uuid, target, module, name
 *   'M' - metric: timestamp, uuid, name, value.  The metric type byte is kept
 *         in d->metric_type to choose the numeric or text statement.
 *   'S' - status: timestamp, uuid, state, availability, duration, status
 * Parsing is skipped when d->nparams is already non-zero, so an already
 * parsed detail is executed again without re-parsing.
 *
 * r is the remote address string used for 'n' and 'C' rows ("" if NULL).
 * Returns DS_EXEC_SUCCESS, or DS_EXEC_ROW_FAILED on any parse, config or
 * query error.  This function never returns DS_EXEC_TXN_FAILED.
 *
 * NOTE(review): a parse failure part-way through leaves d->nparams non-zero
 * with only some parameters declared.  A later call on the same detail would
 * skip parsing and execute with an incomplete parameter list.  Confirm the
 * caller never re-executes a row that failed to parse.
 */
576 execute_outcome_t
577 stratcon_datastore_execute(conn_q *cq, const char *r, ds_line_detail *d) {
578   int type, len;
579   char *final_buff;
580   uLong final_len, actual_final_len;
581   char *token;
582   char raddr_blank[1] = "";
583   const char *raddr;
584
585   type = d->data[0];
586   raddr = r ? r : raddr_blank;
587
588   /* Parse the log line, but only if we haven't already */
589   if(!d->nparams) {
590     char *scp, *ecp;
591
592     scp = d->data;
/* PROCESS_NEXT_FIELD: point `token`/`len` at the next tab-terminated field
 * and advance scp past the tab; the row fails if no tab remains.  Its (t,l)
 * arguments are ignored - it always writes `token` and `len`. */
593 #define PROCESS_NEXT_FIELD(t,l) do { \
594   if(!*scp) goto bad_row; \
595   ecp = strchr(scp, '\t'); \
596   if(!ecp) goto bad_row; \
597   token = scp; \
598   len = (ecp-scp); \
599   scp = ecp + 1; \
600 } while(0)
/* PROCESS_LAST_FIELD: set t/l to the rest of the line, excluding one
 * trailing newline if present. */
601 #define PROCESS_LAST_FIELD(t,l) do { \
602   if(!*scp) ecp = scp; \
603   else { \
604     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
605     if(*(ecp-1) == '\n') ecp--; /* We back up one character if we ended in \n */ \
606   } \
607   t = scp; \
608   l = (ecp-scp); \
609 } while(0)
610
611     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
612     switch(type) {
613       /* See noit_check_log.c for log description */
614       case 'n':
615         DECLARE_PARAM_STR(raddr, strlen(raddr));
616         DECLARE_PARAM_STR("noitd",5); /* node_type */
617         PROCESS_NEXT_FIELD(token,len);
618         d->whence = (time_t)strtoul(token, NULL, 10);
619         DECLARE_PARAM_STR(token,len); /* timestamp */
620
621         /* This is the expected uncompressed len (atoi stops at the tab) */
622         PROCESS_NEXT_FIELD(token,len);
623         final_len = atoi(token);
624         final_buff = malloc(final_len);
625         if(!final_buff) goto bad_row;
626
627         /* The last token is b64 encoded and compressed.
628          * We need to decode it, declare it and then free it.
629          */
630         PROCESS_LAST_FIELD(token, len);
631         /* We can in-place decode this (this overwrites d->data) */
632         len = noit_b64_decode((char *)token, len,
633                               (unsigned char *)token, len);
634         if(len <= 0) {
635           noitL(noit_error, "noitd config base64 decoding error.\n");
636           free(final_buff);
637           goto bad_row;
638         }
639         actual_final_len = final_len;
640         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
641                               (unsigned char *)token, len)) {
642           noitL(noit_error, "noitd config decompression failure.\n");
643           free(final_buff);
644           goto bad_row;
645         }
646         /* Reject output that is shorter than the advertised length */
647         if(final_len != actual_final_len) {
648           noitL(noit_error, "noitd config decompression error.\n");
649           free(final_buff);
650           goto bad_row;
651         }
652         /* final_buff is not NUL-terminated; the declare copies final_len bytes */
651         DECLARE_PARAM_STR(final_buff, final_len);
652         free(final_buff);
653         break;
654       case 'C':
655         DECLARE_PARAM_STR(raddr, strlen(raddr));
656         PROCESS_NEXT_FIELD(token,len);
657         DECLARE_PARAM_STR(token,len); /* timestamp */
658         d->whence = (time_t)strtoul(token, NULL, 10);
659         PROCESS_NEXT_FIELD(token, len);
660         DECLARE_PARAM_STR(token,len); /* uuid */
661         PROCESS_NEXT_FIELD(token, len);
662         DECLARE_PARAM_STR(token,len); /* target */
663         PROCESS_NEXT_FIELD(token, len);
664         DECLARE_PARAM_STR(token,len); /* module */
665         PROCESS_LAST_FIELD(token, len);
666         DECLARE_PARAM_STR(token,len); /* name */
667         break;
668       case 'M':
669         PROCESS_NEXT_FIELD(token,len);
670         DECLARE_PARAM_STR(token,len); /* timestamp */
671         d->whence = (time_t)strtoul(token, NULL, 10);
672         PROCESS_NEXT_FIELD(token, len);
673         DECLARE_PARAM_STR(token,len); /* uuid */
674         PROCESS_NEXT_FIELD(token, len);
675         DECLARE_PARAM_STR(token,len); /* name */
676         PROCESS_NEXT_FIELD(token,len);
677         d->metric_type = *token;
678         PROCESS_LAST_FIELD(token,len);
679         DECLARE_PARAM_STR(token,len); /* value */
680         break;
681       case 'S':
682         PROCESS_NEXT_FIELD(token,len);
683         DECLARE_PARAM_STR(token,len); /* timestamp */
684         d->whence = (time_t)strtoul(token, NULL, 10);
685         PROCESS_NEXT_FIELD(token, len);
686         DECLARE_PARAM_STR(token,len); /* uuid */
687         PROCESS_NEXT_FIELD(token, len);
688         DECLARE_PARAM_STR(token,len); /* state */
689         PROCESS_NEXT_FIELD(token, len);
690         DECLARE_PARAM_STR(token,len); /* availability */
691         PROCESS_NEXT_FIELD(token, len);
692         DECLARE_PARAM_STR(token,len); /* duration */
693         PROCESS_LAST_FIELD(token,len);
694         DECLARE_PARAM_STR(token,len); /* status */
695         break;
696       default:
697         goto bad_row;
698     }
699
700   }
701
702   /* Now execute the query */
703   switch(type) {
704     case 'n':
705       GET_QUERY(config_insert);
706       PG_EXEC(config_insert);
707       PQclear(d->res);
708       break;
709     case 'C':
710       GET_QUERY(check_insert);
711       PG_EXEC(check_insert);
712       PQclear(d->res);
713       break;
714     case 'S':
715       GET_QUERY(status_insert);
716       PG_TM_EXEC(status_insert, d->whence);
717       PQclear(d->res);
718       break;
719     case 'M':
720       switch(d->metric_type) {
721         case METRIC_INT32:
722         case METRIC_UINT32:
723         case METRIC_INT64:
724         case METRIC_UINT64:
725         case METRIC_DOUBLE:
726           GET_QUERY(metric_insert_numeric);
727           PG_TM_EXEC(metric_insert_numeric, d->whence);
728           PQclear(d->res);
729           break;
730         case METRIC_STRING:
731           GET_QUERY(metric_insert_text);
732           PG_TM_EXEC(metric_insert_text, d->whence);
733           PQclear(d->res);
734           break;
735         default:
736           goto bad_row;
737       }
738       break;
739     default:
740       /* should never get here */
741       goto bad_row;
742   }
743   return DS_EXEC_SUCCESS;
744  bad_row:
745   return DS_EXEC_ROW_FAILED;
746 }
747 static int
748 stratcon_database_post_connect(conn_q *cq) {
749   int rv = 0;
750   ds_single_detail _d = { 0 }, *d = &_d;
751   if(cq->remote_str) {
752     char *remote_str, *remote_cn;
753     /* This is the silly way we get null's in through our declare_param_str */
754     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
755     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
756     /* This is a storage node, it gets the storage node post_connect */
757     GET_QUERY(storage_post_connect);
758     rv = -1; /* now we're serious */
759     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
760     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
761     PG_EXEC(storage_post_connect);
762     PQclear(d->res);
763     rv = 0;
764   }
765   else {
766     /* Metanode post_connect */
767     GET_QUERY(metanode_post_connect);
768     rv = -1; /* now we're serious */
769     PG_EXEC(metanode_post_connect);
770     PQclear(d->res);
771     rv = 0;
772   }
773  bad_row:
774   free_params(d);
775   if(rv == -1) {
776     /* Post-connect intentions are serious and fatal */
777     PQfinish(cq->dbh);
778     cq->dbh = NULL;
779   }
780   return rv;
781 }
782 static int
783 stratcon_database_connect(conn_q *cq) {
784   char *dsn, dsn_meta[512];
785   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
786   const char *k, *v;
787   int klen;
788   noit_hash_table *t;
789
790   dsn_meta[0] = '\0';
791   if(!cq->dsn) {
792     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
793     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
794       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
795       strlcat(dsn_meta, k, sizeof(dsn_meta));
796       strlcat(dsn_meta, "=", sizeof(dsn_meta));
797       strlcat(dsn_meta, v, sizeof(dsn_meta));
798     }
799     noit_hash_destroy(t, free, free);
800     free(t);
801     dsn = dsn_meta;
802   }
803   else dsn = cq->dsn;
804
805   if(cq->dbh) {
806     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
807     PQreset(cq->dbh);
808     if(stratcon_database_post_connect(cq)) return -1;
809     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
810     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
811           dsn, PQerrorMessage(cq->dbh));
812     return -1;
813   }
814
815   cq->dbh = PQconnectdb(dsn);
816   if(!cq->dbh) return -1;
817   if(stratcon_database_post_connect(cq)) return -1;
818   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
819   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
820         dsn, PQerrorMessage(cq->dbh));
821   return -1;
822 }
823 static int
824 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
825                                 const char *name) {
826   int rv = -1;
827   PGresult *res;
828   char cmd[128];
829   strlcpy(cmd, p, sizeof(cmd));
830   strlcat(cmd, name, sizeof(cmd));
831   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
832   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
833   PQclear(res);
834   return rv;
835 }
836 static int
837 stratcon_datastore_do(conn_q *cq, const char *cmd) {
838   PGresult *res;
839   int rv = -1;
840   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
841   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
842   PQclear(res);
843   return rv;
844 }
/*
 * Transaction helpers.  They expect `cq`, `current` and `last_sp` in scope
 * and a `full_monty` label in the using function.
 * BUSTED(cq): close and forget the connection, then jump to full_monty.
 * SAVEPOINT / ROLLBACK_TO_SAVEPOINT / RELEASE_SAVEPOINT: issue the
 *   corresponding savepoint command and BUSTED() if it fails.  SAVEPOINT
 *   records `current` in last_sp; the other two clear last_sp.
 */
845 #define BUSTED(cq) do { \
846   PQfinish((cq)->dbh); \
847   (cq)->dbh = NULL; \
848   goto full_monty; \
849 } while(0)
850 #define SAVEPOINT(name) do { \
851   if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
852   last_sp = current; \
853 } while(0)
854 #define ROLLBACK_TO_SAVEPOINT(name) do { \
855   if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
856     BUSTED(cq); \
857   last_sp = NULL; \
858 } while(0)
859 #define RELEASE_SAVEPOINT(name) do { \
860   if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
861     BUSTED(cq); \
862   last_sp = NULL; \
863 } while(0)
864 int
865 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
866                                  struct timeval *now) {
867   ds_rt_detail *dsjd = closure;
868   conn_q *cq = dsjd->cq;
869   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
870
871   stratcon_database_connect(cq);
872   assert(dsjd->rt);
873   stratcon_datastore_find(cq, dsjd);
874   if(dsjd->completion_event)
875     eventer_add(dsjd->completion_event);
876
877   free_params((ds_single_detail *)dsjd);
878   free(dsjd);
879   release_conn_q(cq);
880   return 0;
881 }
882 static const char *
883 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
884   void *vinfo;
885   const char *dsn = NULL, *fqdn = NULL;
886   int found = 0;
887   storagenode_info *info = NULL;
888   pthread_mutex_lock(&storagenode_to_info_cache_lock);
889   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
890                         &vinfo)) {
891     found = 1;
892     info = vinfo;
893   }
894   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
895   if(found) {
896     if(fqdn_out) *fqdn_out = info->fqdn;
897     return info->dsn;
898   }
899
900   if(!found && can_use_db) {
901     ds_single_detail *d;
902     conn_q *cq;
903     int row_count;
904     /* Look it up and store it */
905     d = calloc(1, sizeof(*d));
906     cq = get_conn_q_for_metanode();
907     GET_QUERY(find_storage);
908     DECLARE_PARAM_INT(id);
909     PG_EXEC(find_storage);
910     row_count = PQntuples(d->res);
911     if(row_count) {
912       PG_GET_STR_COL(dsn, 0, "dsn");
913       PG_GET_STR_COL(fqdn, 0, "fqdn");
914     }
915     PQclear(d->res);
916    bad_row:
917     free_params(d);
918     free(d);
919   }
920   if(fqdn) {
921     info = calloc(1, sizeof(*info));
922     info->fqdn = strdup(fqdn);
923     if(fqdn_out) *fqdn_out = info->fqdn;
924     info->dsn = dsn ? strdup(dsn) : NULL;
925     info->storagenode_id = id;
926     pthread_mutex_lock(&storagenode_to_info_cache_lock);
927     noit_hash_store(&storagenode_to_info_cache,
928                     (void *)&info->storagenode_id, sizeof(int), info);
929     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
930   }
931   return info ? info->dsn : NULL;
932 }
933 static ds_line_detail *
934 build_insert_batch(interim_journal_t *ij) {
935   int rv;
936   off_t len;
937   const char *buff, *cp, *lcp;
938   struct stat st;
939   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
940
941   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
942   assert(rv != -1);
943   len = st.st_size;
944   buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
945   if(buff == (void *)-1) {
946     noitL(noit_error, "mmap(%s) => %s\n", ij->filename, strerror(errno));
947     assert(buff != (void *)-1);
948   }
949   lcp = buff;
950   while(lcp < (buff + len) &&
951         NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
952     next = calloc(1, sizeof(*next));
953     next->data = malloc(cp - lcp + 1);
954     memcpy(next->data, lcp, cp - lcp);
955     next->data[cp - lcp] = '\0';
956     if(!head) head = next;
957     if(last) last->next = next;
958     last = next;
959     lcp = cp + 1;
960   }
961   munmap((void *)buff, len);
962   return head;
963 }
964 static void
965 interim_journal_remove(interim_journal_t *ij) {
966   unlink(ij->filename);
967   if(ij->filename) free(ij->filename);
968   if(ij->remote_str) free(ij->remote_str);
969   if(ij->remote_cn) free(ij->remote_cn);
970   if(ij->fqdn) free(ij->fqdn);
971 }
972 int
973 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
974                                   struct timeval *now) {
975   int i;
976   interim_journal_t *ij;
977   ds_line_detail *head, *current, *last_sp;
978   const char *dsn;
979   conn_q *cq;
980   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
981   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
982
983   ij = closure;
984   dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
985   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
986                              ij->fqdn, dsn);
987   noitL(noit_debug, "stratcon_datastore_asynch_execute[%s,%s,%s]\n",
988         ij->remote_str, ij->remote_cn, ij->fqdn);
989  full_monty:
990   /* Make sure we have a connection */
991   i = 1;
992   while(stratcon_database_connect(cq)) {
993     noitL(noit_error, "Error connecting to database\n");
994     sleep(i);
995     i *= 2;
996     i = MIN(i, 16);
997   }
998
999   head = build_insert_batch(ij);
1000   current = head;
1001   last_sp = NULL;
1002   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
1003   while(current) {
1004     execute_outcome_t rv;
1005     if(current->data) {
1006       if(!last_sp) SAVEPOINT("batch");
1007  
1008       if(current->problematic) {
1009         noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1010         RELEASE_SAVEPOINT("batch");
1011         current = current->next;
1012         continue;
1013       }
1014       rv = stratcon_datastore_execute(cq, cq->remote_str, current);
1015       switch(rv) {
1016         case DS_EXEC_SUCCESS:
1017           current = current->next;
1018           break;
1019         case DS_EXEC_ROW_FAILED:
1020           /* rollback to savepoint, mark this record as bad and start again */
1021           current->problematic = 1;
1022           current = last_sp;
1023           ROLLBACK_TO_SAVEPOINT("batch");
1024           break;
1025         case DS_EXEC_TXN_FAILED:
1026           BUSTED(cq);
1027       }
1028     }
1029   }
1030   if(last_sp) RELEASE_SAVEPOINT("batch");
1031   if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq);
1032   /* Cleanup the mess */
1033   while(head) {
1034     ds_line_detail *tofree;
1035     tofree = head;
1036     head = head->next;
1037     if(tofree->data) free(tofree->data);
1038     free_params((ds_single_detail *)tofree);
1039     free(tofree);
1040   }
1041   interim_journal_remove(ij);
1042   release_conn_q(cq);
1043   return 0;
1044 }
1045 static int
1046 stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure,
1047                                 struct timeval *now) {
1048   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1049   const char *k;
1050   int klen;
1051   void *vij;
1052   interim_journal_t *ij;
1053   syncset_t *syncset = closure;
1054
1055   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1056   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1057
1058   noitL(noit_debug, "Syncing journal sets...\n");
1059   while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) {
1060     eventer_t ingest;
1061     ij = vij;
1062     noitL(noit_debug, "Syncing journal set [%s,%s,%s]\n",
1063           ij->remote_str, ij->remote_cn, ij->fqdn);
1064     fsync(ij->fd);
1065     ingest = eventer_alloc();
1066     ingest->mask = EVENTER_ASYNCH;
1067     ingest->callback = stratcon_datastore_asynch_execute;
1068     ingest->closure = ij;
1069     eventer_add_asynch(ij->cpool->jobq, ingest);
1070   }
1071   noit_hash_destroy(syncset->ws, free, NULL);
1072   free(syncset->ws);
1073   eventer_add(syncset->completion);
1074   free(syncset);
1075   return 0;
1076 }
1077 static interim_journal_t *
1078 interim_journal_get(struct sockaddr *remote, const char *remote_cn_in,
1079                     int storagenode_id, const char *fqdn_in) {
1080   void *vhash, *vij;
1081   noit_hash_table *working_set;
1082   interim_journal_t *ij;
1083   struct timeval now;
1084   char jpath[PATH_MAX];
1085   char remote_str[128];
1086   const char *remote_cn = remote_cn_in ? remote_cn_in : "default";
1087   const char *fqdn = fqdn_in ? fqdn_in : "default";
1088
1089   convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote);
1090   if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str));
1091
1092   /* Lookup the working set */
1093   if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1094     working_set = calloc(1, sizeof(*working_set));
1095     noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn),
1096                     working_set);
1097   }
1098   else
1099     working_set = vhash;
1100
1101   /* Lookup the interim journal within the working set */
1102   if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) {
1103     ij = calloc(1, sizeof(*ij));
1104     gettimeofday(&now, NULL);
1105     snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x",
1106              basejpath, remote_str, remote_cn, storagenode_id,
1107              (unsigned int)now.tv_sec, (unsigned int)now.tv_usec);
1108     ij->remote_str = strdup(remote_str);
1109     ij->remote_cn = strdup(remote_cn);
1110     ij->fqdn = strdup(fqdn);
1111     ij->storagenode_id = storagenode_id;
1112     ij->filename = strdup(jpath);
1113     ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1114                                          ij->fqdn);
1115     ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1116     if(ij->fd < 0 && errno == ENOENT) {
1117       if(mkdir_for_file(ij->filename, 0750)) {
1118         noitL(noit_error, "Failed to create dir for '%s': %s\n",
1119               ij->filename, strerror(errno));
1120         exit(-1);
1121       }
1122       ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1123     }
1124     if(ij->fd < 0) {
1125       noitL(noit_error, "Failed to open interim journal '%s': %s\n",
1126             ij->filename, strerror(errno));
1127       exit(-1);
1128     }
1129     noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij);
1130   }
1131   else
1132     ij = vij;
1133
1134   return ij;
1135 }
1136 static void
1137 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1138                           int *sid_out, int *storagenode_id_out,
1139                           char **fqdn_out, char **dsn_out) {
1140   /* only called from the main thread -- no safety issues */
1141   void *vuuidinfo, *vinfo;
1142   uuid_info *uuidinfo;
1143   storagenode_info *info = NULL;
1144   char *fqdn = NULL;
1145   char *dsn = NULL;
1146   int storagenode_id = 0, sid = 0;
1147   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1148                          &vuuidinfo)) {
1149     int row_count;
1150     char *tmpint;
1151     ds_single_detail *d;
1152     conn_q *cq;
1153     d = calloc(1, sizeof(*d));
1154     cq = get_conn_q_for_metanode();
1155     if(stratcon_database_connect(cq) == 0) {
1156       /* Blocking call to service the cache miss */
1157       GET_QUERY(check_map);
1158       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1159       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1160       PG_EXEC(check_map);
1161       row_count = PQntuples(d->res);
1162       if(row_count != 1) {
1163         PQclear(d->res);
1164         goto bad_row;
1165       }
1166       PG_GET_STR_COL(tmpint, 0, "sid");
1167       sid = atoi(tmpint);
1168       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1169       if(tmpint) storagenode_id = atoi(tmpint);
1170       PG_GET_STR_COL(fqdn, 0, "fqdn");
1171       PG_GET_STR_COL(dsn, 0, "dsn");
1172       PQclear(d->res);
1173     }
1174    bad_row:
1175     free_params((ds_single_detail *)d);
1176     free(d);
1177     release_conn_q(cq);
1178     /* Place in cache */
1179     if(fqdn) fqdn = strdup(fqdn);
1180     uuidinfo = calloc(1, sizeof(*uuidinfo));
1181     uuidinfo->sid = sid;
1182     uuidinfo->uuid_str = strdup(uuid_str);
1183     noit_hash_store(&uuid_to_info_cache,
1184                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1185     /* Also, we may have just witnessed a new storage node, store it */
1186     if(storagenode_id) {
1187       int needs_free = 0;
1188       info = calloc(1, sizeof(*info));
1189       info->storagenode_id = storagenode_id;
1190       info->dsn = dsn ? strdup(dsn) : NULL;
1191       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1192       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1193       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1194                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1195         /* hack to save memory -- we *never* remove from these caches,
1196            so we can use the same fqdn value in the above cache for the key
1197            in the cache below -- (no strdup) */
1198         noit_hash_store(&storagenode_to_info_cache,
1199                         (void *)&info->storagenode_id, sizeof(int), info);
1200       }
1201       else needs_free = 1;
1202       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1203       if(needs_free) {
1204         if(info->dsn) free(info->dsn);
1205         if(info->fqdn) free(info->fqdn);
1206         free(info);
1207       }
1208     }
1209   }
1210   else
1211     uuidinfo = vuuidinfo;
1212
1213   if(storagenode_id) {
1214     if(uuidinfo &&
1215        ((!dsn && dsn_out) || (!fqdn && fqdn_out))) {
1216       /* we don't have dsn and we actually want it */
1217       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1218       if(noit_hash_retrieve(&storagenode_to_info_cache,
1219                             (void *)&storagenode_id, sizeof(int), &vinfo))
1220         info = vinfo;
1221       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1222     }
1223   }
1224
1225   if(fqdn_out) *fqdn_out = fqdn ? fqdn : (info ? info->fqdn : NULL);
1226   if(dsn_out) *dsn_out = dsn ? dsn : (info ? info->dsn : NULL);
1227   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1228   if(sid_out) *sid_out = uuidinfo->sid;
1229 }
1230 static void
1231 stratcon_datastore_journal(struct sockaddr *remote,
1232                            const char *remote_cn, const char *line) {
1233   interim_journal_t *ij = NULL;
1234   char uuid_str[UUID_STR_LEN+1], *cp, *fqdn, *dsn;
1235   int storagenode_id = 0;
1236   uuid_t checkid;
1237   if(!line) return;
1238   /* if it is a UUID based thing, find the storage node */
1239   switch(*line) {
1240     case 'C':
1241     case 'S':
1242     case 'M':
1243       if(line[1] == '\t' && (cp = strchr(line+2, '\t')) != NULL) {
1244         strlcpy(uuid_str, cp + 1, sizeof(uuid_str));
1245         if(!uuid_parse(uuid_str, checkid)) {
1246           storage_node_quick_lookup(uuid_str, remote_cn, NULL,
1247                                     &storagenode_id, &fqdn, &dsn);
1248           ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn);
1249         }
1250       }
1251       break;
1252     case 'n':
1253       ij = interim_journal_get(remote,remote_cn,0,NULL);
1254       break;
1255     default:
1256       break;
1257   }
1258   if(!ij && fqdn) {
1259     noitL(ingest_err, "%d\t%s\n", storagenode_id, line);
1260   }
1261   else {
1262     int len;
1263     len = write(ij->fd, line, strlen(line));
1264     if(len < 0) {
1265       noitL(noit_error, "write to %s failed: %s\n",
1266             ij->filename, strerror(errno));
1267     }
1268   }
1269   return;
1270 }
1271 static noit_hash_table *
1272 stratcon_datastore_journal_remove(struct sockaddr *remote,
1273                                   const char *remote_cn) {
1274   void *vhash = NULL;
1275   if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1276     /* pluck it out */
1277     noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL);
1278   }
1279   else {
1280     noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n",
1281           remote_cn);
1282     abort();
1283   }
1284   return vhash;
1285 }
1286 void
1287 stratcon_datastore_push(stratcon_datastore_op_t op,
1288                         struct sockaddr *remote,
1289                         const char *remote_cn, void *operand,
1290                         eventer_t completion) {
1291   conn_q *cq;
1292   syncset_t *syncset;
1293   eventer_t e;
1294   ds_rt_detail *rtdetail;
1295   struct datastore_onlooker_list *nnode;
1296
1297   for(nnode = onlookers; nnode; nnode = nnode->next)
1298     nnode->dispatch(op,remote,remote_cn,operand);
1299
1300   switch(op) {
1301     case DS_OP_INSERT:
1302       stratcon_datastore_journal(remote, remote_cn, (const char *)operand);
1303       break;
1304     case DS_OP_CHKPT:
1305       e = eventer_alloc();
1306       syncset = calloc(1, sizeof(*syncset));
1307       e->mask = EVENTER_ASYNCH;
1308       e->callback = stratcon_datastore_journal_sync;
1309       syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn);
1310       syncset->completion = completion;
1311       e->closure = syncset;
1312       eventer_add(e);
1313       break;
1314     case DS_OP_FIND_COMPLETE:
1315       cq = get_conn_q_for_metanode();
1316       rtdetail = calloc(1, sizeof(*rtdetail));
1317       rtdetail->rt = operand;
1318       rtdetail->completion_event = completion;
1319       e = eventer_alloc();
1320       e->mask = EVENTER_ASYNCH;
1321       e->callback = stratcon_datastore_asynch_lookup;
1322       e->closure = rtdetail;
1323       eventer_add_asynch(cq->pool->jobq, e);
1324       break;
1325   }
1326 }
1327
1328 int
1329 stratcon_datastore_saveconfig(void *unused) {
1330   int rv = -1;
1331   char *buff;
1332   ds_single_detail _d = { 0 }, *d = &_d;
1333   conn_q *cq;
1334   cq = get_conn_q_for_metanode();
1335
1336   if(stratcon_database_connect(cq) == 0) {
1337     char time_as_str[20];
1338     size_t len;
1339     buff = noit_conf_xml_in_mem(&len);
1340     if(!buff) goto bad_row;
1341
1342     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1343     DECLARE_PARAM_STR("0.0.0.0", 7);
1344     DECLARE_PARAM_STR("stratcond", 9);
1345     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1346     DECLARE_PARAM_STR(buff, len);
1347     free(buff);
1348
1349     GET_QUERY(config_insert);
1350     PG_EXEC(config_insert);
1351     PQclear(d->res);
1352     rv = 0;
1353
1354     bad_row:
1355       free_params(d);
1356   }
1357   release_conn_q(cq);
1358   return rv;
1359 }
1360
1361 void
1362 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
1363                                                struct sockaddr *,
1364                                                const char *, void *)) {
1365   struct datastore_onlooker_list *nnode;
1366   nnode = calloc(1, sizeof(*nnode));
1367   nnode->dispatch = f;
1368   nnode->next = onlookers;
1369   while(noit_atomic_casptr((void **)&onlookers, nnode, nnode->next) != (void *)nnode->next)
1370     nnode->next = onlookers;
1371 }
1372 static void
1373 stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn,
1374                                          char *id_str, char *file) {
1375   char path[PATH_MAX];
1376   interim_journal_t *ij;
1377   eventer_t ingest;
1378
1379   snprintf(path, sizeof(path), "%s/%s/%s/%s/%s",
1380            basejpath, remote_str, remote_cn, id_str, file);
1381   ij = calloc(1, sizeof(*ij));
1382   ij->fd = open(path, O_RDONLY);
1383   if(ij->fd < 0) {
1384     noitL(noit_error, "cannot open journal '%s': %s\n",
1385           path, strerror(errno));
1386     free(ij);
1387     return;
1388   }
1389   ij->filename = strdup(path);
1390   ij->remote_str = strdup(remote_str);
1391   ij->remote_cn = strdup(remote_cn);
1392   ij->storagenode_id = atoi(id_str);
1393   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1394                                        ij->fqdn);
1395
1396   noitL(noit_error, "ingesting old payload: %s\n", ij->filename);
1397   ingest = eventer_alloc();
1398   ingest->mask = EVENTER_ASYNCH;
1399   ingest->callback = stratcon_datastore_asynch_execute;
1400   ingest->closure = ij;
1401   eventer_add_asynch(ij->cpool->jobq, ingest);
1402 }
1403 static void
1404 stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) {
1405   char path[PATH_MAX];
1406   DIR *root;
1407   struct dirent de, *entry;
1408   int i = 0, cnt = 0;
1409   char **entries;
1410
1411   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
1412            first ? "/" : "", first ? first : "",
1413            second ? "/" : "", second ? second : "",
1414            third ? "/" : "", third ? third : "");
1415   root = opendir(path);
1416   if(!root) return;
1417   while(readdir_r(root, &de, &entry) == 0 && entry != NULL) cnt++;
1418   rewinddir(root);
1419   entries = malloc(sizeof(*entries) * cnt);
1420   while(readdir_r(root, &de, &entry) == 0 && entry != NULL) {
1421     if(i < cnt) {
1422       entries[i++] = strdup(entry->d_name);
1423     }
1424   }
1425   closedir(root);
1426   cnt = i; /* could have changed, directories are fickle */
1427   qsort(entries, i, sizeof(*entries),
1428         (int (*)(const void *, const void *))strcasecmp);
1429   for(i=0; i<cnt; i++) {
1430     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
1431     if(!first)
1432       stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL);
1433     else if(!second)
1434       stratcon_datastore_sweep_journals_int(first, entries[i], NULL);
1435     else if(!third)
1436       stratcon_datastore_sweep_journals_int(first, second, entries[i]);
1437     else if(strlen(entries[i]) == 16)
1438       stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]);
1439   }
1440 }
1441 static void
1442 stratcon_datastore_sweep_journals() {
1443   stratcon_datastore_sweep_journals_int(NULL,NULL,NULL);
1444 }
1445
1446 int
1447 stratcon_datastore_ingest_all_storagenode_info() {
1448   int i, cnt;
1449   ds_single_detail _d = { 0 }, *d = &_d;
1450   conn_q *cq;
1451   cq = get_conn_q_for_metanode();
1452
1453   while(stratcon_database_connect(cq)) {
1454     noitL(noit_error, "Error connecting to database\n");
1455     sleep(1);
1456   }
1457
1458   GET_QUERY(all_storage);
1459   PG_EXEC(all_storage);
1460   cnt = PQntuples(d->res);
1461   for(i=0; i<cnt; i++) {
1462     void *vinfo;
1463     char *tmpint, *fqdn, *dsn;
1464     int storagenode_id;
1465     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1466     storagenode_id = atoi(tmpint);
1467     PG_GET_STR_COL(fqdn, i, "fqdn");
1468     PG_GET_STR_COL(dsn, i, "dsn");
1469     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1470     storagenode_id = tmpint ? atoi(tmpint) : 0;
1471
1472     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1473                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1474       storagenode_info *info;
1475       info = calloc(1, sizeof(*info));
1476       info->storagenode_id = storagenode_id;
1477       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1478       info->dsn = dsn ? strdup(dsn) : NULL;
1479       noit_hash_store(&storagenode_to_info_cache,
1480                       (void *)&info->storagenode_id, sizeof(int), info);
1481     }
1482   }
1483   PQclear(d->res);
1484  bad_row:
1485   free_params(d);
1486
1487   release_conn_q(cq);
1488   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1489   return cnt;
1490 }
1491 int
1492 stratcon_datastore_ingest_all_check_info() {
1493   int i, cnt, loaded = 0;
1494   ds_single_detail _d = { 0 }, *d = &_d;
1495   conn_q *cq;
1496   cq = get_conn_q_for_metanode();
1497
1498   while(stratcon_database_connect(cq)) {
1499     noitL(noit_error, "Error connecting to database\n");
1500     sleep(1);
1501   }
1502
1503   GET_QUERY(check_mapall);
1504   PG_EXEC(check_mapall);
1505   cnt = PQntuples(d->res);
1506   for(i=0; i<cnt; i++) {
1507     void *vinfo;
1508     char *tmpint, *fqdn, *dsn, *uuid_str;
1509     int sid, storagenode_id;
1510     uuid_info *uuidinfo;
1511     PG_GET_STR_COL(uuid_str, i, "id");
1512     if(!uuid_str) continue;
1513     PG_GET_STR_COL(tmpint, i, "sid");
1514     if(!tmpint) continue;
1515     sid = atoi(tmpint);
1516     PG_GET_STR_COL(fqdn, i, "fqdn");
1517     PG_GET_STR_COL(dsn, i, "dsn");
1518     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1519     storagenode_id = tmpint ? atoi(tmpint) : 0;
1520
1521     uuidinfo = calloc(1, sizeof(*uuidinfo));
1522     uuidinfo->uuid_str = strdup(uuid_str);
1523     uuidinfo->sid = sid;
1524     noit_hash_store(&uuid_to_info_cache,
1525                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1526     loaded++;
1527     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1528                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1529       storagenode_info *info;
1530       info = calloc(1, sizeof(*info));
1531       info->storagenode_id = storagenode_id;
1532       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1533       info->dsn = dsn ? strdup(dsn) : NULL;
1534       noit_hash_store(&storagenode_to_info_cache,
1535                       (void *)&info->storagenode_id, sizeof(int), info);
1536     }
1537   }
1538   PQclear(d->res);
1539  bad_row:
1540   free_params(d);
1541
1542   release_conn_q(cq);
1543   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1544   return loaded;
1545 }
1546 void
1547 stratcon_datastore_init() {
1548   pthread_mutex_init(&ds_conns_lock, NULL);
1549   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1550   ds_err = noit_log_stream_find("error/datastore");
1551   ingest_err = noit_log_stream_find("error/ingest");
1552   if(!ds_err) ds_err = noit_error;
1553   if(!ingest_err) ingest_err = noit_error;
1554   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1555                            &basejpath)) {
1556     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1557     exit(-1);
1558   }
1559   stratcon_datastore_ingest_all_check_info();
1560   stratcon_datastore_ingest_all_storagenode_info();
1561   stratcon_datastore_sweep_journals();
1562 }
Note: See TracBrowser for help on using the browser.