root/src/stratcon_datastore.c

Revision cdb26b49e531703260a05d626a2e65335ad2d8db, 48.8 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 years ago)

leaking filedescriptor issue, refs #150

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "utils/noit_str.h"
38 #include "utils/noit_mkdir.h"
39 #include "stratcon_datastore.h"
40 #include "stratcon_realtime_http.h"
41 #include "stratcon_iep.h"
42 #include "noit_conf.h"
43 #include "noit_check.h"
44 #include <unistd.h>
45 #include <fcntl.h>
46 #include <netinet/in.h>
47 #include <sys/un.h>
48 #include <dirent.h>
49 #include <arpa/inet.h>
50 #include <sys/mman.h>
51 #include <libpq-fe.h>
52 #include <zlib.h>
53 #include <assert.h>
54 #include <errno.h>
55
/*
 * DECL_STMT(codename, confname) declares two file-scope statics:
 *   - `codename`:       a cached statement string, initially NULL;
 *   - `codename_conf`:  the configuration path
 *                       "/stratcon/database/statements/<confname>".
 * GET_QUERY() uses the pair to fetch the statement text on first use.
 */
#define DECL_STMT(codename,confname) \
static char *codename = NULL; \
static const char *codename##_conf = "/stratcon/database/statements/" #confname

/* One cached statement per configured query referenced in this file. */
DECL_STMT(storage_post_connect, storagepostconnect);
DECL_STMT(metanode_post_connect, metanodepostconnect);
DECL_STMT(find_storage, findstoragenode);
DECL_STMT(all_storage, allstoragenodes);
DECL_STMT(check_map, mapchecktostoragenode);
DECL_STMT(check_mapall, mapallchecks);
DECL_STMT(check_loadall, allchecks);
DECL_STMT(check_find, findcheck);
DECL_STMT(check_insert, check);
DECL_STMT(status_insert, status);
DECL_STMT(metric_insert_numeric, metric_numeric);
DECL_STMT(metric_insert_text, metric_text);
DECL_STMT(config_insert, config);
73
/* Log streams used by this file; both start out NULL. */
static noit_log_stream_t ds_err = NULL;
static noit_log_stream_t ingest_err = NULL;

/* Singly linked list of dispatch callbacks; `onlookers` is the list head. */
static struct datastore_onlooker_list {
  void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *,
                   const char *, void *);
  struct datastore_onlooker_list *next;
} *onlookers = NULL;
82
/*
 * GET_QUERY(a): if the cached statement `a` is still NULL, fetch it from the
 * configuration path `a_conf` with noit_conf_get_string().  When that lookup
 * fails, control jumps to a `bad_row` label, which the expanding function
 * must provide.
 */
