root/src/stratcon_datastore.c

Revision d2dd72fa3f0404e6651f95cdc95214380a5630a2, 50.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 years ago)

partitioning by day now -- so says the Treat, refs #150

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "utils/noit_str.h"
38 #include "utils/noit_mkdir.h"
39 #include "stratcon_datastore.h"
40 #include "stratcon_realtime_http.h"
41 #include "stratcon_iep.h"
42 #include "noit_conf.h"
43 #include "noit_check.h"
44 #include <unistd.h>
45 #include <fcntl.h>
46 #include <netinet/in.h>
47 #include <sys/un.h>
48 #include <dirent.h>
49 #include <arpa/inet.h>
50 #include <sys/mman.h>
51 #include <libpq-fe.h>
52 #include <zlib.h>
53 #include <assert.h>
54 #include <errno.h>
55
/*
 * DECL_STMT(codename, confname) declares two file-scope statics:
 *   codename       - the SQL text; NULL until GET_QUERY() loads it.
 *   codename_conf  - the config path "/stratcon/database/statements/<confname>"
 *                    from which GET_QUERY() reads the SQL text.
 */
#define DECL_STMT(codename,confname) \
static char *codename = NULL; \
static const char *codename##_conf = "/stratcon/database/statements/" #confname

DECL_STMT(storage_post_connect, storagepostconnect);
DECL_STMT(metanode_post_connect, metanodepostconnect);
DECL_STMT(find_storage, findstoragenode);
DECL_STMT(all_storage, allstoragenodes);
DECL_STMT(check_map, mapchecktostoragenode);
DECL_STMT(check_mapall, mapallchecks);
DECL_STMT(check_loadall, allchecks);
DECL_STMT(check_find, findcheck);
DECL_STMT(check_insert, check);
DECL_STMT(status_insert, status);
DECL_STMT(metric_insert_numeric, metric_numeric);
DECL_STMT(metric_insert_text, metric_text);
DECL_STMT(config_insert, config);

/* Log streams.  ds_err receives SQL failures (PG_EXEC) and ds_deb receives
 * connection-pool debugging.  ingest_err is not used within this view.
 * NOTE(review): all three start NULL and are not assigned in this view;
 * presumably they are opened during module init. */
static noit_log_stream_t ds_err = NULL;
static noit_log_stream_t ds_deb = NULL;
static noit_log_stream_t ingest_err = NULL;

/* Singly linked list of registered dispatch callbacks.
 * NOTE(review): nothing in this view registers or walks this list. */
static struct datastore_onlooker_list {
  void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *,
                   const char *, void *);
  struct datastore_onlooker_list *next;
} *onlookers = NULL;

/*
 * GET_QUERY(a): ensure the statement text `a` (declared with DECL_STMT) is
 * loaded, fetching it from the `a_conf` config path on first use.  If the
 * statement is not configured, control jumps to the caller's `bad_row`
 * label, so every user must define one.
 * NOTE(review): the cached pointer is written without a lock.  Concurrent
 * first use from several job threads may race; confirm this is acceptable.
 */
#define GET_QUERY(a) do { \
  if(a == NULL) \
    if(!noit_conf_get_string(NULL, a ## _conf, &(a))) \
      goto bad_row; \
} while(0)
89
struct conn_q;

/*
 * A pool of database connections sharing one queue_name key
 * (see get_conn_pool_for_remote).  `lock` guards head/in_pool/outstanding.
 * `cv` is signaled whenever a connection is released, so callers blocked in
 * get_conn_q_for_remote can retry.
 */
typedef struct {
  char            *queue_name; /* key: "datastore_<remote>_<fqdn>_<remote_cn>" */
  eventer_jobq_t  *jobq;       /* job queue that asynch work for this pool runs on */
  struct conn_q   *head;       /* idle connections, most recently released first */
  pthread_mutex_t  lock;
  pthread_cond_t   cv;
  int              ttl;           /* max idle seconds (see ttl_purge_conn_pool);
                                     NOTE(review): not set when the pool is created */
  int              in_pool;       /* number of idle connections on head */
  int              outstanding;   /* connections currently handed out */
  int              max_allocated; /* cap on in_pool + outstanding */
  int              max_in_pool;   /* cap on idle connections kept in the pool */
} conn_pool;
/* One pooled database connection plus the identity it was created for. */
typedef struct conn_q {
  time_t           last_use;   /* updated on release; drives TTL expiry */
  char            *dsn;        /* Pg connect string; NULL => built from
                                  /stratcon/database/dbconfig */
  char            *remote_str; /* the IP of the noit*/
  char            *remote_cn;  /* the Cert CN of the noit */
  char            *fqdn;       /* the fqdn of the storage node */
  conn_pool       *pool;       /* owning pool */
  struct conn_q   *next;       /* link in pool->head while idle */
  /* Postgres specific stuff */
  PGconn          *dbh;
} conn_q;


/* Capacity of the per-detail bind-parameter arrays. */
#define MAX_PARAMS 8
/*
 * Common leading members of every *_detail struct below.  Because they come
 * first, code casts ds_rt_detail / ds_line_detail pointers to
 * ds_single_detail (e.g. for free_params()).
 *   res, rv         last PGresult and its status (set by PG_EXEC/PG_TM_EXEC)
 *   whence          record timestamp used to expand PG_TM_EXEC statements
 *   nparams         number of bind parameters declared so far
 *   metric_type     metric type character of an 'M' line
 *   paramAllocd[i]  non-zero when paramValues[i] was malloc()ed and must be
 *                   freed by free_params()
 */
#define POSTGRES_PARTS \
  PGresult *res; \
  int rv; \
  time_t whence; \
  int nparams; \
  int metric_type; \
  char *paramValues[MAX_PARAMS]; \
  int paramLengths[MAX_PARAMS]; \
  int paramFormats[MAX_PARAMS]; \
  int paramAllocd[MAX_PARAMS];

/* A detail carrying only the shared Postgres parts. */
typedef struct ds_single_detail {
  POSTGRES_PARTS
} ds_single_detail;
/* Realtime lookup job: each tracker in `rt` is resolved by
 * stratcon_datastore_find(). */
typedef struct {
  /* Postgres specific stuff */
  POSTGRES_PARTS
  struct realtime_tracker *rt;
  conn_q *cq; /* connection on which to perform this job */
  eventer_t completion_event; /* This event should be registered if non NULL */
} ds_rt_detail;
/* One journal line queued for insertion (see stratcon_datastore_execute). */
typedef struct ds_line_detail {
  /* Postgres specific stuff */
  POSTGRES_PARTS
  char *data;       /* raw tab-separated line; data[0] is the record type */
  int problematic;  /* NOTE(review): presumably flags rows that failed to
                       insert; not set within this view */
  struct ds_line_detail *next;
} ds_line_detail;

/* NOTE(review): not used within this view. */
typedef struct {
  noit_hash_table *ws;
  eventer_t completion;
} syncset_t;

/* An on-disk interim journal destined for one storage node.  fd < 0 means
 * "not opened yet"; build_insert_batch opens `filename` read-only. */
typedef struct {
  char *remote_str;
  char *remote_cn;
  char *fqdn;
  int storagenode_id;
  int fd;
  char *filename;
  conn_pool *cpool;
} interim_journal_t;
161
/* Forward declarations for static helpers defined elsewhere in this file. */

/* (Re)connect cq->dbh; returns 0 on success, -1 on failure. */
static int stratcon_database_connect(conn_q *cq);
/* Map a check uuid string (for the given noit CN) to its sid; callers treat
 * a return of 0 as failure. */
static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
/* Resolve a check uuid to its sid and storage node details; callers skip the
 * check when this returns non-zero. */
static int storage_node_quick_lookup(const char *uuid_str,
                                     const char *remote_cn,
                                     int *sid_out, int *storagenode_id_out,
                                     const char **remote_cn_out,
                                     const char **fqdn_out,
                                     const char **dsn_out);
170
171 static void
172 free_params(ds_single_detail *d) {
173   int i;
174   for(i=0; i<d->nparams; i++)
175     if(d->paramAllocd[i] && d->paramValues[i])
176       free(d->paramValues[i]);
177 }
178
/* NOTE(review): presumably the base directory for interim journals; not used
 * within this view. */
char *basejpath = NULL;
/* ds_conns maps queue_name -> conn_pool*; accesses in this file hold
 * ds_conns_lock. */
pthread_mutex_t ds_conns_lock;
noit_hash_table ds_conns;
/* NOTE(review): not used within this view. */
noit_hash_table working_sets;

/* the fqdn cache needs to be thread safe */
/* Cached mapping of a check uuid (and noit CN) to its sid and storage node. */
typedef struct {
  char *uuid_str;
  char *remote_cn;
  int storagenode_id;
  int sid;
} uuid_info;
/* Cached connection details for one storage node. */
typedef struct {
  int storagenode_id;
  char *fqdn;
  char *dsn;
} storagenode_info;
/* NOTE(review): no lock for this cache is visible within this view. */
noit_hash_table uuid_to_info_cache;
/* storagenode_to_info_cache maps storagenode_id (raw int key) ->
 * storagenode_info*, guarded by storagenode_to_info_cache_lock.  Entries are
 * never removed within this view, so pointers handed out stay valid. */
pthread_mutex_t storagenode_to_info_cache_lock;
noit_hash_table storagenode_to_info_cache;
199
/*
 * Render the address in `remote` as text into buff (at most blen bytes,
 * always NUL-terminated when blen > 0).
 *
 * AF_INET/AF_INET6 addresses are printed with inet_ntop(); AF_UNIX sockets
 * yield their sun_path.  A NULL `remote` yields "".  An unsupported family
 * yields "" and returns 0.
 *
 * Returns the length of the string written to buff.
 *
 * inet_ntop() is given the real size of the local buffer.  It previously
 * received sizeof(struct sockaddr_in6) (28), which is smaller than
 * INET6_ADDRSTRLEN (46), so long IPv6 addresses failed to render.
 */
int
convert_sockaddr_to_buff(char *buff, int blen, struct sockaddr *remote) {
  char name[128] = "";
  buff[0] = '\0';
  if(remote) {
    switch(remote->sa_family) {
      case AF_INET:
        inet_ntop(AF_INET, &((struct sockaddr_in *)remote)->sin_addr,
                  name, sizeof(name));
        break;
      case AF_INET6:
        inet_ntop(AF_INET6, &((struct sockaddr_in6 *)remote)->sin6_addr,
                  name, sizeof(name));
        break;
      case AF_UNIX:
        snprintf(name, sizeof(name), "%s",
                 ((struct sockaddr_un *)remote)->sun_path);
        break;
      default:
        return 0;
    }
  }
  /* Bounded, always-terminated copy (same result as strlcpy). */
  snprintf(buff, blen, "%s", name);
  return strlen(buff);
}
227
228 /* Thread-safe connection pools */
229
230 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
231 static void
232 release_conn_q_forceable(conn_q *cq, int forcefree) {
233   int putback = 0;
234   cq->last_use = time(NULL);
235   pthread_mutex_lock(&cq->pool->lock);
236   cq->pool->outstanding--;
237   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
238     putback = 1;
239     cq->next = cq->pool->head;
240     cq->pool->head = cq;
241     cq->pool->in_pool++;
242   }
243   pthread_mutex_unlock(&cq->pool->lock);
244   noitL(ds_deb, "[%p] release %s [%s]\n", (void *)pthread_self(),
245         putback ? "to pool" : "and destroy", cq->pool->queue_name);
246   pthread_cond_signal(&cq->pool->cv);
247   if(putback) return;
248
249   /* Not put back, release it */
250   if(cq->dbh) PQfinish(cq->dbh);
251   if(cq->remote_str) free(cq->remote_str);
252   if(cq->remote_cn) free(cq->remote_cn);
253   if(cq->fqdn) free(cq->fqdn);
254   if(cq->dsn) free(cq->dsn);
255   free(cq);
256 }
257 static void
258 ttl_purge_conn_pool(conn_pool *pool) {
259   int old_cnt, new_cnt;
260   time_t now = time(NULL);
261   conn_q *cq, *prev = NULL, *iter;
262   /* because we always replace on the head and update the last_use time when
263      doing so, we know they are ordered LRU on the end.  So, once we hit an
264      old one, we know all the others are old too.
265    */
266   if(!pool->head) return; /* hack short circuit for no locks */
267   pthread_mutex_lock(&pool->lock);
268   old_cnt = pool->in_pool;
269   cq = pool->head;
270   while(cq) {
271     if(cq->last_use + cq->pool->ttl < now) {
272       if(prev) prev->next = NULL;
273       else pool->head = NULL;
274       break;
275     }
276     prev = cq;
277     cq = cq->next;
278   }
279   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
280   /* Fix accounting */
281   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
282   new_cnt = pool->in_pool;
283   pthread_mutex_unlock(&pool->lock);
284
285   /* Force release these without holding the lock */
286   while(cq) {
287     prev = cq;
288     cq = cq->next;
289     release_conn_q_forceable(cq, 1);
290   }
291   if(old_cnt != new_cnt)
292     noitL(ds_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
293           pool->queue_name);
294 }
295 static void
296 release_conn_q(conn_q *cq) {
297   ttl_purge_conn_pool(cq->pool);
298   release_conn_q_forceable(cq, 0);
299 }
300 static conn_pool *
301 get_conn_pool_for_remote(const char *remote_str,
302                          const char *remote_cn, const char *fqdn) {
303   void *vcpool;
304   conn_pool *cpool = NULL;
305   char queue_name[256] = "datastore_";
306   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
307            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
308            fqdn ? fqdn : "default",
309            remote_cn ? remote_cn : "default");
310   pthread_mutex_lock(&ds_conns_lock);
311   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
312                         strlen(queue_name), &vcpool))
313     cpool = vcpool;
314   pthread_mutex_unlock(&ds_conns_lock);
315   if(!cpool) {
316     vcpool = cpool = calloc(1, sizeof(*cpool));
317     cpool->queue_name = strdup(queue_name);
318     pthread_mutex_init(&cpool->lock, NULL);
319     pthread_cond_init(&cpool->cv, NULL);
320     cpool->in_pool = 0;
321     cpool->outstanding = 0;
322     cpool->max_in_pool = 1;
323     cpool->max_allocated = 1;
324     pthread_mutex_lock(&ds_conns_lock);
325     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
326                         cpool)) {
327       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
328                          strlen(queue_name), &vcpool);
329     }
330     pthread_mutex_unlock(&ds_conns_lock);
331     if(vcpool != cpool) {
332       /* someone beat us to it */
333       free(cpool->queue_name);
334       pthread_mutex_destroy(&cpool->lock);
335       pthread_cond_destroy(&cpool->cv);
336       free(cpool);
337     }
338     else {
339       int i;
340       /* Our job to setup the pool */
341       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
342       eventer_jobq_init(cpool->jobq, queue_name);
343       cpool->jobq->backq = eventer_default_backq();
344       /* Add one thread */
345       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
346         eventer_jobq_increase_concurrency(cpool->jobq);
347     }
348     cpool = vcpool;
349   }
350   return cpool;
351 }
352 static conn_q *
353 get_conn_q_for_remote(const char *remote_str,
354                       const char *remote_cn, const char *fqdn,
355                       const char *dsn) {
356   conn_pool *cpool;
357   conn_q *cq;
358   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
359   noitL(ds_deb, "[%p] requesting [%s]\n", (void *)pthread_self(),
360         cpool->queue_name);
361   pthread_mutex_lock(&cpool->lock);
362  again:
363   if(cpool->head) {
364     assert(cpool->in_pool > 0);
365     cq = cpool->head;
366     cpool->head = cq->next;
367     cpool->in_pool--;
368     cpool->outstanding++;
369     cq->next = NULL;
370     pthread_mutex_unlock(&cpool->lock);
371     return cq;
372   }
373   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
374     noitL(ds_deb, "[%p] over-subscribed, waiting [%s]\n",
375           (void *)pthread_self(), cpool->queue_name);
376     pthread_cond_wait(&cpool->cv, &cpool->lock);
377     noitL(ds_deb, "[%p] waking up and trying again [%s]\n",
378           (void *)pthread_self(), cpool->queue_name);
379     goto again;
380   }
381   else {
382     cpool->outstanding++;
383     pthread_mutex_unlock(&cpool->lock);
384   }
385  
386   cq = calloc(1, sizeof(*cq));
387   cq->pool = cpool;
388   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
389   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
390   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
391   cq->dsn = dsn ? strdup(dsn) : NULL;
392   return cq;
393 }
394 static conn_q *
395 get_conn_q_for_metanode() {
396   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
397 }
398
/* Result of executing one datastore row. */
typedef enum {
  DS_EXEC_SUCCESS = 0,     /* the row's statement ran successfully */
  DS_EXEC_ROW_FAILED = 1,  /* this row could not be parsed or executed */
  DS_EXEC_TXN_FAILED = 2,  /* NOTE(review): not returned within this view */
} execute_outcome_t;
404
/*
 * Return a malloc()ed, NUL-terminated copy of at most `len` bytes of src,
 * stopping early at the first NUL.  A non-positive len yields "".
 * The caller owns the result.
 */