#define GET_QUERY(a) do { \
  if(a == NULL) \
    if(!noit_conf_get_string(NULL, a ## _conf, &(a))) \
      goto bad_row; \
} while(0)
88
struct conn_q;

/*
 * A pool of database connections sharing one queue name.  Idle connections
 * are kept on the `head` list; `lock` protects the list and the counters.
 */
typedef struct {
  char            *queue_name; /* the key fqdn+remote_sn */
  eventer_jobq_t  *jobq;
  struct conn_q   *head;          /* list of idle connections, newest first */
  pthread_mutex_t  lock;
  pthread_cond_t   cv;            /* signalled whenever a connection is released */
  int              ttl;           /* added to last_use when deciding expiry */
  int              in_pool;       /* number of connections on the head list */
  int              outstanding;   /* number of connections handed out */
  int              max_allocated; /* cap on in_pool + outstanding */
  int              max_in_pool;   /* cap on idle connections kept */
} conn_pool;
/* One database connection together with the identity it was created for. */
typedef struct conn_q {
  time_t           last_use;   /* set from time(NULL) on release */
  char            *dsn;        /* Pg connect string */
  char            *remote_str; /* the IP of the noit*/
  char            *remote_cn;  /* the Cert CN of the noit */
  char            *fqdn;       /* the fqdn of the storage node */
  conn_pool       *pool;       /* owning pool */
  struct conn_q   *next;       /* next idle connection in the pool list */
  /* Postgres specific stuff */
  PGconn          *dbh;
} conn_q;
114
115
/* Capacity of the per-statement parameter arrays declared below. */
#define MAX_PARAMS 8
/*
 * Members common to every "detail" structure.  They are placed first in each
 * struct so that the other detail types can be cast to ds_single_detail (as
 * free_params() callers do).  The param* arrays are parallel: value, length,
 * format, and whether the value was heap-allocated, for each of `nparams`
 * declared parameters.
 */
#define POSTGRES_PARTS \
  PGresult *res; \
  int rv; \
  time_t whence; \
  int nparams; \
  int metric_type; \
  char *paramValues[MAX_PARAMS]; \
  int paramLengths[MAX_PARAMS]; \
  int paramFormats[MAX_PARAMS]; \
  int paramAllocd[MAX_PARAMS];

/* The bare statement state with no extra payload. */
typedef struct ds_single_detail {
  POSTGRES_PARTS
} ds_single_detail;
/* Statement state for a realtime-tracker lookup job. */
typedef struct {
  /* Postgres specific stuff */
  POSTGRES_PARTS
  struct realtime_tracker *rt;
  conn_q *cq; /* connection on which to perform this job */
  eventer_t completion_event; /* This event should be registered if non NULL */
} ds_rt_detail;
/* Statement state for one journal line; lines are chained through `next`. */
typedef struct ds_line_detail {
  /* Postgres specific stuff */
  POSTGRES_PARTS
  char *data;
  int problematic;
  struct ds_line_detail *next;
} ds_line_detail;
145
/* A hash table paired with a completion event. */
typedef struct {
  noit_hash_table *ws;
  eventer_t completion;
} syncset_t;

/*
 * Describes an interim journal file: the identity strings it belongs to, its
 * storage node id, its file name and descriptor, and a connection pool.
 */
typedef struct {
  char *remote_str;
  char *remote_cn;
  char *fqdn;
  int storagenode_id;
  int fd;            /* a negative value causes the file to be (re)opened */
  char *filename;
  conn_pool *cpool;
} interim_journal_t;
160
/* Forward declarations for functions used before their definitions. */
static int stratcon_database_connect(conn_q *cq);
static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
163
164 static void
165 free_params(ds_single_detail *d) {
166   int i;
167   for(i=0; i<d->nparams; i++)
168     if(d->paramAllocd[i] && d->paramValues[i])
169       free(d->paramValues[i]);
170 }
171
/* File-level state: journal base path and connection-pool bookkeeping. */
char *basejpath = NULL;
pthread_mutex_t ds_conns_lock; /* held around accesses to ds_conns */
noit_hash_table ds_conns;      /* queue name -> conn_pool */
noit_hash_table working_sets;

/* the fqdn cache needs to be thread safe */
typedef struct {
  char *uuid_str;
  int storagenode_id;
  int sid;
} uuid_info;
/* Cached description of one storage node. */
typedef struct {
  int storagenode_id;
  char *fqdn;
  char *dsn;
} storagenode_info;
noit_hash_table uuid_to_info_cache;
pthread_mutex_t storagenode_to_info_cache_lock; /* held around the cache below */
noit_hash_table storagenode_to_info_cache;      /* storagenode id -> storagenode_info */
191
/*
 * Render a socket address as text into `buff` (capacity `blen` bytes).
 *
 *   AF_INET / AF_INET6 : numeric address as produced by inet_ntop()
 *   AF_UNIX            : the socket path
 *   NULL remote        : empty string
 *   other families     : empty string, returns 0
 *
 * Returns the length of the string left in `buff`.  The result is truncated
 * to fit `blen`.
 */
int
convert_sockaddr_to_buff(char *buff, int blen, struct sockaddr *remote) {
  char name[128] = "";

  if(!buff || blen <= 0) return 0;
  buff[0] = '\0';
  if(remote) {
    switch(remote->sa_family) {
      case AF_INET:
        /* The size passed to inet_ntop is the size of the DESTINATION buffer,
         * not of the sockaddr structure. */
        if(!inet_ntop(AF_INET, &((struct sockaddr_in *)remote)->sin_addr,
                      name, sizeof(name)))
          name[0] = '\0';
        break;
      case AF_INET6:
        if(!inet_ntop(AF_INET6, &((struct sockaddr_in6 *)remote)->sin6_addr,
                      name, sizeof(name)))
          name[0] = '\0';
        break;
      case AF_UNIX: {
        struct sockaddr_un *un = (struct sockaddr_un *)remote;
        /* sun_path is not guaranteed to be NUL terminated; bound the read. */
        snprintf(name, sizeof(name), "%.*s",
                 (int)sizeof(un->sun_path), un->sun_path);
        break;
      }
      default: return 0;
    }
  }
  snprintf(buff, (size_t)blen, "%s", name);
  return (int)strlen(buff);
}
219
220 /* Thread-safe connection pools */
221
222 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
223 static void
224 release_conn_q_forceable(conn_q *cq, int forcefree) {
225   int putback = 0;
226   cq->last_use = time(NULL);
227   pthread_mutex_lock(&cq->pool->lock);
228   cq->pool->outstanding--;
229   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
230     putback = 1;
231     cq->next = cq->pool->head;
232     cq->pool->head = cq;
233     cq->pool->in_pool++;
234   }
235   pthread_mutex_unlock(&cq->pool->lock);
236   noitL(noit_debug, "[%p] release %s [%s]\n", (void *)pthread_self(),
237         putback ? "to pool" : "and destroy", cq->pool->queue_name);
238   pthread_cond_signal(&cq->pool->cv);
239   if(putback) return;
240
241   /* Not put back, release it */
242   if(cq->dbh) PQfinish(cq->dbh);
243   if(cq->remote_str) free(cq->remote_str);
244   if(cq->remote_cn) free(cq->remote_cn);
245   if(cq->fqdn) free(cq->fqdn);
246   if(cq->dsn) free(cq->dsn);
247   free(cq);
248 }
249 static void
250 ttl_purge_conn_pool(conn_pool *pool) {
251   int old_cnt, new_cnt;
252   time_t now = time(NULL);
253   conn_q *cq, *prev = NULL, *iter;
254   /* because we always replace on the head and update the last_use time when
255      doing so, we know they are ordered LRU on the end.  So, once we hit an
256      old one, we know all the others are old too.
257    */
258   if(!pool->head) return; /* hack short circuit for no locks */
259   pthread_mutex_lock(&pool->lock);
260   old_cnt = pool->in_pool;
261   cq = pool->head;
262   while(cq) {
263     if(cq->last_use + cq->pool->ttl < now) {
264       if(prev) prev->next = NULL;
265       else pool->head = NULL;
266       break;
267     }
268     prev = cq;
269     cq = cq->next;
270   }
271   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
272   /* Fix accounting */
273   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
274   new_cnt = pool->in_pool;
275   pthread_mutex_unlock(&pool->lock);
276
277   /* Force release these without holding the lock */
278   while(cq) {
279     prev = cq;
280     cq = cq->next;
281     release_conn_q_forceable(cq, 1);
282   }
283   if(old_cnt != new_cnt)
284     noitL(noit_debug, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
285           pool->queue_name);
286 }
287 static void
288 release_conn_q(conn_q *cq) {
289   ttl_purge_conn_pool(cq->pool);
290   release_conn_q_forceable(cq, 0);
291 }
292 static conn_pool *
293 get_conn_pool_for_remote(const char *remote_str,
294                          const char *remote_cn, const char *fqdn) {
295   void *vcpool;
296   conn_pool *cpool = NULL;
297   char queue_name[256] = "datastore_";
298   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
299            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
300            fqdn ? fqdn : "default",
301            remote_cn ? remote_cn : "default");
302   pthread_mutex_lock(&ds_conns_lock);
303   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
304                         strlen(queue_name), &vcpool))
305     cpool = vcpool;
306   pthread_mutex_unlock(&ds_conns_lock);
307   if(!cpool) {
308     vcpool = cpool = calloc(1, sizeof(*cpool));
309     cpool->queue_name = strdup(queue_name);
310     pthread_mutex_init(&cpool->lock, NULL);
311     pthread_cond_init(&cpool->cv, NULL);
312     cpool->in_pool = 0;
313     cpool->outstanding = 0;
314     cpool->max_in_pool = 1;
315     cpool->max_allocated = 1;
316     pthread_mutex_lock(&ds_conns_lock);
317     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
318                         cpool)) {
319       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
320                          strlen(queue_name), &vcpool);
321     }
322     pthread_mutex_unlock(&ds_conns_lock);
323     if(vcpool != cpool) {
324       /* someone beat us to it */
325       free(cpool->queue_name);
326       pthread_mutex_destroy(&cpool->lock);
327       pthread_cond_destroy(&cpool->cv);
328       free(cpool);
329     }
330     else {
331       int i;
332       /* Our job to setup the pool */
333       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
334       eventer_jobq_init(cpool->jobq, queue_name);
335       cpool->jobq->backq = eventer_default_backq();
336       /* Add one thread */
337       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
338         eventer_jobq_increase_concurrency(cpool->jobq);
339     }
340     cpool = vcpool;
341   }
342   return cpool;
343 }
344 static conn_q *
345 get_conn_q_for_remote(const char *remote_str,
346                       const char *remote_cn, const char *fqdn,
347                       const char *dsn) {
348   conn_pool *cpool;
349   conn_q *cq;
350   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
351   noitL(noit_debug, "[%p] requesting [%s]\n", (void *)pthread_self(),
352         cpool->queue_name);
353   pthread_mutex_lock(&cpool->lock);
354  again:
355   if(cpool->head) {
356     assert(cpool->in_pool > 0);
357     cq = cpool->head;
358     cpool->head = cq->next;
359     cpool->in_pool--;
360     cpool->outstanding++;
361     cq->next = NULL;
362     pthread_mutex_unlock(&cpool->lock);
363     return cq;
364   }
365   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
366     noitL(noit_debug, "[%p] over-subscribed, waiting [%s]\n",
367           (void *)pthread_self(), cpool->queue_name);
368     pthread_cond_wait(&cpool->cv, &cpool->lock);
369     noitL(noit_debug, "[%p] waking up and trying again [%s]\n",
370           (void *)pthread_self(), cpool->queue_name);
371     goto again;
372   }
373   else {
374     cpool->outstanding++;
375     pthread_mutex_unlock(&cpool->lock);
376   }
377  
378   cq = calloc(1, sizeof(*cq));
379   cq->pool = cpool;
380   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
381   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
382   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
383   cq->dsn = dsn ? strdup(dsn) : NULL;
384   return cq;
385 }
386 static conn_q *
387 get_conn_q_for_metanode() {
388   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
389 }
390
/* Result codes returned by the statement-execution functions in this file. */
typedef enum {
  DS_EXEC_SUCCESS = 0,
  DS_EXEC_ROW_FAILED = 1,
  DS_EXEC_TXN_FAILED = 2,
} execute_outcome_t;
396
/*
 * Duplicate at most `len` bytes of `src` into a freshly allocated,
 * NUL-terminated string.  Copying stops early at the first NUL in `src`.
 * The caller owns the returned memory.
 */