static char *
__noit__strndup(const char *src, int len) {
  int n = 0;
  char *copy;

  while(n < len && src[n] != '\0')
    n++;
  copy = malloc(n + 1);
  memcpy(copy, src, n);
  copy[n] = '\0';
  return copy;
}
/*
 * DECLARE_PARAM_STR(str, len): append a text-format bind parameter to the
 * detail `d` in scope.  The value is a malloc()ed copy of up to `len` bytes
 * of str, freed later by free_params().  The literal value "[[null]]" is
 * turned into an SQL NULL.  Each argument is evaluated once, and the
 * parameter arrays (MAX_PARAMS entries) are bounds-checked rather than
 * silently overrun.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  int dp__len = (len); \
  assert(d->nparams < MAX_PARAMS); \
  d->paramValues[d->nparams] = __noit__strndup((str), dp__len); \
  d->paramLengths[d->nparams] = dp__len; \
  d->paramFormats[d->nparams] = 0; \
  d->paramAllocd[d->nparams] = 1; \
  if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \
    free(d->paramValues[d->nparams]); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  d->nparams++; \
} while(0)
/* DECLARE_PARAM_INT(i): append integer i as a decimal text parameter. */
#define DECLARE_PARAM_INT(i) do { \
  int buffer__len; \
  char buffer__[32]; \
  snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
  buffer__len = strlen(buffer__); \
  DECLARE_PARAM_STR(buffer__, buffer__len); \
} while(0)
436
/*
 * PG_GET_STR_COL(dest, row, name): set dest to the text of column `name` in
 * row `row` of d->res, or NULL when the column is absent or the value is
 * SQL NULL.  The pointer refers into d->res and is only valid until
 * PQclear(d->res).
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int colnum = PQfnumber(d->res, name); \
  dest = NULL; \
  if (colnum >= 0) \
    dest = PQgetisnull(d->res, row, colnum) \
         ? NULL : PQgetvalue(d->res, row, colnum); \
} while(0)

/*
 * PG_EXEC(cmd): run cmd on cq->dbh with the parameters declared in d,
 * leaving the result in d->res (the caller must PQclear it).  On any status
 * other than COMMAND_OK/TUPLES_OK, the error is logged to ds_err, the result
 * is cleared, and control jumps to the caller's bad_row label.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    noitL(ds_err, "stratcon datasource bad (%d): %s\n'%s'\n", \
          d->rv, PQresultErrorMessage(d->res), cmd); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)

/*
 * PG_TM_EXEC(cmd, whence): like PG_EXEC, but cmd is first expanded with
 * strftime() against the UTC broken-down time of `whence`, so any literal
 * '%' in the statement must be written "%%".  If the time cannot be
 * converted, or the expansion does not fit in the 4096-byte buffer
 * (strftime() then returns 0 and the buffer contents are indeterminate),
 * the row is rejected instead of sending a garbage statement.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t __w = (whence); \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&__w, &tbuf); \
  if(tm == NULL || strftime(cmdbuf, sizeof(cmdbuf), cmd, tm) == 0) { \
    noitL(ds_err, "stratcon datasource cannot expand statement for time %llu: '%s'\n", \
          (long long unsigned)__w, cmd); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    noitL(ds_err, "stratcon datasource bad (%d): %s\n'%s' time: %llu\n", \
          d->rv, PQresultErrorMessage(d->res), cmdbuf, \
          (long long unsigned)__w); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
478
479 static int
480 stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure,
481                                     struct timeval *now) {
482   ds_single_detail *d;
483   int i, row_count = 0, good = 0;
484   char buff[1024];
485   conn_q *cq;
486   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
487   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
488
489   cq = get_conn_q_for_metanode();
490   stratcon_database_connect(cq);
491   d = calloc(1, sizeof(*d));
492   GET_QUERY(check_loadall);
493   PG_EXEC(check_loadall);
494   row_count = PQntuples(d->res);
495  
496   for(i=0; i<row_count; i++) {
497     int rv;
498     int8_t family;
499     struct sockaddr *sin;
500     struct sockaddr_in sin4 = { .sin_family = AF_INET };
501     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
502     char *remote, *id, *target, *module, *name;
503     PG_GET_STR_COL(remote, i, "remote_address");
504     PG_GET_STR_COL(id, i, "id");
505     PG_GET_STR_COL(target, i, "target");
506     PG_GET_STR_COL(module, i, "module");
507     PG_GET_STR_COL(name, i, "name");
508     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
509
510     family = AF_INET;
511     sin = (struct sockaddr *)&sin4;
512     rv = inet_pton(family, remote, &sin4.sin_addr);
513     if(rv != 1) {
514       family = AF_INET6;
515       sin = (struct sockaddr *)&sin6;
516       rv = inet_pton(family, remote, &sin6.sin6_addr);
517       if(rv != 1) {
518         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
519         sin = NULL;
520       }
521     }
522
523     /* stratcon_iep_line_processor takes an allocated operand and frees it */
524     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
525     good++;
526   }
527   noitL(noit_error, "Staged %d/%d remembered checks into IEP\n", good, row_count);
528  bad_row:
529   free_params((ds_single_detail *)d);
530   free(d);
531   release_conn_q(cq);
532   return 0;
533 }
534 void
535 stratcon_datastore_iep_check_preload() {
536   eventer_t e;
537   conn_pool *cpool;
538
539   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
540   e = eventer_alloc();
541   e->mask = EVENTER_ASYNCH;
542   e->callback = stratcon_datastore_asynch_drive_iep;
543   e->closure = NULL;
544   eventer_add_asynch(cpool->jobq, e);
545 }
546 execute_outcome_t
547 stratcon_datastore_find(ds_rt_detail *d) {
548   conn_q *cq;
549   char *val;
550   int row_count;
551   struct realtime_tracker *node;
552
553   for(node = d->rt; node; node = node->next) {
554     char uuid_str[UUID_STR_LEN+1];
555     const char *fqdn, *dsn, *remote_cn;
556     int storagenode_id;
557
558     uuid_unparse_lower(node->checkid, uuid_str);
559     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
560                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
561       continue;
562
563     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
564     stratcon_database_connect(cq);
565
566     GET_QUERY(check_find);
567     DECLARE_PARAM_INT(node->sid);
568     PG_EXEC(check_find);
569     row_count = PQntuples(d->res);
570     if(row_count != 1) {
571       PQclear(d->res);
572       goto bad_row;
573     }
574
575     /* Get the check uuid */
576     PG_GET_STR_COL(val, 0, "id");
577     if(!val) {
578       PQclear(d->res);
579       goto bad_row;
580     }
581     if(uuid_parse(val, node->checkid)) {
582       PQclear(d->res);
583       goto bad_row;
584     }
585  
586     /* Get the remote_address (which noit owns this) */
587     PG_GET_STR_COL(val, 0, "remote_address");
588     if(!val) {
589       PQclear(d->res);
590       goto bad_row;
591     }
592     node->noit = strdup(val);
593  
594    bad_row:
595     free_params((ds_single_detail *)d);
596     d->nparams = 0;
597     release_conn_q(cq);
598   }
599   return DS_EXEC_SUCCESS;
600 }
/*
 * Parse (on the first call only) and execute one journal line on the
 * connection cq.
 *
 * d->data holds the raw line.  Its first character selects the record type:
 * 'n' noitd config, 'C' check, 'M' metric, 'S' status.  The remaining fields
 * are tab-separated.  Parsed fields are stored as bind parameters in d.
 * Parsing is skipped entirely when d->nparams is already non-zero.
 *
 *   cq         connection used by PG_EXEC / PG_TM_EXEC (cq->dbh).
 *   r          remote address string of the noit; NULL is treated as "".
 *   remote_cn  passed to uuid_to_sid() when resolving check uuids.
 *   d          the line plus its parameter storage.
 *
 * Returns DS_EXEC_SUCCESS when the statement ran.  Returns
 * DS_EXEC_ROW_FAILED when the line is malformed, a check uuid does not
 * resolve, a statement is not configured, or the statement fails.
 *
 * NOTE(review): a parse failure after some parameters were declared leaves
 * d->nparams non-zero.  A repeated call on the same d would then skip
 * parsing and execute with partial parameters; confirm callers never retry
 * such rows.
 */
execute_outcome_t
stratcon_datastore_execute(conn_q *cq, const char *r, const char *remote_cn,
                           ds_line_detail *d) {
  int type, len, sid;
  char *final_buff;
  uLong final_len, actual_final_len;
  char *token;
  char raddr_blank[1] = "";
  const char *raddr;

  type = d->data[0];
  raddr = r ? r : raddr_blank;

  /* Parse the log line, but only if we haven't already */
  if(!d->nparams) {
    char *scp, *ecp;

    scp = d->data;
/* PROCESS_NEXT_FIELD: take the next tab-terminated field starting at scp
 * and advance scp past the tab.  The t/l arguments are ignored: the field
 * always lands in `token`/`len`.  Jumps to bad_row if the line ends before
 * another tab. */
#define PROCESS_NEXT_FIELD(t,l) do { \
  if(!*scp) goto bad_row; \
  ecp = strchr(scp, '\t'); \
  if(!ecp) goto bad_row; \
  token = scp; \
  len = (ecp-scp); \
  scp = ecp + 1; \
} while(0)
/* PROCESS_LAST_FIELD: take the rest of the line, minus one trailing '\n',
 * as the final field, stored through t/l. */
#define PROCESS_LAST_FIELD(t,l) do { \
  if(!*scp) ecp = scp; \
  else { \
    ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
    if(*(ecp-1) == '\n') ecp--; /* We back up one letter if we ended in \n */ \
  } \
  t = scp; \
  l = (ecp-scp); \
} while(0)

    PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
    switch(type) {
      /* See noit_check_log.c for log description */
      case 'n':
        /* n <ts> <uncompressed len> <base64(zlib(config))> */
        DECLARE_PARAM_STR(raddr, strlen(raddr));
        DECLARE_PARAM_STR("noitd",5); /* node_type */
        PROCESS_NEXT_FIELD(token,len);
        d->whence = (time_t)strtoul(token, NULL, 10);
        DECLARE_PARAM_STR(token,len); /* timestamp */

        /* This is the expected uncompressed len */
        PROCESS_NEXT_FIELD(token,len);
        final_len = atoi(token);
        final_buff = malloc(final_len);
        if(!final_buff) goto bad_row;

        /* The last token is b64 encoded and compressed.
         * we need to decode it, declare it and then free it.
         */
        PROCESS_LAST_FIELD(token, len);
        /* We can in-place decode this */
        len = noit_b64_decode((char *)token, len,
                              (unsigned char *)token, len);
        if(len <= 0) {
          noitL(noit_error, "noitd config base64 decoding error.\n");
          free(final_buff);
          goto bad_row;
        }
        actual_final_len = final_len;
        if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
                              (unsigned char *)token, len)) {
          noitL(noit_error, "noitd config decompression failure.\n");
          free(final_buff);
          goto bad_row;
        }
        /* The decompressed size must match the advertised length exactly. */
        if(final_len != actual_final_len) {
          noitL(noit_error, "noitd config decompression error.\n");
          free(final_buff);
          goto bad_row;
        }
        DECLARE_PARAM_STR(final_buff, final_len);
        free(final_buff);
        break;
      case 'C':
        /* C <ts> <uuid> <target> <module> <name> */
        DECLARE_PARAM_STR(raddr, strlen(raddr));
        PROCESS_NEXT_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* timestamp */
        d->whence = (time_t)strtoul(token, NULL, 10);
        PROCESS_NEXT_FIELD(token, len);
        sid = uuid_to_sid(token, remote_cn);
        if(sid == 0) goto bad_row;
        DECLARE_PARAM_INT(sid); /* sid */
        DECLARE_PARAM_STR(token,len); /* uuid */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* target */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* module */
        PROCESS_LAST_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* name */
        break;
      case 'M':
        /* M <ts> <uuid> <name> <type> <value> */
        PROCESS_NEXT_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* timestamp */
        d->whence = (time_t)strtoul(token, NULL, 10);
        PROCESS_NEXT_FIELD(token, len);
        sid = uuid_to_sid(token, remote_cn);
        if(sid == 0) goto bad_row;
        DECLARE_PARAM_INT(sid); /* sid */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* name */
        PROCESS_NEXT_FIELD(token,len);
        /* first character of the type field; picks numeric vs text insert */
        d->metric_type = *token;
        PROCESS_LAST_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* value */
        break;
      case 'S':
        /* S <ts> <uuid> <state> <availability> <duration> <status> */
        PROCESS_NEXT_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* timestamp */
        d->whence = (time_t)strtoul(token, NULL, 10);
        PROCESS_NEXT_FIELD(token, len);
        sid = uuid_to_sid(token, remote_cn);
        if(sid == 0) goto bad_row;
        DECLARE_PARAM_INT(sid); /* sid */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* state */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* availability */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* duration */
        PROCESS_LAST_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* status */
        break;
      default:
        goto bad_row;
    }

  }

  /* Now execute the query.  'S' and 'M' statements are first expanded with
   * strftime() against d->whence by PG_TM_EXEC (presumably to target
   * time-partitioned tables; confirm against the configured statements). */
  switch(type) {
    case 'n':
      GET_QUERY(config_insert);
      PG_EXEC(config_insert);
      PQclear(d->res);
      break;
    case 'C':
      GET_QUERY(check_insert);
      PG_EXEC(check_insert);
      PQclear(d->res);
      break;
    case 'S':
      GET_QUERY(status_insert);
      PG_TM_EXEC(status_insert, d->whence);
      PQclear(d->res);
      break;
    case 'M':
      switch(d->metric_type) {
        case METRIC_INT32:
        case METRIC_UINT32:
        case METRIC_INT64:
        case METRIC_UINT64:
        case METRIC_DOUBLE:
          GET_QUERY(metric_insert_numeric);
          PG_TM_EXEC(metric_insert_numeric, d->whence);
          PQclear(d->res);
          break;
        case METRIC_STRING:
          GET_QUERY(metric_insert_text);
          PG_TM_EXEC(metric_insert_text, d->whence);
          PQclear(d->res);
          break;
        default:
          goto bad_row;
      }
      break;
    default:
      /* should never get here */
      goto bad_row;
  }
  return DS_EXEC_SUCCESS;
 bad_row:
  return DS_EXEC_ROW_FAILED;
}
780 static int
781 stratcon_database_post_connect(conn_q *cq) {
782   int rv = 0;
783   ds_single_detail _d = { 0 }, *d = &_d;
784   if(cq->remote_str) {
785     char *remote_str, *remote_cn;
786     /* This is the silly way we get null's in through our declare_param_str */
787     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
788     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
789     /* This is a storage node, it gets the storage node post_connect */
790     GET_QUERY(storage_post_connect);
791     rv = -1; /* now we're serious */
792     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
793     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
794     PG_EXEC(storage_post_connect);
795     PQclear(d->res);
796     rv = 0;
797   }
798   else {
799     /* Metanode post_connect */
800     GET_QUERY(metanode_post_connect);
801     rv = -1; /* now we're serious */
802     PG_EXEC(metanode_post_connect);
803     PQclear(d->res);
804     rv = 0;
805   }
806  bad_row:
807   free_params(d);
808   if(rv == -1) {
809     /* Post-connect intentions are serious and fatal */
810     PQfinish(cq->dbh);
811     cq->dbh = NULL;
812   }
813   return rv;
814 }
815 static int
816 stratcon_database_connect(conn_q *cq) {
817   char *dsn, dsn_meta[512];
818   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
819   const char *k, *v;
820   int klen;
821   noit_hash_table *t;
822
823   dsn_meta[0] = '\0';
824   if(!cq->dsn) {
825     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
826     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
827       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
828       strlcat(dsn_meta, k, sizeof(dsn_meta));
829       strlcat(dsn_meta, "=", sizeof(dsn_meta));
830       strlcat(dsn_meta, v, sizeof(dsn_meta));
831     }
832     noit_hash_destroy(t, free, free);
833     free(t);
834     dsn = dsn_meta;
835   }
836   else dsn = cq->dsn;
837
838   if(cq->dbh) {
839     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
840     PQreset(cq->dbh);
841     if(stratcon_database_post_connect(cq)) return -1;
842     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
843     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
844           dsn, PQerrorMessage(cq->dbh));
845     return -1;
846   }
847
848   cq->dbh = PQconnectdb(dsn);
849   if(!cq->dbh) return -1;
850   if(stratcon_database_post_connect(cq)) return -1;
851   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
852   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
853         dsn, PQerrorMessage(cq->dbh));
854   return -1;
855 }
856 static int
857 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
858                                 const char *name) {
859   int rv = -1;
860   PGresult *res;
861   char cmd[128];
862   strlcpy(cmd, p, sizeof(cmd));
863   strlcat(cmd, name, sizeof(cmd));
864   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
865   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
866   PQclear(res);
867   return rv;
868 }
869 static int
870 stratcon_datastore_do(conn_q *cq, const char *cmd) {
871   PGresult *res;
872   int rv = -1;
873   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
874   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
875   PQclear(res);
876   return rv;
877 }
/*
 * Transaction helpers for a batch-insert loop (its user is not in this view).
 * They expect `cq`, `last_sp` and `current` variables and a `full_monty`
 * label in the calling function.
 *
 * BUSTED(cq): close and forget the database handle, then jump to full_monty.
 */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)