static char *
__noit__strndup(const char *src, int len) {
  int n = 0;
  char *copy;

  while(n < len && src[n] != '\0') n++;
  copy = malloc(n + 1);
  memcpy(copy, src, n);
  copy[n] = '\0';
  return copy;
}
/*
 * DECLARE_PARAM_STR(str, len): append a text parameter to the detail `d` that
 * is in scope.  A private copy of at most `len` bytes of `str` is stored and
 * flagged as allocated.  If the copy equals the literal "[[null]]" it is
 * freed and the parameter is stored as NULL with length 0 instead.
 * NOTE(review): nparams is not checked against MAX_PARAMS here.
 */
#define DECLARE_PARAM_STR(str, len) do { \
  d->paramValues[d->nparams] = __noit__strndup(str, len); \
  d->paramLengths[d->nparams] = len; \
  d->paramFormats[d->nparams] = 0; \
  d->paramAllocd[d->nparams] = 1; \
  if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \
    free(d->paramValues[d->nparams]); \
    d->paramValues[d->nparams] = NULL; \
    d->paramLengths[d->nparams] = 0; \
    d->paramAllocd[d->nparams] = 0; \
  } \
  d->nparams++; \
} while(0)
/*
 * DECLARE_PARAM_INT(i): format the integer `i` as decimal text and append it
 * with DECLARE_PARAM_STR.
 */
#define DECLARE_PARAM_INT(i) do { \
  int buffer__len; \
  char buffer__[32]; \
  snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
  buffer__len = strlen(buffer__); \
  DECLARE_PARAM_STR(buffer__, buffer__len); \
} while(0)
428
/*
 * PG_GET_STR_COL(dest, row, name): point `dest` at the value of column `name`
 * in row `row` of d->res, or set it to NULL when the column does not exist or
 * the value is SQL NULL.  The pointer refers to storage inside d->res.
 */
#define PG_GET_STR_COL(dest, row, name) do { \
  int colnum = PQfnumber(d->res, name); \
  dest = NULL; \
  if (colnum >= 0) \
    dest = PQgetisnull(d->res, row, colnum) \
         ? NULL : PQgetvalue(d->res, row, colnum); \
} while(0)

/*
 * PG_EXEC(cmd): run `cmd` on cq->dbh with the parameters declared on `d`.
 * On any status other than PGRES_COMMAND_OK / PGRES_TUPLES_OK the error is
 * logged, d->res is cleared, and control jumps to the caller's `bad_row`
 * label.  On success d->res is left for the caller to use and clear.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    noitL(ds_err, "stratcon datasource bad (%d): %s\n'%s'\n", \
          d->rv, PQresultErrorMessage(d->res), cmd); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)

/*
 * PG_TM_EXEC(cmd, whence): like PG_EXEC, but `cmd` is first expanded with
 * strftime() using the UTC broken-down form of `whence`, so the statement
 * text may contain strftime conversion specifiers.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t __w = whence; \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&__w, &tbuf); \
  strftime(cmdbuf, sizeof(cmdbuf), cmd, tm); \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    noitL(ds_err, "stratcon datasource bad (%d): %s\n'%s' time: %llu\n", \
          d->rv, PQresultErrorMessage(d->res), cmdbuf, \
          (long long unsigned)whence); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
470
471 static int
472 stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure,
473                                     struct timeval *now) {
474   ds_single_detail *d;
475   int i, row_count = 0, good = 0;
476   char buff[1024];
477   conn_q *cq;
478   cq = get_conn_q_for_metanode();
479
480   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
481   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
482
483   stratcon_database_connect(cq);
484   d = calloc(1, sizeof(*d));
485   GET_QUERY(check_loadall);
486   PG_EXEC(check_loadall);
487   row_count = PQntuples(d->res);
488  
489   for(i=0; i<row_count; i++) {
490     int rv;
491     int8_t family;
492     struct sockaddr *sin;
493     struct sockaddr_in sin4 = { .sin_family = AF_INET };
494     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
495     char *remote, *id, *target, *module, *name;
496     PG_GET_STR_COL(remote, i, "remote_address");
497     PG_GET_STR_COL(id, i, "id");
498     PG_GET_STR_COL(target, i, "target");
499     PG_GET_STR_COL(module, i, "module");
500     PG_GET_STR_COL(name, i, "name");
501     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
502
503     family = AF_INET;
504     sin = (struct sockaddr *)&sin4;
505     rv = inet_pton(family, remote, &sin4.sin_addr);
506     if(rv != 1) {
507       family = AF_INET6;
508       sin = (struct sockaddr *)&sin6;
509       rv = inet_pton(family, remote, &sin6.sin6_addr);
510       if(rv != 1) {
511         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
512         sin = NULL;
513       }
514     }
515
516     /* stratcon_iep_line_processor takes an allocated operand and frees it */
517     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
518     good++;
519   }
520   noitL(noit_error, "Staged %d/%d remembered checks into IEP\n", good, row_count);
521  bad_row:
522   free_params((ds_single_detail *)d);
523   free(d);
524   release_conn_q(cq);
525   return 0;
526 }
527 void
528 stratcon_datastore_iep_check_preload() {
529   eventer_t e;
530   conn_pool *cpool;
531
532   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
533   e = eventer_alloc();
534   e->mask = EVENTER_ASYNCH;
535   e->callback = stratcon_datastore_asynch_drive_iep;
536   e->closure = NULL;
537   eventer_add_asynch(cpool->jobq, e);
538 }
539 execute_outcome_t
540 stratcon_datastore_find(conn_q *cq, ds_rt_detail *d) {
541   char *val;
542   int row_count;
543   struct realtime_tracker *node;
544
545   GET_QUERY(check_find);
546   for(node = d->rt; node; node = node->next) {
547     DECLARE_PARAM_INT(node->sid);
548     PG_EXEC(check_find);
549     row_count = PQntuples(d->res);
550     if(row_count != 1) {
551       PQclear(d->res);
552       goto bad_row;
553     }
554
555     /* Get the check uuid */
556     PG_GET_STR_COL(val, 0, "id");
557     if(!val) {
558       PQclear(d->res);
559       goto bad_row;
560     }
561     if(uuid_parse(val, node->checkid)) {
562       PQclear(d->res);
563       goto bad_row;
564     }
565  
566     /* Get the remote_address (which noit owns this) */
567     PG_GET_STR_COL(val, 0, "remote_address");
568     if(!val) {
569       PQclear(d->res);
570       goto bad_row;
571     }
572     node->noit = strdup(val);
573  
574    bad_row:
575     free_params((ds_single_detail *)d);
576     d->nparams = 0;
577   }
578   return DS_EXEC_SUCCESS;
579 }
/*
 * Parse one journal line (d->data) into statement parameters, if that has
 * not been done yet, and execute the matching insert statement on `cq`.
 *
 * The first byte of the line selects the record type:
 *   'n' config  -- raddr, "noitd", timestamp, decoded+uncompressed payload
 *   'C' check   -- raddr, timestamp, sid, uuid, target, module, name
 *   'M' metric  -- timestamp, sid, name, value (d->metric_type picks the
 *                  numeric or text statement)
 *   'S' status  -- timestamp, sid, state, availability, duration, status
 * Fields are tab separated; the last field runs to the end of the line with
 * one trailing newline removed.
 *
 * r         remote address text; NULL is treated as "".
 * remote_cn passed through to uuid_to_sid().
 *
 * Returns DS_EXEC_SUCCESS when the statement ran, DS_EXEC_ROW_FAILED on any
 * parse, lookup, or execution failure.
 *
 * Parsing is skipped whenever d->nparams is already non-zero.
 * NOTE(review): a parse that fails part-way leaves nparams non-zero, so a
 * later call for the same line would execute with the partial parameter
 * list -- confirm callers do not retry such rows.
 */
execute_outcome_t
stratcon_datastore_execute(conn_q *cq, const char *r, const char *remote_cn,
                           ds_line_detail *d) {
  int type, len, sid;
  char *final_buff;
  uLong final_len, actual_final_len;
  char *token;
  char raddr_blank[1] = "";
  const char *raddr;

  type = d->data[0];
  raddr = r ? r : raddr_blank;

  /* Parse the log line, but only if we haven't already */
  if(!d->nparams) {
    char *scp, *ecp;

    scp = d->data;
/* The two helpers below always operate on `token` and `len`, whatever
 * arguments are written at the call site for PROCESS_NEXT_FIELD. */
#define PROCESS_NEXT_FIELD(t,l) do { \
  if(!*scp) goto bad_row; \
  ecp = strchr(scp, '\t'); \
  if(!ecp) goto bad_row; \
  token = scp; \
  len = (ecp-scp); \
  scp = ecp + 1; \
} while(0)
#define PROCESS_LAST_FIELD(t,l) do { \
  if(!*scp) ecp = scp; \
  else { \
    ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
    if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
  } \
  t = scp; \
  l = (ecp-scp); \
} while(0)

    PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
    switch(type) {
      /* See noit_check_log.c for log description */
      case 'n':
        DECLARE_PARAM_STR(raddr, strlen(raddr));
        DECLARE_PARAM_STR("noitd",5); /* node_type */
        PROCESS_NEXT_FIELD(token,len);
        d->whence = (time_t)strtoul(token, NULL, 10);
        DECLARE_PARAM_STR(token,len); /* timestamp */

        /* This is the expected uncompressed len */
        PROCESS_NEXT_FIELD(token,len);
        /* NOTE(review): atoi() result is used as the allocation size without
         * range validation. */
        final_len = atoi(token);
        final_buff = malloc(final_len);
        if(!final_buff) goto bad_row;

        /* The last token is b64 endoded and compressed.
         * we need to decode it, declare it and then free it.
         */
        PROCESS_LAST_FIELD(token, len);
        /* We can in-place decode this */
        len = noit_b64_decode((char *)token, len,
                              (unsigned char *)token, len);
        if(len <= 0) {
          noitL(noit_error, "noitd config base64 decoding error.\n");
          free(final_buff);
          goto bad_row;
        }
        actual_final_len = final_len;
        if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
                              (unsigned char *)token, len)) {
          noitL(noit_error, "noitd config decompression failure.\n");
          free(final_buff);
          goto bad_row;
        }
        /* The decompressed size must match the size announced in the line. */
        if(final_len != actual_final_len) {
          noitL(noit_error, "noitd config decompression error.\n");
          free(final_buff);
          goto bad_row;
        }
        DECLARE_PARAM_STR(final_buff, final_len);
        free(final_buff);
        break;
      case 'C':
        DECLARE_PARAM_STR(raddr, strlen(raddr));
        PROCESS_NEXT_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* timestamp */
        d->whence = (time_t)strtoul(token, NULL, 10);
        PROCESS_NEXT_FIELD(token, len);
        sid = uuid_to_sid(token, remote_cn);
        if(sid == 0) goto bad_row;
        DECLARE_PARAM_INT(sid); /* sid */
        DECLARE_PARAM_STR(token,len); /* uuid */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* target */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* module */
        PROCESS_LAST_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* name */
        break;
      case 'M':
        PROCESS_NEXT_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* timestamp */
        d->whence = (time_t)strtoul(token, NULL, 10);
        PROCESS_NEXT_FIELD(token, len);
        sid = uuid_to_sid(token, remote_cn);
        if(sid == 0) goto bad_row;
        DECLARE_PARAM_INT(sid); /* sid */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* name */
        PROCESS_NEXT_FIELD(token,len);
        /* The first byte of this field is the metric type code; it is not
         * declared as a statement parameter. */
        d->metric_type = *token;
        PROCESS_LAST_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* value */
        break;
      case 'S':
        PROCESS_NEXT_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* timestamp */
        d->whence = (time_t)strtoul(token, NULL, 10);
        PROCESS_NEXT_FIELD(token, len);
        sid = uuid_to_sid(token, remote_cn);
        if(sid == 0) goto bad_row;
        DECLARE_PARAM_INT(sid); /* sid */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* state */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* availability */
        PROCESS_NEXT_FIELD(token, len);
        DECLARE_PARAM_STR(token,len); /* duration */
        PROCESS_LAST_FIELD(token,len);
        DECLARE_PARAM_STR(token,len); /* status */
        break;
      default:
        goto bad_row;
    }

  }

  /* Now execute the query */
  switch(type) {
    case 'n':
      GET_QUERY(config_insert);
      PG_EXEC(config_insert);
      PQclear(d->res);
      break;
    case 'C':
      GET_QUERY(check_insert);
      PG_EXEC(check_insert);
      PQclear(d->res);
      break;
    case 'S':
      GET_QUERY(status_insert);
      /* statement text is strftime-expanded with the record's timestamp */
      PG_TM_EXEC(status_insert, d->whence);
      PQclear(d->res);
      break;
    case 'M':
      switch(d->metric_type) {
        case METRIC_INT32:
        case METRIC_UINT32:
        case METRIC_INT64:
        case METRIC_UINT64:
        case METRIC_DOUBLE:
          GET_QUERY(metric_insert_numeric);
          PG_TM_EXEC(metric_insert_numeric, d->whence);
          PQclear(d->res);
          break;
        case METRIC_STRING:
          GET_QUERY(metric_insert_text);
          PG_TM_EXEC(metric_insert_text, d->whence);
          PQclear(d->res);
          break;
        default:
          goto bad_row;
      }
      break;
    default:
      /* should never get here */
      goto bad_row;
  }
  return DS_EXEC_SUCCESS;
 bad_row:
  return DS_EXEC_ROW_FAILED;
}
759 static int
760 stratcon_database_post_connect(conn_q *cq) {
761   int rv = 0;
762   ds_single_detail _d = { 0 }, *d = &_d;
763   if(cq->remote_str) {
764     char *remote_str, *remote_cn;
765     /* This is the silly way we get null's in through our declare_param_str */
766     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
767     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
768     /* This is a storage node, it gets the storage node post_connect */
769     GET_QUERY(storage_post_connect);
770     rv = -1; /* now we're serious */
771     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
772     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
773     PG_EXEC(storage_post_connect);
774     PQclear(d->res);
775     rv = 0;
776   }
777   else {
778     /* Metanode post_connect */
779     GET_QUERY(metanode_post_connect);
780     rv = -1; /* now we're serious */
781     PG_EXEC(metanode_post_connect);
782     PQclear(d->res);
783     rv = 0;
784   }
785  bad_row:
786   free_params(d);
787   if(rv == -1) {
788     /* Post-connect intentions are serious and fatal */
789     PQfinish(cq->dbh);
790     cq->dbh = NULL;
791   }
792   return rv;
793 }
794 static int
795 stratcon_database_connect(conn_q *cq) {
796   char *dsn, dsn_meta[512];
797   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
798   const char *k, *v;
799   int klen;
800   noit_hash_table *t;
801
802   dsn_meta[0] = '\0';
803   if(!cq->dsn) {
804     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
805     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
806       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
807       strlcat(dsn_meta, k, sizeof(dsn_meta));
808       strlcat(dsn_meta, "=", sizeof(dsn_meta));
809       strlcat(dsn_meta, v, sizeof(dsn_meta));
810     }
811     noit_hash_destroy(t, free, free);
812     free(t);
813     dsn = dsn_meta;
814   }
815   else dsn = cq->dsn;
816
817   if(cq->dbh) {
818     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
819     PQreset(cq->dbh);
820     if(stratcon_database_post_connect(cq)) return -1;
821     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
822     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
823           dsn, PQerrorMessage(cq->dbh));
824     return -1;
825   }
826
827   cq->dbh = PQconnectdb(dsn);
828   if(!cq->dbh) return -1;
829   if(stratcon_database_post_connect(cq)) return -1;
830   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
831   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
832         dsn, PQerrorMessage(cq->dbh));
833   return -1;
834 }
835 static int
836 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
837                                 const char *name) {
838   int rv = -1;
839   PGresult *res;
840   char cmd[128];
841   strlcpy(cmd, p, sizeof(cmd));
842   strlcat(cmd, name, sizeof(cmd));
843   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
844   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
845   PQclear(res);
846   return rv;
847 }
848 static int
849 stratcon_datastore_do(conn_q *cq, const char *cmd) {
850   PGresult *res;
851   int rv = -1;
852   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
853   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
854   PQclear(res);
855   return rv;
856 }
/*
 * BUSTED(cq): close the connection, clear its handle, and jump to the
 * `full_monty` label that the expanding function must provide.
 */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)