/* SAVEPOINT(name): create savepoint `name` and record `current` in last_sp;
 * BUSTED on failure. */
#define SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
  last_sp = current; \
} while(0)
/* ROLLBACK_TO_SAVEPOINT(name): roll back to `name` and clear last_sp;
 * BUSTED on failure. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
/* RELEASE_SAVEPOINT(name): release `name` and clear last_sp;
 * BUSTED on failure. */
#define RELEASE_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
897 int
898 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
899                                  struct timeval *now) {
900   ds_rt_detail *dsjd = closure;
901   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
902   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
903
904   assert(dsjd->rt);
905   stratcon_datastore_find(dsjd);
906   if(dsjd->completion_event)
907     eventer_add(dsjd->completion_event);
908
909   free_params((ds_single_detail *)dsjd);
910   free(dsjd);
911   return 0;
912 }
913 static const char *
914 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
915   void *vinfo;
916   const char *dsn = NULL, *fqdn = NULL;
917   int found = 0;
918   storagenode_info *info = NULL;
919   pthread_mutex_lock(&storagenode_to_info_cache_lock);
920   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
921                         &vinfo)) {
922     found = 1;
923     info = vinfo;
924   }
925   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
926   if(found) {
927     if(fqdn_out) *fqdn_out = info->fqdn;
928     return info->dsn;
929   }
930
931   if(!found && can_use_db) {
932     ds_single_detail *d;
933     conn_q *cq;
934     int row_count;
935     /* Look it up and store it */
936     d = calloc(1, sizeof(*d));
937     cq = get_conn_q_for_metanode();
938     GET_QUERY(find_storage);
939     DECLARE_PARAM_INT(id);
940     PG_EXEC(find_storage);
941     row_count = PQntuples(d->res);
942     if(row_count) {
943       PG_GET_STR_COL(dsn, 0, "dsn");
944       PG_GET_STR_COL(fqdn, 0, "fqdn");
945     }
946     PQclear(d->res);
947    bad_row:
948     free_params(d);
949     free(d);
950     release_conn_q(cq);
951   }
952   if(fqdn) {
953     info = calloc(1, sizeof(*info));
954     info->fqdn = strdup(fqdn);
955     if(fqdn_out) *fqdn_out = info->fqdn;
956     info->dsn = dsn ? strdup(dsn) : NULL;
957     info->storagenode_id = id;
958     pthread_mutex_lock(&storagenode_to_info_cache_lock);
959     noit_hash_store(&storagenode_to_info_cache,
960                     (void *)&info->storagenode_id, sizeof(int), info);
961     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
962   }
963   return info ? info->dsn : NULL;
964 }
965 static ds_line_detail *
966 build_insert_batch(interim_journal_t *ij) {
967   int rv;
968   off_t len;
969   const char *buff, *cp, *lcp;
970   struct stat st;
971   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
972
973   if(ij->fd < 0) {
974     ij->fd = open(ij->filename, O_RDONLY);
975     if(ij->fd < 0) {
976       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
977             ij->filename, strerror(errno));
978       assert(ij->fd >= 0);
979     }
980   }
981   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
982   assert(rv != -1);
983   len = st.st_size;
984   buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
985   if(buff == (void *)-1) {
986     noitL(noit_error, "mmap(%s) => %s\n", ij->filename, strerror(errno));
987     assert(buff != (void *)-1);
988   }
989   lcp = buff;
990   while(lcp < (buff + len) &&
991         NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
992     next = calloc(1, sizeof(*next));
993     next->data = malloc(cp - lcp + 1);
994     memcpy(next->data, lcp, cp - lcp);
995     next->data[cp - lcp] = '\0';
996     if(!head) head = next;
997     if(last) last->next = next;
998     last = next;
999     lcp = cp + 1;
1000   }
1001   munmap((void *)buff, len);
1002   close(ij->fd);
1003   return head;
1004 }
1005 static void
1006 interim_journal_remove(interim_journal_t *ij) {
1007   unlink(ij->filename);
1008   if(ij->filename) free(ij->filename);
1009   if(ij->remote_str) free(ij->remote_str);
1010   if(ij->remote_cn) free(ij->remote_cn);
1011   if(ij->fqdn) free(ij->fqdn);
1012 }
1013 int
1014 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
1015                                   struct timeval *now) {
1016   int i;
1017   interim_journal_t *ij;
1018   ds_line_detail *head, *current, *last_sp;
1019   const char *dsn;
1020   conn_q *cq;
1021   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1022   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1023
1024   ij = closure;
1025   dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1026   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1027                              ij->fqdn, dsn);
1028   noitL(ds_deb, "stratcon_datastore_asynch_execute[%s,%s,%s]\n",
1029         ij->remote_str, ij->remote_cn, ij->fqdn);
1030  full_monty:
1031   /* Make sure we have a connection */
1032   i = 1;
1033   while(stratcon_database_connect(cq)) {
1034     noitL(noit_error, "Error connecting to database\n");
1035     sleep(i);
1036     i *= 2;
1037     i = MIN(i, 16);
1038   }
1039
1040   head = build_insert_batch(ij);
1041   current = head;
1042   last_sp = NULL;
1043   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
1044   while(current) {
1045     execute_outcome_t rv;
1046     if(current->data) {
1047       if(!last_sp) SAVEPOINT("batch");
1048  
1049       if(current->problematic) {
1050         RELEASE_SAVEPOINT("batch");
1051         current = current->next;
1052         continue;
1053       }
1054       rv = stratcon_datastore_execute(cq, cq->remote_str, cq->remote_cn,
1055                                       current);
1056       switch(rv) {
1057         case DS_EXEC_SUCCESS:
1058           current = current->next;
1059           break;
1060         case DS_EXEC_ROW_FAILED:
1061           /* rollback to savepoint, mark this record as bad and start again */
1062           noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1063           current->problematic = 1;
1064           current = last_sp;
1065           ROLLBACK_TO_SAVEPOINT("batch");
1066           break;
1067         case DS_EXEC_TXN_FAILED:
1068           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1069           BUSTED(cq);
1070       }
1071     }
1072   }
1073   if(last_sp) RELEASE_SAVEPOINT("batch");
1074   if(stratcon_datastore_do(cq, "COMMIT")) {
1075     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1076     BUSTED(cq);
1077   }
1078   /* Cleanup the mess */
1079   while(head) {
1080     ds_line_detail *tofree;
1081     tofree = head;
1082     head = head->next;
1083     if(tofree->data) free(tofree->data);
1084     free_params((ds_single_detail *)tofree);
1085     free(tofree);
1086   }
1087   interim_journal_remove(ij);
1088   release_conn_q(cq);
1089   return 0;
1090 }
1091 static int
1092 stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure,
1093                                 struct timeval *now) {
1094   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1095   const char *k;
1096   int klen;
1097   void *vij;
1098   interim_journal_t *ij;
1099   syncset_t *syncset = closure;
1100
1101   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1102   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1103
1104   noitL(ds_deb, "Syncing journal sets...\n");
1105   while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) {
1106     eventer_t ingest;
1107     ij = vij;
1108     noitL(ds_deb, "Syncing journal set [%s,%s,%s]\n",
1109           ij->remote_str, ij->remote_cn, ij->fqdn);
1110     fsync(ij->fd);
1111     close(ij->fd);
1112     ij->fd = -1;
1113     ingest = eventer_alloc();
1114     ingest->mask = EVENTER_ASYNCH;
1115     ingest->callback = stratcon_datastore_asynch_execute;
1116     ingest->closure = ij;
1117     eventer_add_asynch(ij->cpool->jobq, ingest);
1118   }
1119   noit_hash_destroy(syncset->ws, free, NULL);
1120   free(syncset->ws);
1121   eventer_add(syncset->completion);
1122   free(syncset);
1123   return 0;
1124 }
1125 static interim_journal_t *
1126 interim_journal_get(struct sockaddr *remote, const char *remote_cn_in,
1127                     int storagenode_id, const char *fqdn_in) {
1128   void *vhash, *vij;
1129   noit_hash_table *working_set;
1130   interim_journal_t *ij;
1131   struct timeval now;
1132   char jpath[PATH_MAX];
1133   char remote_str[128];
1134   const char *remote_cn = remote_cn_in ? remote_cn_in : "default";
1135   const char *fqdn = fqdn_in ? fqdn_in : "default";
1136
1137   convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote);
1138   if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str));
1139
1140   /* Lookup the working set */
1141   if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1142     working_set = calloc(1, sizeof(*working_set));
1143     noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn),
1144                     working_set);
1145   }
1146   else
1147     working_set = vhash;
1148
1149   /* Lookup the interim journal within the working set */
1150   if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) {
1151     ij = calloc(1, sizeof(*ij));
1152     gettimeofday(&now, NULL);
1153     snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x",
1154              basejpath, remote_str, remote_cn, storagenode_id,
1155              (unsigned int)now.tv_sec, (unsigned int)now.tv_usec);
1156     ij->remote_str = strdup(remote_str);
1157     ij->remote_cn = strdup(remote_cn);
1158     ij->fqdn = strdup(fqdn);
1159     ij->storagenode_id = storagenode_id;
1160     ij->filename = strdup(jpath);
1161     ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1162                                          ij->fqdn);
1163     ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1164     if(ij->fd < 0 && errno == ENOENT) {
1165       if(mkdir_for_file(ij->filename, 0750)) {
1166         noitL(noit_error, "Failed to create dir for '%s': %s\n",
1167               ij->filename, strerror(errno));
1168         exit(-1);
1169       }
1170       ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1171     }
1172     if(ij->fd < 0) {
1173       noitL(noit_error, "Failed to open interim journal '%s': %s\n",
1174             ij->filename, strerror(errno));
1175       exit(-1);
1176     }
1177     noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij);
1178   }
1179   else
1180     ij = vij;
1181
1182   return ij;
1183 }
1184 static int
1185 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1186                           int *sid_out, int *storagenode_id_out,
1187                           const char **remote_cn_out,
1188                           const char **fqdn_out, const char **dsn_out) {
1189   /* only called from the main thread -- no safety issues */
1190   void *vuuidinfo, *vinfo;
1191   uuid_info *uuidinfo;
1192   storagenode_info *info = NULL;
1193   char *fqdn = NULL;
1194   char *dsn = NULL;
1195   int storagenode_id = 0, sid = 0;
1196   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1197                          &vuuidinfo)) {
1198     int row_count = 0;
1199     char *tmpint;
1200     ds_single_detail *d;
1201     conn_q *cq;
1202
1203     /* We can't do a database lookup without the remote_cn */
1204     if(!remote_cn) return -1;
1205
1206     d = calloc(1, sizeof(*d));
1207     cq = get_conn_q_for_metanode();
1208     if(stratcon_database_connect(cq) == 0) {
1209       /* Blocking call to service the cache miss */
1210       GET_QUERY(check_map);
1211       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1212       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1213       PG_EXEC(check_map);
1214       row_count = PQntuples(d->res);
1215       if(row_count != 1) {
1216         PQclear(d->res);
1217         goto bad_row;
1218       }
1219       PG_GET_STR_COL(tmpint, 0, "sid");
1220       sid = atoi(tmpint);
1221       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1222       if(tmpint) storagenode_id = atoi(tmpint);
1223       PG_GET_STR_COL(fqdn, 0, "fqdn");
1224       PG_GET_STR_COL(dsn, 0, "dsn");
1225       PQclear(d->res);
1226     }
1227    bad_row:
1228     free_params((ds_single_detail *)d);
1229     free(d);
1230     release_conn_q(cq);
1231     if(row_count != 1) return -1;
1232     /* Place in cache */
1233     if(fqdn) fqdn = strdup(fqdn);
1234     uuidinfo = calloc(1, sizeof(*uuidinfo));
1235     uuidinfo->sid = sid;
1236     uuidinfo->uuid_str = strdup(uuid_str);
1237     uuidinfo->remote_cn = strdup(remote_cn);
1238     noit_hash_store(&uuid_to_info_cache,
1239                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1240     /* Also, we may have just witnessed a new storage node, store it */
1241     if(storagenode_id) {
1242       int needs_free = 0;
1243       info = calloc(1, sizeof(*info));
1244       info->storagenode_id = storagenode_id;
1245       info->dsn = dsn ? strdup(dsn) : NULL;
1246       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1247       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1248       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1249                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1250         /* hack to save memory -- we *never* remove from these caches,
1251            so we can use the same fqdn value in the above cache for the key
1252            in the cache below -- (no strdup) */
1253         noit_hash_store(&storagenode_to_info_cache,
1254                         (void *)&info->storagenode_id, sizeof(int), info);
1255       }
1256       else needs_free = 1;
1257       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1258       if(needs_free) {
1259         if(info->dsn) free(info->dsn);
1260         if(info->fqdn) free(info->fqdn);
1261         free(info);
1262       }
1263     }
1264   }
1265   else
1266     uuidinfo = vuuidinfo;
1267
1268   if(storagenode_id) {
1269     if(uuidinfo &&
1270        ((!dsn && dsn_out) || (!fqdn && fqdn_out))) {
1271       /* we don't have dsn and we actually want it */
1272       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1273       if(noit_hash_retrieve(&storagenode_to_info_cache,
1274                             (void *)&storagenode_id, sizeof(int), &vinfo))
1275         info = vinfo;
1276       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1277     }
1278   }
1279
1280   if(fqdn_out) *fqdn_out = fqdn ? fqdn : (info ? info->fqdn : NULL);
1281   if(dsn_out) *dsn_out = dsn ? dsn : (info ? info->dsn : NULL);
1282   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1283   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1284   if(sid_out) *sid_out = uuidinfo->sid;
1285   return 0;
1286 }
1287 static int
1288 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1289   char uuid_str[UUID_STR_LEN+1];
1290   int sid = 0;
1291   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1292   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1293   return sid;
1294 }
1295 static void
1296 stratcon_datastore_journal(struct sockaddr *remote,
1297                            const char *remote_cn, const char *line) {
1298   interim_journal_t *ij = NULL;
1299   char uuid_str[UUID_STR_LEN+1], *cp;
1300   const char *fqdn, *dsn;
1301   int storagenode_id = 0;
1302   uuid_t checkid;
1303   if(!line) return;
1304   /* if it is a UUID based thing, find the storage node */
1305   switch(*line) {
1306     case 'C':
1307     case 'S':
1308     case 'M':
1309       if(line[1] == '\t' && (cp = strchr(line+2, '\t')) != NULL) {
1310         strlcpy(uuid_str, cp + 1, sizeof(uuid_str));
1311         if(!uuid_parse(uuid_str, checkid)) {
1312           storage_node_quick_lookup(uuid_str, remote_cn, NULL,
1313                                     &storagenode_id, NULL, &fqdn, &dsn);
1314           ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn);
1315         }
1316       }
1317       break;
1318     case 'n':
1319       ij = interim_journal_get(remote,remote_cn,0,NULL);
1320       break;
1321     default:
1322       break;
1323   }
1324   if(!ij && fqdn) {
1325     noitL(ingest_err, "%d\t%s\n", storagenode_id, line);
1326   }
1327   else {
1328     int len;
1329     len = write(ij->fd, line, strlen(line));
1330     if(len < 0) {
1331       noitL(noit_error, "write to %s failed: %s\n",
1332             ij->filename, strerror(errno));
1333     }
1334   }
1335   return;
1336 }
1337 static noit_hash_table *
1338 stratcon_datastore_journal_remove(struct sockaddr *remote,
1339                                   const char *remote_cn) {
1340   void *vhash = NULL;
1341   if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1342     /* pluck it out */
1343     noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL);
1344   }
1345   else {
1346     noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n",
1347           remote_cn);
1348     abort();
1349   }
1350   return vhash;
1351 }
1352 void
1353 stratcon_datastore_push(stratcon_datastore_op_t op,
1354                         struct sockaddr *remote,
1355                         const char *remote_cn, void *operand,
1356                         eventer_t completion) {
1357   conn_pool *cpool;
1358   syncset_t *syncset;
1359   eventer_t e;
1360   ds_rt_detail *rtdetail;
1361   struct datastore_onlooker_list *nnode;
1362
1363   for(nnode = onlookers; nnode; nnode = nnode->next)
1364     nnode->dispatch(op,remote,remote_cn,operand);
1365
1366   switch(op) {
1367     case DS_OP_INSERT:
1368       stratcon_datastore_journal(remote, remote_cn, (const char *)operand);
1369       break;
1370     case DS_OP_CHKPT:
1371       e = eventer_alloc();
1372       syncset = calloc(1, sizeof(*syncset));
1373       e->mask = EVENTER_ASYNCH;
1374       e->callback = stratcon_datastore_journal_sync;
1375       syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn);
1376       syncset->completion = completion;
1377       e->closure = syncset;
1378       eventer_add(e);
1379       break;
1380     case DS_OP_FIND_COMPLETE:
1381       cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
1382       rtdetail = calloc(1, sizeof(*rtdetail));
1383       rtdetail->rt = operand;
1384       rtdetail->completion_event = completion;
1385       e = eventer_alloc();
1386       e->mask = EVENTER_ASYNCH;
1387       e->callback = stratcon_datastore_asynch_lookup;
1388       e->closure = rtdetail;
1389       eventer_add_asynch(cpool->jobq, e);
1390       break;
1391   }
1392 }
1393
1394 int
1395 stratcon_datastore_saveconfig(void *unused) {
1396   int rv = -1;
1397   char *buff;
1398   ds_single_detail _d = { 0 }, *d = &_d;
1399   conn_q *cq;
1400   cq = get_conn_q_for_metanode();
1401
1402   if(stratcon_database_connect(cq) == 0) {
1403     char time_as_str[20];
1404     size_t len;
1405     buff = noit_conf_xml_in_mem(&len);
1406     if(!buff) goto bad_row;
1407
1408     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1409     DECLARE_PARAM_STR("0.0.0.0", 7);
1410     DECLARE_PARAM_STR("stratcond", 9);
1411     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1412     DECLARE_PARAM_STR(buff, len);
1413     free(buff);
1414
1415     GET_QUERY(config_insert);
1416     PG_EXEC(config_insert);
1417     PQclear(d->res);
1418     rv = 0;
1419
1420     bad_row:
1421       free_params(d);
1422   }
1423   release_conn_q(cq);
1424   return rv;
1425 }
1426
1427 void
1428 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
1429                                                struct sockaddr *,
1430                                                const char *, void *)) {
1431   struct datastore_onlooker_list *nnode;
1432   nnode = calloc(1, sizeof(*nnode));
1433   nnode->dispatch = f;
1434   nnode->next = onlookers;
1435   while(noit_atomic_casptr((volatile void **)&onlookers, nnode, nnode->next) != (void *)nnode->next)
1436     nnode->next = onlookers;
1437 }
1438 static void
1439 stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn,
1440                                          char *id_str, char *file) {
1441   char path[PATH_MAX];
1442   interim_journal_t *ij;
1443   eventer_t ingest;
1444
1445   snprintf(path, sizeof(path), "%s/%s/%s/%s/%s",
1446            basejpath, remote_str, remote_cn, id_str, file);
1447   ij = calloc(1, sizeof(*ij));
1448   ij->fd = open(path, O_RDONLY);
1449   if(ij->fd < 0) {
1450     noitL(noit_error, "cannot open journal '%s': %s\n",
1451           path, strerror(errno));
1452     free(ij);
1453     return;
1454   }
1455   close(ij->fd);
1456   ij->fd = -1;
1457   ij->filename = strdup(path);
1458   ij->remote_str = strdup(remote_str);
1459   ij->remote_cn = strdup(remote_cn);
1460   ij->storagenode_id = atoi(id_str);
1461   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1462                                        ij->fqdn);
1463   noitL(noit_error, "ingesting old payload: %s\n", ij->filename);
1464   ingest = eventer_alloc();
1465   ingest->mask = EVENTER_ASYNCH;
1466   ingest->callback = stratcon_datastore_asynch_execute;
1467   ingest->closure = ij;
1468   eventer_add_asynch(ij->cpool->jobq, ingest);
1469 }
1470 static void
1471 stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) {
1472   char path[PATH_MAX];
1473   DIR *root;
1474   struct dirent de, *entry;
1475   int i = 0, cnt = 0;
1476   char **entries;
1477
1478   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
1479            first ? "/" : "", first ? first : "",
1480            second ? "/" : "", second ? second : "",
1481            third ? "/" : "", third ? third : "");
1482   root = opendir(path);
1483   if(!root) return;
1484   while(readdir_r(root, &de, &entry) == 0 && entry != NULL) cnt++;
1485   rewinddir(root);
1486   entries = malloc(sizeof(*entries) * cnt);
1487   while(readdir_r(root, &de, &entry) == 0 && entry != NULL) {
1488     if(i < cnt) {
1489       entries[i++] = strdup(entry->d_name);
1490     }
1491   }
1492   closedir(root);
1493   cnt = i; /* could have changed, directories are fickle */
1494   qsort(entries, i, sizeof(*entries),
1495         (int (*)(const void *, const void *))strcasecmp);
1496   for(i=0; i<cnt; i++) {
1497     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
1498     if(!first)
1499       stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL);
1500     else if(!second)
1501       stratcon_datastore_sweep_journals_int(first, entries[i], NULL);
1502     else if(!third)
1503       stratcon_datastore_sweep_journals_int(first, second, entries[i]);
1504     else if(strlen(entries[i]) == 16)
1505       stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]);
1506   }
1507 }
1508 static void
1509 stratcon_datastore_sweep_journals() {
1510   stratcon_datastore_sweep_journals_int(NULL,NULL,NULL);
1511 }
1512
1513 int
1514 stratcon_datastore_ingest_all_storagenode_info() {
1515   int i, cnt;
1516   ds_single_detail _d = { 0 }, *d = &_d;
1517   conn_q *cq;
1518   cq = get_conn_q_for_metanode();
1519
1520   while(stratcon_database_connect(cq)) {
1521     noitL(noit_error, "Error connecting to database\n");
1522     sleep(1);
1523   }
1524
1525   GET_QUERY(all_storage);
1526   PG_EXEC(all_storage);
1527   cnt = PQntuples(d->res);
1528   for(i=0; i<cnt; i++) {
1529     void *vinfo;
1530     char *tmpint, *fqdn, *dsn;
1531     int storagenode_id;
1532     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1533     storagenode_id = atoi(tmpint);
1534     PG_GET_STR_COL(fqdn, i, "fqdn");
1535     PG_GET_STR_COL(dsn, i, "dsn");
1536     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1537     storagenode_id = tmpint ? atoi(tmpint) : 0;
1538
1539     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1540                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1541       storagenode_info *info;
1542       info = calloc(1, sizeof(*info));
1543       info->storagenode_id = storagenode_id;
1544       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1545       info->dsn = dsn ? strdup(dsn) : NULL;
1546       noit_hash_store(&storagenode_to_info_cache,
1547                       (void *)&info->storagenode_id, sizeof(int), info);
1548     }
1549   }
1550   PQclear(d->res);
1551  bad_row:
1552   free_params(d);
1553
1554   release_conn_q(cq);
1555   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1556   return cnt;
1557 }
1558 int
1559 stratcon_datastore_ingest_all_check_info() {
1560   int i, cnt, loaded = 0;
1561   ds_single_detail _d = { 0 }, *d = &_d;
1562   conn_q *cq;
1563   cq = get_conn_q_for_metanode();
1564
1565   while(stratcon_database_connect(cq)) {
1566     noitL(noit_error, "Error connecting to database\n");
1567     sleep(1);
1568   }
1569
1570   GET_QUERY(check_mapall);
1571   PG_EXEC(check_mapall);
1572   cnt = PQntuples(d->res);
1573   for(i=0; i<cnt; i++) {
1574     void *vinfo;
1575     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1576     int sid, storagenode_id;
1577     uuid_info *uuidinfo;
1578     PG_GET_STR_COL(uuid_str, i, "id");
1579     if(!uuid_str) continue;
1580     PG_GET_STR_COL(tmpint, i, "sid");
1581     if(!tmpint) continue;
1582     sid = atoi(tmpint);
1583     PG_GET_STR_COL(fqdn, i, "fqdn");
1584     PG_GET_STR_COL(dsn, i, "dsn");
1585     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1586     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1587     storagenode_id = tmpint ? atoi(tmpint) : 0;
1588
1589     uuidinfo = calloc(1, sizeof(*uuidinfo));
1590     uuidinfo->uuid_str = strdup(uuid_str);
1591     uuidinfo->remote_cn = strdup(remote_cn);
1592     uuidinfo->sid = sid;
1593     noit_hash_store(&uuid_to_info_cache,
1594                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1595     loaded++;
1596     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1597                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1598       storagenode_info *info;
1599       info = calloc(1, sizeof(*info));
1600       info->storagenode_id = storagenode_id;
1601       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1602       info->dsn = dsn ? strdup(dsn) : NULL;
1603       noit_hash_store(&storagenode_to_info_cache,
1604                       (void *)&info->storagenode_id, sizeof(int), info);
1605     }
1606   }
1607   PQclear(d->res);
1608  bad_row:
1609   free_params(d);
1610
1611   release_conn_q(cq);
1612   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1613   return loaded;
1614 }
1615 void
1616 stratcon_datastore_init() {
1617   pthread_mutex_init(&ds_conns_lock, NULL);
1618   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1619   ds_err = noit_log_stream_find("error/datastore");
1620   ds_deb = noit_log_stream_find("debug/datastore");
1621   ingest_err = noit_log_stream_find("error/ingest");
1622   if(!ds_err) ds_err = noit_error;
1623   if(!ingest_err) ingest_err = noit_error;
1624   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1625                            &basejpath)) {
1626     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1627     exit(-1);
1628   }
1629   stratcon_datastore_ingest_all_check_info();
1630   stratcon_datastore_ingest_all_storagenode_info();
1631   stratcon_datastore_sweep_journals();
1632 }
Note: See TracBrowser for help on using the browser.