/*
 * The three helpers below issue the named savepoint command on the `cq` in
 * scope and invoke BUSTED(cq) if it fails.  They also maintain a `last_sp`
 * variable from the expanding scope: SAVEPOINT sets it to `current`, the
 * other two reset it to NULL.
 */
#define SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \
  last_sp = current; \
} while(0)
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
#define RELEASE_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \
    BUSTED(cq); \
  last_sp = NULL; \
} while(0)
876 int
877 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
878                                  struct timeval *now) {
879   ds_rt_detail *dsjd = closure;
880   conn_q *cq;
881   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
882
883   cq = get_conn_q_for_metanode();
884   stratcon_database_connect(cq);
885   assert(dsjd->rt);
886   stratcon_datastore_find(cq, dsjd);
887   if(dsjd->completion_event)
888     eventer_add(dsjd->completion_event);
889
890   free_params((ds_single_detail *)dsjd);
891   free(dsjd);
892   release_conn_q(cq);
893   return 0;
894 }
895 static const char *
896 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
897   void *vinfo;
898   const char *dsn = NULL, *fqdn = NULL;
899   int found = 0;
900   storagenode_info *info = NULL;
901   pthread_mutex_lock(&storagenode_to_info_cache_lock);
902   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
903                         &vinfo)) {
904     found = 1;
905     info = vinfo;
906   }
907   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
908   if(found) {
909     if(fqdn_out) *fqdn_out = info->fqdn;
910     return info->dsn;
911   }
912
913   if(!found && can_use_db) {
914     ds_single_detail *d;
915     conn_q *cq;
916     int row_count;
917     /* Look it up and store it */
918     d = calloc(1, sizeof(*d));
919     cq = get_conn_q_for_metanode();
920     GET_QUERY(find_storage);
921     DECLARE_PARAM_INT(id);
922     PG_EXEC(find_storage);
923     row_count = PQntuples(d->res);
924     if(row_count) {
925       PG_GET_STR_COL(dsn, 0, "dsn");
926       PG_GET_STR_COL(fqdn, 0, "fqdn");
927     }
928     PQclear(d->res);
929    bad_row:
930     free_params(d);
931     free(d);
932     release_conn_q(cq);
933   }
934   if(fqdn) {
935     info = calloc(1, sizeof(*info));
936     info->fqdn = strdup(fqdn);
937     if(fqdn_out) *fqdn_out = info->fqdn;
938     info->dsn = dsn ? strdup(dsn) : NULL;
939     info->storagenode_id = id;
940     pthread_mutex_lock(&storagenode_to_info_cache_lock);
941     noit_hash_store(&storagenode_to_info_cache,
942                     (void *)&info->storagenode_id, sizeof(int), info);
943     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
944   }
945   return info ? info->dsn : NULL;
946 }
947 static ds_line_detail *
948 build_insert_batch(interim_journal_t *ij) {
949   int rv;
950   off_t len;
951   const char *buff, *cp, *lcp;
952   struct stat st;
953   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
954
955   if(ij->fd < 0) {
956     ij->fd = open(ij->filename, O_RDONLY);
957     if(ij->fd < 0) {
958       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
959             ij->filename, strerror(errno));
960       assert(ij->fd >= 0);
961     }
962   }
963   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
964   assert(rv != -1);
965   len = st.st_size;
966   buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
967   if(buff == (void *)-1) {
968     noitL(noit_error, "mmap(%s) => %s\n", ij->filename, strerror(errno));
969     assert(buff != (void *)-1);
970   }
971   lcp = buff;
972   while(lcp < (buff + len) &&
973         NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
974     next = calloc(1, sizeof(*next));
975     next->data = malloc(cp - lcp + 1);
976     memcpy(next->data, lcp, cp - lcp);
977     next->data[cp - lcp] = '\0';
978     if(!head) head = next;
979     if(last) last->next = next;
980     last = next;
981     lcp = cp + 1;
982   }
983   munmap((void *)buff, len);
984   close(ij->fd);
985   return head;
986 }
987 static void
988 interim_journal_remove(interim_journal_t *ij) {
989   unlink(ij->filename);
990   if(ij->filename) free(ij->filename);
991   if(ij->remote_str) free(ij->remote_str);
992   if(ij->remote_cn) free(ij->remote_cn);
993   if(ij->fqdn) free(ij->fqdn);
994 }
995 int
996 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
997                                   struct timeval *now) {
998   int i;
999   interim_journal_t *ij;
1000   ds_line_detail *head, *current, *last_sp;
1001   const char *dsn;
1002   conn_q *cq;
1003   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1004   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1005
1006   ij = closure;
1007   dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1008   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1009                              ij->fqdn, dsn);
1010   noitL(noit_debug, "stratcon_datastore_asynch_execute[%s,%s,%s]\n",
1011         ij->remote_str, ij->remote_cn, ij->fqdn);
1012  full_monty:
1013   /* Make sure we have a connection */
1014   i = 1;
1015   while(stratcon_database_connect(cq)) {
1016     noitL(noit_error, "Error connecting to database\n");
1017     sleep(i);
1018     i *= 2;
1019     i = MIN(i, 16);
1020   }
1021
1022   head = build_insert_batch(ij);
1023   current = head;
1024   last_sp = NULL;
1025   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
1026   while(current) {
1027     execute_outcome_t rv;
1028     if(current->data) {
1029       if(!last_sp) SAVEPOINT("batch");
1030  
1031       if(current->problematic) {
1032         noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1033         RELEASE_SAVEPOINT("batch");
1034         current = current->next;
1035         continue;
1036       }
1037       rv = stratcon_datastore_execute(cq, cq->remote_str, cq->remote_cn,
1038                                       current);
1039       switch(rv) {
1040         case DS_EXEC_SUCCESS:
1041           current = current->next;
1042           break;
1043         case DS_EXEC_ROW_FAILED:
1044           /* rollback to savepoint, mark this record as bad and start again */
1045           current->problematic = 1;
1046           current = last_sp;
1047           ROLLBACK_TO_SAVEPOINT("batch");
1048           break;
1049         case DS_EXEC_TXN_FAILED:
1050           BUSTED(cq);
1051       }
1052     }
1053   }
1054   if(last_sp) RELEASE_SAVEPOINT("batch");
1055   if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq);
1056   /* Cleanup the mess */
1057   while(head) {
1058     ds_line_detail *tofree;
1059     tofree = head;
1060     head = head->next;
1061     if(tofree->data) free(tofree->data);
1062     free_params((ds_single_detail *)tofree);
1063     free(tofree);
1064   }
1065   interim_journal_remove(ij);
1066   release_conn_q(cq);
1067   return 0;
1068 }
1069 static int
1070 stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure,
1071                                 struct timeval *now) {
1072   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1073   const char *k;
1074   int klen;
1075   void *vij;
1076   interim_journal_t *ij;
1077   syncset_t *syncset = closure;
1078
1079   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1080   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1081
1082   noitL(noit_debug, "Syncing journal sets...\n");
1083   while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) {
1084     eventer_t ingest;
1085     ij = vij;
1086     noitL(noit_debug, "Syncing journal set [%s,%s,%s]\n",
1087           ij->remote_str, ij->remote_cn, ij->fqdn);
1088     fsync(ij->fd);
1089     close(ij->fd);
1090     ij->fd = -1;
1091     ingest = eventer_alloc();
1092     ingest->mask = EVENTER_ASYNCH;
1093     ingest->callback = stratcon_datastore_asynch_execute;
1094     ingest->closure = ij;
1095     eventer_add_asynch(ij->cpool->jobq, ingest);
1096   }
1097   noit_hash_destroy(syncset->ws, free, NULL);
1098   free(syncset->ws);
1099   eventer_add(syncset->completion);
1100   free(syncset);
1101   return 0;
1102 }
1103 static interim_journal_t *
1104 interim_journal_get(struct sockaddr *remote, const char *remote_cn_in,
1105                     int storagenode_id, const char *fqdn_in) {
1106   void *vhash, *vij;
1107   noit_hash_table *working_set;
1108   interim_journal_t *ij;
1109   struct timeval now;
1110   char jpath[PATH_MAX];
1111   char remote_str[128];
1112   const char *remote_cn = remote_cn_in ? remote_cn_in : "default";
1113   const char *fqdn = fqdn_in ? fqdn_in : "default";
1114
1115   convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote);
1116   if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str));
1117
1118   /* Lookup the working set */
1119   if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1120     working_set = calloc(1, sizeof(*working_set));
1121     noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn),
1122                     working_set);
1123   }
1124   else
1125     working_set = vhash;
1126
1127   /* Lookup the interim journal within the working set */
1128   if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) {
1129     ij = calloc(1, sizeof(*ij));
1130     gettimeofday(&now, NULL);
1131     snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x",
1132              basejpath, remote_str, remote_cn, storagenode_id,
1133              (unsigned int)now.tv_sec, (unsigned int)now.tv_usec);
1134     ij->remote_str = strdup(remote_str);
1135     ij->remote_cn = strdup(remote_cn);
1136     ij->fqdn = strdup(fqdn);
1137     ij->storagenode_id = storagenode_id;
1138     ij->filename = strdup(jpath);
1139     ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1140                                          ij->fqdn);
1141     ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1142     if(ij->fd < 0 && errno == ENOENT) {
1143       if(mkdir_for_file(ij->filename, 0750)) {
1144         noitL(noit_error, "Failed to create dir for '%s': %s\n",
1145               ij->filename, strerror(errno));
1146         exit(-1);
1147       }
1148       ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1149     }
1150     if(ij->fd < 0) {
1151       noitL(noit_error, "Failed to open interim journal '%s': %s\n",
1152             ij->filename, strerror(errno));
1153       exit(-1);
1154     }
1155     noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij);
1156   }
1157   else
1158     ij = vij;
1159
1160   return ij;
1161 }
1162 static void
1163 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1164                           int *sid_out, int *storagenode_id_out,
1165                           char **fqdn_out, char **dsn_out) {
1166   /* only called from the main thread -- no safety issues */
1167   void *vuuidinfo, *vinfo;
1168   uuid_info *uuidinfo;
1169   storagenode_info *info = NULL;
1170   char *fqdn = NULL;
1171   char *dsn = NULL;
1172   int storagenode_id = 0, sid = 0;
1173   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1174                          &vuuidinfo)) {
1175     int row_count;
1176     char *tmpint;
1177     ds_single_detail *d;
1178     conn_q *cq;
1179     d = calloc(1, sizeof(*d));
1180     cq = get_conn_q_for_metanode();
1181     if(stratcon_database_connect(cq) == 0) {
1182       /* Blocking call to service the cache miss */
1183       GET_QUERY(check_map);
1184       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1185       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1186       PG_EXEC(check_map);
1187       row_count = PQntuples(d->res);
1188       if(row_count != 1) {
1189         PQclear(d->res);
1190         goto bad_row;
1191       }
1192       PG_GET_STR_COL(tmpint, 0, "sid");
1193       sid = atoi(tmpint);
1194       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1195       if(tmpint) storagenode_id = atoi(tmpint);
1196       PG_GET_STR_COL(fqdn, 0, "fqdn");
1197       PG_GET_STR_COL(dsn, 0, "dsn");
1198       PQclear(d->res);
1199     }
1200    bad_row:
1201     free_params((ds_single_detail *)d);
1202     free(d);
1203     release_conn_q(cq);
1204     /* Place in cache */
1205     if(fqdn) fqdn = strdup(fqdn);
1206     uuidinfo = calloc(1, sizeof(*uuidinfo));
1207     uuidinfo->sid = sid;
1208     uuidinfo->uuid_str = strdup(uuid_str);
1209     noit_hash_store(&uuid_to_info_cache,
1210                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1211     /* Also, we may have just witnessed a new storage node, store it */
1212     if(storagenode_id) {
1213       int needs_free = 0;
1214       info = calloc(1, sizeof(*info));
1215       info->storagenode_id = storagenode_id;
1216       info->dsn = dsn ? strdup(dsn) : NULL;
1217       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1218       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1219       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1220                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1221         /* hack to save memory -- we *never* remove from these caches,
1222            so we can use the same fqdn value in the above cache for the key
1223            in the cache below -- (no strdup) */
1224         noit_hash_store(&storagenode_to_info_cache,
1225                         (void *)&info->storagenode_id, sizeof(int), info);
1226       }
1227       else needs_free = 1;
1228       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1229       if(needs_free) {
1230         if(info->dsn) free(info->dsn);
1231         if(info->fqdn) free(info->fqdn);
1232         free(info);
1233       }
1234     }
1235   }
1236   else
1237     uuidinfo = vuuidinfo;
1238
1239   if(storagenode_id) {
1240     if(uuidinfo &&
1241        ((!dsn && dsn_out) || (!fqdn && fqdn_out))) {
1242       /* we don't have dsn and we actually want it */
1243       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1244       if(noit_hash_retrieve(&storagenode_to_info_cache,
1245                             (void *)&storagenode_id, sizeof(int), &vinfo))
1246         info = vinfo;
1247       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1248     }
1249   }
1250
1251   if(fqdn_out) *fqdn_out = fqdn ? fqdn : (info ? info->fqdn : NULL);
1252   if(dsn_out) *dsn_out = dsn ? dsn : (info ? info->dsn : NULL);
1253   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1254   if(sid_out) *sid_out = uuidinfo->sid;
1255 }
1256 static int
1257 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1258   char uuid_str[UUID_STR_LEN+1];
1259   int sid = 0;
1260   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1261   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL);
1262   return sid;
1263 }
1264 static void
1265 stratcon_datastore_journal(struct sockaddr *remote,
1266                            const char *remote_cn, const char *line) {
1267   interim_journal_t *ij = NULL;
1268   char uuid_str[UUID_STR_LEN+1], *cp, *fqdn, *dsn;
1269   int storagenode_id = 0;
1270   uuid_t checkid;
1271   if(!line) return;
1272   /* if it is a UUID based thing, find the storage node */
1273   switch(*line) {
1274     case 'C':
1275     case 'S':
1276     case 'M':
1277       if(line[1] == '\t' && (cp = strchr(line+2, '\t')) != NULL) {
1278         strlcpy(uuid_str, cp + 1, sizeof(uuid_str));
1279         if(!uuid_parse(uuid_str, checkid)) {
1280           storage_node_quick_lookup(uuid_str, remote_cn, NULL,
1281                                     &storagenode_id, &fqdn, &dsn);
1282           ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn);
1283         }
1284       }
1285       break;
1286     case 'n':
1287       ij = interim_journal_get(remote,remote_cn,0,NULL);
1288       break;
1289     default:
1290       break;
1291   }
1292   if(!ij && fqdn) {
1293     noitL(ingest_err, "%d\t%s\n", storagenode_id, line);
1294   }
1295   else {
1296     int len;
1297     len = write(ij->fd, line, strlen(line));
1298     if(len < 0) {
1299       noitL(noit_error, "write to %s failed: %s\n",
1300             ij->filename, strerror(errno));
1301     }
1302   }
1303   return;
1304 }
1305 static noit_hash_table *
1306 stratcon_datastore_journal_remove(struct sockaddr *remote,
1307                                   const char *remote_cn) {
1308   void *vhash = NULL;
1309   if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1310     /* pluck it out */
1311     noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL);
1312   }
1313   else {
1314     noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n",
1315           remote_cn);
1316     abort();
1317   }
1318   return vhash;
1319 }
1320 void
1321 stratcon_datastore_push(stratcon_datastore_op_t op,
1322                         struct sockaddr *remote,
1323                         const char *remote_cn, void *operand,
1324                         eventer_t completion) {
1325   conn_pool *cpool;
1326   syncset_t *syncset;
1327   eventer_t e;
1328   ds_rt_detail *rtdetail;
1329   struct datastore_onlooker_list *nnode;
1330
1331   for(nnode = onlookers; nnode; nnode = nnode->next)
1332     nnode->dispatch(op,remote,remote_cn,operand);
1333
1334   switch(op) {
1335     case DS_OP_INSERT:
1336       stratcon_datastore_journal(remote, remote_cn, (const char *)operand);
1337       break;
1338     case DS_OP_CHKPT:
1339       e = eventer_alloc();
1340       syncset = calloc(1, sizeof(*syncset));
1341       e->mask = EVENTER_ASYNCH;
1342       e->callback = stratcon_datastore_journal_sync;
1343       syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn);
1344       syncset->completion = completion;
1345       e->closure = syncset;
1346       eventer_add(e);
1347       break;
1348     case DS_OP_FIND_COMPLETE:
1349       cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
1350       rtdetail = calloc(1, sizeof(*rtdetail));
1351       rtdetail->rt = operand;
1352       rtdetail->completion_event = completion;
1353       e = eventer_alloc();
1354       e->mask = EVENTER_ASYNCH;
1355       e->callback = stratcon_datastore_asynch_lookup;
1356       e->closure = rtdetail;
1357       eventer_add_asynch(cpool->jobq, e);
1358       break;
1359   }
1360 }
1361
1362 int
1363 stratcon_datastore_saveconfig(void *unused) {
1364   int rv = -1;
1365   char *buff;
1366   ds_single_detail _d = { 0 }, *d = &_d;
1367   conn_q *cq;
1368   cq = get_conn_q_for_metanode();
1369
1370   if(stratcon_database_connect(cq) == 0) {
1371     char time_as_str[20];
1372     size_t len;
1373     buff = noit_conf_xml_in_mem(&len);
1374     if(!buff) goto bad_row;
1375
1376     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1377     DECLARE_PARAM_STR("0.0.0.0", 7);
1378     DECLARE_PARAM_STR("stratcond", 9);
1379     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1380     DECLARE_PARAM_STR(buff, len);
1381     free(buff);
1382
1383     GET_QUERY(config_insert);
1384     PG_EXEC(config_insert);
1385     PQclear(d->res);
1386     rv = 0;
1387
1388     bad_row:
1389       free_params(d);
1390   }
1391   release_conn_q(cq);
1392   return rv;
1393 }
1394
1395 void
1396 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
1397                                                struct sockaddr *,
1398                                                const char *, void *)) {
1399   struct datastore_onlooker_list *nnode;
1400   nnode = calloc(1, sizeof(*nnode));
1401   nnode->dispatch = f;
1402   nnode->next = onlookers;
1403   while(noit_atomic_casptr((void **)&onlookers, nnode, nnode->next) != (void *)nnode->next)
1404     nnode->next = onlookers;
1405 }
1406 static void
1407 stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn,
1408                                          char *id_str, char *file) {
1409   char path[PATH_MAX];
1410   interim_journal_t *ij;
1411   eventer_t ingest;
1412
1413   snprintf(path, sizeof(path), "%s/%s/%s/%s/%s",
1414            basejpath, remote_str, remote_cn, id_str, file);
1415   ij = calloc(1, sizeof(*ij));
1416   ij->fd = open(path, O_RDONLY);
1417   if(ij->fd < 0) {
1418     noitL(noit_error, "cannot open journal '%s': %s\n",
1419           path, strerror(errno));
1420     free(ij);
1421     return;
1422   }
1423   close(ij->fd);
1424   ij->fd = -1;
1425   ij->filename = strdup(path);
1426   ij->remote_str = strdup(remote_str);
1427   ij->remote_cn = strdup(remote_cn);
1428   ij->storagenode_id = atoi(id_str);
1429   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1430                                        ij->fqdn);
1431   noitL(noit_error, "ingesting old payload: %s\n", ij->filename);
1432   ingest = eventer_alloc();
1433   ingest->mask = EVENTER_ASYNCH;
1434   ingest->callback = stratcon_datastore_asynch_execute;
1435   ingest->closure = ij;
1436   eventer_add_asynch(ij->cpool->jobq, ingest);
1437 }
1438 static void
1439 stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) {
1440   char path[PATH_MAX];
1441   DIR *root;
1442   struct dirent de, *entry;
1443   int i = 0, cnt = 0;
1444   char **entries;
1445
1446   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
1447            first ? "/" : "", first ? first : "",
1448            second ? "/" : "", second ? second : "",
1449            third ? "/" : "", third ? third : "");
1450   root = opendir(path);
1451   if(!root) return;
1452   while(readdir_r(root, &de, &entry) == 0 && entry != NULL) cnt++;
1453   rewinddir(root);
1454   entries = malloc(sizeof(*entries) * cnt);
1455   while(readdir_r(root, &de, &entry) == 0 && entry != NULL) {
1456     if(i < cnt) {
1457       entries[i++] = strdup(entry->d_name);
1458     }
1459   }
1460   closedir(root);
1461   cnt = i; /* could have changed, directories are fickle */
1462   qsort(entries, i, sizeof(*entries),
1463         (int (*)(const void *, const void *))strcasecmp);
1464   for(i=0; i<cnt; i++) {
1465     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
1466     if(!first)
1467       stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL);
1468     else if(!second)
1469       stratcon_datastore_sweep_journals_int(first, entries[i], NULL);
1470     else if(!third)
1471       stratcon_datastore_sweep_journals_int(first, second, entries[i]);
1472     else if(strlen(entries[i]) == 16)
1473       stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]);
1474   }
1475 }
/* Start the journal sweep at the top of the journal tree, i.e. with no path
 * component selected below basejpath. */
static void
stratcon_datastore_sweep_journals() {
  char *unset = NULL;
  stratcon_datastore_sweep_journals_int(unset, unset, unset);
}
1480
1481 int
1482 stratcon_datastore_ingest_all_storagenode_info() {
1483   int i, cnt;
1484   ds_single_detail _d = { 0 }, *d = &_d;
1485   conn_q *cq;
1486   cq = get_conn_q_for_metanode();
1487
1488   while(stratcon_database_connect(cq)) {
1489     noitL(noit_error, "Error connecting to database\n");
1490     sleep(1);
1491   }
1492
1493   GET_QUERY(all_storage);
1494   PG_EXEC(all_storage);
1495   cnt = PQntuples(d->res);
1496   for(i=0; i<cnt; i++) {
1497     void *vinfo;
1498     char *tmpint, *fqdn, *dsn;
1499     int storagenode_id;
1500     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1501     storagenode_id = atoi(tmpint);
1502     PG_GET_STR_COL(fqdn, i, "fqdn");
1503     PG_GET_STR_COL(dsn, i, "dsn");
1504     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1505     storagenode_id = tmpint ? atoi(tmpint) : 0;
1506
1507     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1508                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1509       storagenode_info *info;
1510       info = calloc(1, sizeof(*info));
1511       info->storagenode_id = storagenode_id;
1512       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1513       info->dsn = dsn ? strdup(dsn) : NULL;
1514       noit_hash_store(&storagenode_to_info_cache,
1515                       (void *)&info->storagenode_id, sizeof(int), info);
1516     }
1517   }
1518   PQclear(d->res);
1519  bad_row:
1520   free_params(d);
1521
1522   release_conn_q(cq);
1523   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1524   return cnt;
1525 }
1526 int
1527 stratcon_datastore_ingest_all_check_info() {
1528   int i, cnt, loaded = 0;
1529   ds_single_detail _d = { 0 }, *d = &_d;
1530   conn_q *cq;
1531   cq = get_conn_q_for_metanode();
1532
1533   while(stratcon_database_connect(cq)) {
1534     noitL(noit_error, "Error connecting to database\n");
1535     sleep(1);
1536   }
1537
1538   GET_QUERY(check_mapall);
1539   PG_EXEC(check_mapall);
1540   cnt = PQntuples(d->res);
1541   for(i=0; i<cnt; i++) {
1542     void *vinfo;
1543     char *tmpint, *fqdn, *dsn, *uuid_str;
1544     int sid, storagenode_id;
1545     uuid_info *uuidinfo;
1546     PG_GET_STR_COL(uuid_str, i, "id");
1547     if(!uuid_str) continue;
1548     PG_GET_STR_COL(tmpint, i, "sid");
1549     if(!tmpint) continue;
1550     sid = atoi(tmpint);
1551     PG_GET_STR_COL(fqdn, i, "fqdn");
1552     PG_GET_STR_COL(dsn, i, "dsn");
1553     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1554     storagenode_id = tmpint ? atoi(tmpint) : 0;
1555
1556     uuidinfo = calloc(1, sizeof(*uuidinfo));
1557     uuidinfo->uuid_str = strdup(uuid_str);
1558     uuidinfo->sid = sid;
1559     noit_hash_store(&uuid_to_info_cache,
1560                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1561     loaded++;
1562     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1563                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1564       storagenode_info *info;
1565       info = calloc(1, sizeof(*info));
1566       info->storagenode_id = storagenode_id;
1567       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1568       info->dsn = dsn ? strdup(dsn) : NULL;
1569       noit_hash_store(&storagenode_to_info_cache,
1570                       (void *)&info->storagenode_id, sizeof(int), info);
1571     }
1572   }
1573   PQclear(d->res);
1574  bad_row:
1575   free_params(d);
1576
1577   release_conn_q(cq);
1578   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1579   return loaded;
1580 }
1581 void
1582 stratcon_datastore_init() {
1583   pthread_mutex_init(&ds_conns_lock, NULL);
1584   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1585   ds_err = noit_log_stream_find("error/datastore");
1586   ingest_err = noit_log_stream_find("error/ingest");
1587   if(!ds_err) ds_err = noit_error;
1588   if(!ingest_err) ingest_err = noit_error;
1589   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1590                            &basejpath)) {
1591     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1592     exit(-1);
1593   }
1594   stratcon_datastore_ingest_all_check_info();
1595   stratcon_datastore_ingest_all_storagenode_info();
1596   stratcon_datastore_sweep_journals();
1597 }
Note: See TracBrowser for help on using the browser.