root/src/stratcon_datastore.c

Revision abe7edbdd20777ec3d02b2f26963cdffdbff79e3, 54.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 years ago)

cleanup error messages and add some useful debugging

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "utils/noit_str.h"
38 #include "utils/noit_mkdir.h"
39 #include "stratcon_datastore.h"
40 #include "stratcon_realtime_http.h"
41 #include "stratcon_iep.h"
42 #include "noit_conf.h"
43 #include "noit_check.h"
44 #include <unistd.h>
45 #include <fcntl.h>
46 #include <netinet/in.h>
47 #include <sys/un.h>
48 #include <dirent.h>
49 #include <arpa/inet.h>
50 #include <sys/mman.h>
51 #include <libpq-fe.h>
52 #include <zlib.h>
53 #include <assert.h>
54 #include <errno.h>
55
/*
 * DECL_STMT(codename, confname) declares two file-scope statics:
 *   - `codename`: a char * that starts out NULL; the GET_QUERY macro below
 *     fills it in from the configuration the first time it is needed.
 *   - `codename_conf`: the configuration path
 *     "/stratcon/database/statements/<confname>" that it is read from.
 */
56 #define DECL_STMT(codename,confname) \
57 static char *codename = NULL; \
58 static const char *codename##_conf = "/stratcon/database/statements/" #confname
59
/* One declaration per configurable SQL statement used in this file. */
60 DECL_STMT(storage_post_connect, storagepostconnect);
61 DECL_STMT(metanode_post_connect, metanodepostconnect);
62 DECL_STMT(find_storage, findstoragenode);
63 DECL_STMT(all_storage, allstoragenodes);
64 DECL_STMT(check_map, mapchecktostoragenode);
65 DECL_STMT(check_mapall, mapallchecks);
66 DECL_STMT(check_loadall, allchecks);
67 DECL_STMT(check_find, findcheck);
68 DECL_STMT(check_insert, check);
69 DECL_STMT(status_insert, status);
70 DECL_STMT(metric_insert_numeric, metric_numeric);
71 DECL_STMT(metric_insert_text, metric_text);
72 DECL_STMT(config_insert, config);
73
/* Log streams used by this file.  They start out NULL; where they get
 * assigned is not visible in this part of the file. */
74 static noit_log_stream_t ds_err = NULL;
75 static noit_log_stream_t ds_deb = NULL;
76 static noit_log_stream_t ds_pool_deb = NULL;
77 static noit_log_stream_t ingest_err = NULL;
78
/* Singly linked list of dispatch callbacks, headed by `onlookers`.
 * NOTE(review): presumably each callback is handed every datastore
 * operation; the code that walks this list is not visible here. */
79 static struct datastore_onlooker_list {
80   void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *,
81                    const char *, void *);
82   struct datastore_onlooker_list *next;
83 } *onlookers = NULL;
84
/*
 * GET_QUERY(a): lazily load the statement text for `a` (declared with
 * DECL_STMT).  If `a` is still NULL it is read from the configuration path
 * held in `a_conf`; when that lookup fails control jumps to the label
 * `bad_row`, which the enclosing function must provide.
 */
85 #define GET_QUERY(a) do { \
86   if(a == NULL) \
87     if(!noit_conf_get_string(NULL, a ## _conf, &(a))) \
88       goto bad_row; \
89 } while(0)
90
91 struct conn_q;
92
/* A pool of database connections sharing one queue name; see
 * get_conn_pool_for_remote() for how pools are created and keyed. */
93 typedef struct {
94   char            *queue_name; /* hash key: "datastore_<remote_str>_<fqdn>_<remote_cn>" */
95   eventer_jobq_t  *jobq;       /* job queue created together with the pool */
96   struct conn_q   *head;       /* list of idle connections, most recently released first */
97   pthread_mutex_t  lock;       /* guards head, in_pool and outstanding */
98   pthread_cond_t   cv;         /* signalled whenever a connection is released */
99   int              ttl;        /* idle time (in time() units) after which a pooled connection is purged */
100   int              in_pool;    /* number of connections on the head list */
101   int              outstanding; /* number of connections currently handed out */
102   int              max_allocated; /* callers wait once in_pool + outstanding reaches this */
103   int              max_in_pool; /* a released connection is destroyed once in_pool reaches this */
104 } conn_pool;
/* One database connection together with the identity it was created for. */
105 typedef struct conn_q {
106   time_t           last_use;   /* set to time(NULL) when the connection is released */
107   char            *dsn;        /* Pg connect string */
108   char            *remote_str; /* the IP of the noit*/
109   char            *remote_cn;  /* the Cert CN of the noit */
110   char            *fqdn;       /* the fqdn of the storage node */
111   conn_pool       *pool;       /* the pool this connection belongs to */
112   struct conn_q   *next;       /* next idle connection while on the pool list */
113   /* Postgres specific stuff */
114   PGconn          *dbh;
115 } conn_q;
116
117
/* Capacity of the per-statement parameter arrays declared below. */
118 #define MAX_PARAMS 8
/*
 * Fields shared by every ds_*_detail structure; the PG_* and DECLARE_PARAM_*
 * macros reach them through a local variable named `d`.
 *   res, rv      - result and result status of the last statement (PG_EXEC)
 *   whence       - timestamp parsed from the log line (see
 *                  stratcon_datastore_execute)
 *   nparams      - number of parameters declared so far
 *   metric_type  - type character taken from an 'M' line
 *   paramValues / paramLengths / paramFormats - arrays passed to PQexecParams
 *   paramAllocd  - non-zero when free_params() must free paramValues[i]
 */
119 #define POSTGRES_PARTS \
120   PGresult *res; \
121   int rv; \
122   time_t whence; \
123   int nparams; \
124   int metric_type; \
125   char *paramValues[MAX_PARAMS]; \
126   int paramLengths[MAX_PARAMS]; \
127   int paramFormats[MAX_PARAMS]; \
128   int paramAllocd[MAX_PARAMS];
129
/*
 * All three detail structures begin with POSTGRES_PARTS.  Code in this file
 * casts ds_rt_detail pointers to ds_single_detail * when calling
 * free_params(), so POSTGRES_PARTS has to stay the first member of each.
 */
130 typedef struct ds_single_detail {
131   POSTGRES_PARTS
132 } ds_single_detail;
133 typedef struct {
134   /* Postgres specific stuff */
135   POSTGRES_PARTS
136   struct realtime_tracker *rt; /* list walked by stratcon_datastore_find() */
137   conn_q *cq; /* connection on which to perform this job */
138   eventer_t completion_event; /* This event should be registered if non NULL */
139 } ds_rt_detail;
140 typedef struct ds_line_detail {
141   /* Postgres specific stuff */
142   POSTGRES_PARTS
143   char *data; /* the raw log line; data[0] selects the record type */
144   int problematic;
145   struct ds_line_detail *next; /* next line in the list */
146 } ds_line_detail;
147
/* NOTE(review): neither type is used in the part of the file visible here;
 * the field notes below are inferred from names and need confirming. */
148 typedef struct {
149   noit_hash_table *ws;   /* presumably the working set being synced */
150   eventer_t completion;  /* presumably fired once the sync completes */
151 } syncset_t;
152
153 typedef struct {
154   char *remote_str;
155   char *remote_cn;
156   char *fqdn;
157   int storagenode_id;
158   int fd;
159   char *filename;
160   conn_pool *cpool;
161 } interim_journal_t;
162
163 static int stratcon_database_connect(conn_q *cq);
164 static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn);
165 static int storage_node_quick_lookup(const char *uuid_str,
166                                      const char *remote_cn,
167                                      int *sid_out, int *storagenode_id_out,
168                                      const char **remote_cn_out,
169                                      const char **fqdn_out,
170                                      const char **dsn_out);
171
172 static void
173 free_params(ds_single_detail *d) {
174   int i;
175   for(i=0; i<d->nparams; i++)
176     if(d->paramAllocd[i] && d->paramValues[i])
177       free(d->paramValues[i]);
178 }
179
180 char *basejpath = NULL;
/* ds_conns maps a queue name to its conn_pool; ds_conns_lock guards it
 * (see get_conn_pool_for_remote). */
181 pthread_mutex_t ds_conns_lock;
182 noit_hash_table ds_conns;
183 noit_hash_table working_sets;
184
185 /* the fqdn cache needs to be thread safe */
186 typedef struct {
187   char *uuid_str;
188   char *remote_cn;
189   int storagenode_id;
190   int sid;
191 } uuid_info;
/* Description of one storage node; fqdn and dsn are what
 * stratcon_datastore_check_loadall() uses to obtain a connection. */
192 typedef struct {
193   int storagenode_id;
194   char *fqdn;
195   char *dsn;
196 } storagenode_info;
197 noit_hash_table uuid_to_info_cache;
/* storagenode_to_info_cache_lock is held while the cache below is iterated
 * (see stratcon_datastore_asynch_drive_iep). */
198 pthread_mutex_t storagenode_to_info_cache_lock;
199 noit_hash_table storagenode_to_info_cache;
200
/*
 * Render the address stored in `remote` as text.
 *
 *   buff   - destination buffer, always NUL terminated on return
 *   blen   - size of `buff` in bytes
 *   remote - AF_INET, AF_INET6 or AF_UNIX address, or NULL
 *
 * Returns the length of the string left in `buff`.  A NULL `remote`, an
 * unsupported address family or a conversion failure all yield an empty
 * string and a return value of 0.
 */
int
convert_sockaddr_to_buff(char *buff, int blen, struct sockaddr *remote) {
  char name[128] = "";

  if(!buff || blen <= 0) return 0;
  buff[0] = '\0';
  if(remote) {
    switch(remote->sa_family) {
      case AF_INET:
        /* The size handed to inet_ntop is the size of the OUTPUT buffer,
         * not the size of the socket address structure. */
        if(!inet_ntop(AF_INET, &((struct sockaddr_in *)remote)->sin_addr,
                      name, sizeof(name)))
          name[0] = '\0';
        break;
      case AF_INET6:
        if(!inet_ntop(AF_INET6, &((struct sockaddr_in6 *)remote)->sin6_addr,
                      name, sizeof(name)))
          name[0] = '\0';
        break;
      case AF_UNIX:
        snprintf(name, sizeof(name), "%s",
                 ((struct sockaddr_un *)remote)->sun_path);
        break;
      default: return 0;
    }
  }
  /* Bounded copy; truncates (and still terminates) when buff is too small */
  snprintf(buff, (size_t)blen, "%s", name);
  return (int)strlen(buff);
}
228
229 /* Thread-safe connection pools */
230
231 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */
232 static void
233 release_conn_q_forceable(conn_q *cq, int forcefree) {
234   int putback = 0;
235   cq->last_use = time(NULL);
236   pthread_mutex_lock(&cq->pool->lock);
237   cq->pool->outstanding--;
238   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) {
239     putback = 1;
240     cq->next = cq->pool->head;
241     cq->pool->head = cq;
242     cq->pool->in_pool++;
243   }
244   pthread_mutex_unlock(&cq->pool->lock);
245   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(),
246         putback ? "to pool" : "and destroy", cq->pool->queue_name);
247   pthread_cond_signal(&cq->pool->cv);
248   if(putback) return;
249
250   /* Not put back, release it */
251   if(cq->dbh) PQfinish(cq->dbh);
252   if(cq->remote_str) free(cq->remote_str);
253   if(cq->remote_cn) free(cq->remote_cn);
254   if(cq->fqdn) free(cq->fqdn);
255   if(cq->dsn) free(cq->dsn);
256   free(cq);
257 }
258 static void
259 ttl_purge_conn_pool(conn_pool *pool) {
260   int old_cnt, new_cnt;
261   time_t now = time(NULL);
262   conn_q *cq, *prev = NULL, *iter;
263   /* because we always replace on the head and update the last_use time when
264      doing so, we know they are ordered LRU on the end.  So, once we hit an
265      old one, we know all the others are old too.
266    */
267   if(!pool->head) return; /* hack short circuit for no locks */
268   pthread_mutex_lock(&pool->lock);
269   old_cnt = pool->in_pool;
270   cq = pool->head;
271   while(cq) {
272     if(cq->last_use + cq->pool->ttl < now) {
273       if(prev) prev->next = NULL;
274       else pool->head = NULL;
275       break;
276     }
277     prev = cq;
278     cq = cq->next;
279   }
280   /* Now pool->head is a chain of unexpired and cq is a chain of expired */
281   /* Fix accounting */
282   for(iter=cq; iter; iter=iter->next) pool->in_pool--;
283   new_cnt = pool->in_pool;
284   pthread_mutex_unlock(&pool->lock);
285
286   /* Force release these without holding the lock */
287   while(cq) {
288     prev = cq;
289     cq = cq->next;
290     release_conn_q_forceable(cq, 1);
291   }
292   if(old_cnt != new_cnt)
293     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt,
294           pool->queue_name);
295 }
296 static void
297 release_conn_q(conn_q *cq) {
298   ttl_purge_conn_pool(cq->pool);
299   release_conn_q_forceable(cq, 0);
300 }
301 static conn_pool *
302 get_conn_pool_for_remote(const char *remote_str,
303                          const char *remote_cn, const char *fqdn) {
304   void *vcpool;
305   conn_pool *cpool = NULL;
306   char queue_name[256] = "datastore_";
307   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s",
308            (remote_str && *remote_str) ? remote_str : "0.0.0.0",
309            fqdn ? fqdn : "default",
310            remote_cn ? remote_cn : "default");
311   pthread_mutex_lock(&ds_conns_lock);
312   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name,
313                         strlen(queue_name), &vcpool))
314     cpool = vcpool;
315   pthread_mutex_unlock(&ds_conns_lock);
316   if(!cpool) {
317     vcpool = cpool = calloc(1, sizeof(*cpool));
318     cpool->queue_name = strdup(queue_name);
319     pthread_mutex_init(&cpool->lock, NULL);
320     pthread_cond_init(&cpool->cv, NULL);
321     cpool->in_pool = 0;
322     cpool->outstanding = 0;
323     cpool->max_in_pool = 1;
324     cpool->max_allocated = 1;
325     pthread_mutex_lock(&ds_conns_lock);
326     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name),
327                         cpool)) {
328       noit_hash_retrieve(&ds_conns, (const char *)queue_name,
329                          strlen(queue_name), &vcpool);
330     }
331     pthread_mutex_unlock(&ds_conns_lock);
332     if(vcpool != cpool) {
333       /* someone beat us to it */
334       free(cpool->queue_name);
335       pthread_mutex_destroy(&cpool->lock);
336       pthread_cond_destroy(&cpool->cv);
337       free(cpool);
338     }
339     else {
340       int i;
341       /* Our job to setup the pool */
342       cpool->jobq = calloc(1, sizeof(*cpool->jobq));
343       eventer_jobq_init(cpool->jobq, queue_name);
344       cpool->jobq->backq = eventer_default_backq();
345       /* Add one thread */
346       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++)
347         eventer_jobq_increase_concurrency(cpool->jobq);
348     }
349     cpool = vcpool;
350   }
351   return cpool;
352 }
353 static conn_q *
354 get_conn_q_for_remote(const char *remote_str,
355                       const char *remote_cn, const char *fqdn,
356                       const char *dsn) {
357   conn_pool *cpool;
358   conn_q *cq;
359   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn);
360   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(),
361         cpool->queue_name);
362   pthread_mutex_lock(&cpool->lock);
363  again:
364   if(cpool->head) {
365     assert(cpool->in_pool > 0);
366     cq = cpool->head;
367     cpool->head = cq->next;
368     cpool->in_pool--;
369     cpool->outstanding++;
370     cq->next = NULL;
371     pthread_mutex_unlock(&cpool->lock);
372     return cq;
373   }
374   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) {
375     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n",
376           (void *)pthread_self(), cpool->queue_name);
377     pthread_cond_wait(&cpool->cv, &cpool->lock);
378     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n",
379           (void *)pthread_self(), cpool->queue_name);
380     goto again;
381   }
382   else {
383     cpool->outstanding++;
384     pthread_mutex_unlock(&cpool->lock);
385   }
386  
387   cq = calloc(1, sizeof(*cq));
388   cq->pool = cpool;
389   cq->remote_str = remote_str ? strdup(remote_str) : NULL;
390   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL;
391   cq->fqdn = fqdn ? strdup(fqdn) : NULL;
392   cq->dsn = dsn ? strdup(dsn) : NULL;
393   return cq;
394 }
395 static conn_q *
396 get_conn_q_for_metanode() {
397   return get_conn_q_for_remote(NULL,NULL,NULL,NULL);
398 }
399
/* Result codes returned by the stratcon_datastore_* execution helpers. */
400 typedef enum {
401   DS_EXEC_SUCCESS = 0,     /* the statement(s) ran */
402   DS_EXEC_ROW_FAILED = 1,  /* this row could not be parsed or executed */
403   DS_EXEC_TXN_FAILED = 2,  /* not returned by any code visible in this part of the file */
404 } execute_outcome_t;
405
/*
 * Duplicate at most `len` bytes of `src` into a freshly allocated, NUL
 * terminated string.  Copying stops early at the first NUL in `src`; a
 * non-positive `len` yields an empty string.
 *
 * Returns the new string (the caller frees it), or NULL when the allocation
 * fails.
 */
static char *
__noit__strndup(const char *src, int len) {
  size_t n = 0;
  char *dst;

  if(len > 0) {
    const char *nul = memchr(src, '\0', (size_t)len);
    n = nul ? (size_t)(nul - src) : (size_t)len;
  }
  dst = malloc(n + 1);
  if(!dst) return NULL;
  memcpy(dst, src, n);
  dst[n] = '\0';
  return dst;
}
/*
 * DECLARE_PARAM_STR(str, len): append a text parameter to `d`.
 * A private copy of at most `len` bytes of `str` is stored (to be released
 * by free_params) and d->nparams is advanced.  The literal value "[[null]]"
 * is special: the copy is dropped and the parameter is stored as a NULL
 * pointer instead.
 * No check is made against MAX_PARAMS; callers must stay within it.
 */
417 #define DECLARE_PARAM_STR(str, len) do { \
418   d->paramValues[d->nparams] = __noit__strndup(str, len); \
419   d->paramLengths[d->nparams] = len; \
420   d->paramFormats[d->nparams] = 0; \
421   d->paramAllocd[d->nparams] = 1; \
422   if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \
423     free(d->paramValues[d->nparams]); \
424     d->paramValues[d->nparams] = NULL; \
425     d->paramLengths[d->nparams] = 0; \
426     d->paramAllocd[d->nparams] = 0; \
427   } \
428   d->nparams++; \
429 } while(0)
/*
 * DECLARE_PARAM_INT(i): format the int `i` in decimal and append it as a
 * text parameter via DECLARE_PARAM_STR.
 */
430 #define DECLARE_PARAM_INT(i) do { \
431   int buffer__len; \
432   char buffer__[32]; \
433   snprintf(buffer__, sizeof(buffer__), "%d", (i)); \
434   buffer__len = strlen(buffer__); \
435   DECLARE_PARAM_STR(buffer__, buffer__len); \
436 } while(0)
437
/*
 * PG_GET_STR_COL(dest, row, name): point `dest` at the value of column `name`
 * in row `row` of d->res.  `dest` is set to NULL when the result has no such
 * column or when the value is SQL NULL.  The value is not copied: `dest`
 * holds whatever pointer PQgetvalue returned for d->res.
 */
438 #define PG_GET_STR_COL(dest, row, name) do { \
439   int colnum = PQfnumber(d->res, name); \
440   dest = NULL; \
441   if (colnum >= 0) \
442     dest = PQgetisnull(d->res, row, colnum) \
443          ? NULL : PQgetvalue(d->res, row, colnum); \
444 } while(0)
445
/*
 * PG_EXEC(cmd): run `cmd` on cq->dbh with the parameters collected in `d`.
 *
 * On success d->res holds the result and the caller must PQclear() it.
 * On any status other than PGRES_COMMAND_OK / PGRES_TUPLES_OK the first line
 * of the error message is logged, the result is cleared and control jumps to
 * the label `bad_row` of the enclosing function.
 *
 * Requires `cq` and `d` in scope.
 */
#define PG_EXEC(cmd) do { \
  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    /* the precision consumed by "%.*s" must be an int, not a ptrdiff_t */ \
    noitL(ds_err, "[%s] stratcon datasource bad (%d): %.*s\n", \
          cq->fqdn ? cq->fqdn : "metanode", d->rv, \
          (int)(pgerr_end - pgerr), pgerr); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
463
/*
 * PG_TM_EXEC(cmd, whence): like PG_EXEC, but `cmd` is first expanded with
 * strftime() using the UTC broken-down form of `whence`, so the statement
 * text may contain strftime conversions.
 *
 * `whence` is evaluated once.  If the time cannot be converted or the
 * expanded statement does not fit the 4096 byte buffer, the failure is
 * logged and control jumps to `bad_row` without touching the database.
 * Otherwise the success/failure contract is the same as PG_EXEC.
 *
 * Requires `cq` and `d` in scope.
 */
#define PG_TM_EXEC(cmd, whence) do { \
  time_t __w = whence; \
  char cmdbuf[4096]; \
  struct tm tbuf, *tm; \
  tm = gmtime_r(&__w, &tbuf); \
  if(!tm || strftime(cmdbuf, sizeof(cmdbuf), cmd, tm) == 0) { \
    noitL(ds_err, "stratcon datasource cannot expand statement, time: %llu\n", \
          (long long unsigned)__w); \
    goto bad_row; \
  } \
  d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \
                        (const char * const *)d->paramValues, \
                        d->paramLengths, d->paramFormats, 0); \
  d->rv = PQresultStatus(d->res); \
  if(d->rv != PGRES_COMMAND_OK && \
     d->rv != PGRES_TUPLES_OK) { \
    const char *pgerr = PQresultErrorMessage(d->res); \
    const char *pgerr_end = strchr(pgerr, '\n'); \
    if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \
    /* the precision consumed by "%.*s" must be an int, not a ptrdiff_t */ \
    noitL(ds_err, "stratcon datasource bad (%d): %.*s time: %llu\n", \
          d->rv, (int)(pgerr_end - pgerr), pgerr, \
          (long long unsigned)__w); \
    PQclear(d->res); \
    goto bad_row; \
  } \
} while(0)
486
487 static void *
488 stratcon_datastore_check_loadall(void *vsn) {
489   storagenode_info *sn = vsn;
490   ds_single_detail *d;
491   int i, row_count = 0, good = 0;
492   char buff[1024];
493   conn_q *cq = NULL;
494
495   d = calloc(1, sizeof(*d));
496   GET_QUERY(check_loadall);
497   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn);
498   i = 0;
499   while(stratcon_database_connect(cq)) {
500     if(i++ > 4) {
501       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn);
502       release_conn_q(cq);
503       return (void *)(vpsized_int)good;
504     }
505     sleep(1);
506   }
507   PG_EXEC(check_loadall);
508   row_count = PQntuples(d->res);
509  
510   for(i=0; i<row_count; i++) {
511     int rv;
512     int8_t family;
513     struct sockaddr *sin;
514     struct sockaddr_in sin4 = { .sin_family = AF_INET };
515     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 };
516     char *remote, *id, *target, *module, *name;
517     PG_GET_STR_COL(remote, i, "remote_address");
518     PG_GET_STR_COL(id, i, "id");
519     PG_GET_STR_COL(target, i, "target");
520     PG_GET_STR_COL(module, i, "module");
521     PG_GET_STR_COL(name, i, "name");
522     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name);
523
524     family = AF_INET;
525     sin = (struct sockaddr *)&sin4;
526     rv = inet_pton(family, remote, &sin4.sin_addr);
527     if(rv != 1) {
528       family = AF_INET6;
529       sin = (struct sockaddr *)&sin6;
530       rv = inet_pton(family, remote, &sin6.sin6_addr);
531       if(rv != 1) {
532         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote);
533         sin = NULL;
534       }
535     }
536
537     /* stratcon_iep_line_processor takes an allocated operand and frees it */
538     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL);
539     good++;
540   }
541   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n",
542         good, row_count, sn->fqdn);
543  bad_row:
544   free_params((ds_single_detail *)d);
545   free(d);
546   if(cq) release_conn_q(cq);
547   return (void *)(vpsized_int)good;
548 }
549 static int
550 stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure,
551                                     struct timeval *now) {
552   storagenode_info self = { 0, NULL, NULL }, **sns = NULL;
553   pthread_t *jobs = NULL;
554   int nodes, i = 0, tcnt = 0;
555   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
556   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
557
558   pthread_mutex_lock(&storagenode_to_info_cache_lock);
559   nodes = storagenode_to_info_cache.size;
560   jobs = calloc(MAX(1,nodes), sizeof(*jobs));
561   sns = calloc(MAX(1,nodes), sizeof(*sns));
562   if(nodes == 0) sns[nodes++] = &self;
563   else {
564     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
565     const char *k;
566     void *v;
567     int klen;
568     while(noit_hash_next(&storagenode_to_info_cache,
569                          &iter, &k, &klen, &v)) {
570       sns[i++] = (storagenode_info *)v;
571     }
572   }
573   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
574
575   for(i=0; i<nodes; i++) {
576     if(pthread_create(&jobs[i], NULL,
577                       stratcon_datastore_check_loadall, sns[i]) != 0) {
578       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno));
579     }
580   }
581   for(i=0; i<nodes; i++) {
582     void *good;
583     pthread_join(jobs[i], &good);
584     tcnt += (int)(vpsized_int)good;
585   }
586   noitL(noit_error, "Loaded all %d check states.\n", tcnt);
587   return 0;
588 }
589 void
590 stratcon_datastore_iep_check_preload() {
591   eventer_t e;
592   conn_pool *cpool;
593
594   cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
595   e = eventer_alloc();
596   e->mask = EVENTER_ASYNCH;
597   e->callback = stratcon_datastore_asynch_drive_iep;
598   e->closure = NULL;
599   eventer_add_asynch(cpool->jobq, e);
600 }
601 execute_outcome_t
602 stratcon_datastore_find(ds_rt_detail *d) {
603   conn_q *cq;
604   char *val;
605   int row_count;
606   struct realtime_tracker *node;
607
608   for(node = d->rt; node; node = node->next) {
609     char uuid_str[UUID_STR_LEN+1];
610     const char *fqdn, *dsn, *remote_cn;
611     int storagenode_id;
612
613     uuid_unparse_lower(node->checkid, uuid_str);
614     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid,
615                                  &storagenode_id, &remote_cn, &fqdn, &dsn))
616       continue;
617
618     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn);
619     stratcon_database_connect(cq);
620
621     GET_QUERY(check_find);
622     DECLARE_PARAM_INT(node->sid);
623     PG_EXEC(check_find);
624     row_count = PQntuples(d->res);
625     if(row_count != 1) {
626       PQclear(d->res);
627       goto bad_row;
628     }
629
630     /* Get the check uuid */
631     PG_GET_STR_COL(val, 0, "id");
632     if(!val) {
633       PQclear(d->res);
634       goto bad_row;
635     }
636     if(uuid_parse(val, node->checkid)) {
637       PQclear(d->res);
638       goto bad_row;
639     }
640  
641     /* Get the remote_address (which noit owns this) */
642     PG_GET_STR_COL(val, 0, "remote_address");
643     if(!val) {
644       PQclear(d->res);
645       goto bad_row;
646     }
647     node->noit = strdup(val);
648  
649    bad_row:
650     free_params((ds_single_detail *)d);
651     d->nparams = 0;
652     release_conn_q(cq);
653   }
654   return DS_EXEC_SUCCESS;
655 }
656 execute_outcome_t
657 stratcon_datastore_execute(conn_q *cq, const char *r, const char *remote_cn,
658                            ds_line_detail *d) {
659   int type, len, sid;
660   char *final_buff;
661   uLong final_len, actual_final_len;
662   char *token;
663   char raddr_blank[1] = "";
664   const char *raddr;
665
666   type = d->data[0];
667   raddr = r ? r : raddr_blank;
668
669   /* Parse the log line, but only if we haven't already */
670   if(!d->nparams) {
671     char *scp, *ecp;
672
673     scp = d->data;
674 #define PROCESS_NEXT_FIELD(t,l) do { \
675   if(!*scp) goto bad_row; \
676   ecp = strchr(scp, '\t'); \
677   if(!ecp) goto bad_row; \
678   token = scp; \
679   len = (ecp-scp); \
680   scp = ecp + 1; \
681 } while(0)
682 #define PROCESS_LAST_FIELD(t,l) do { \
683   if(!*scp) ecp = scp; \
684   else { \
685     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
686     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
687   } \
688   t = scp; \
689   l = (ecp-scp); \
690 } while(0)
691
692     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
693     switch(type) {
694       /* See noit_check_log.c for log description */
695       case 'n':
696         DECLARE_PARAM_STR(raddr, strlen(raddr));
697         DECLARE_PARAM_STR("noitd",5); /* node_type */
698         PROCESS_NEXT_FIELD(token,len);
699         d->whence = (time_t)strtoul(token, NULL, 10);
700         DECLARE_PARAM_STR(token,len); /* timestamp */
701
702         /* This is the expected uncompressed len */
703         PROCESS_NEXT_FIELD(token,len);
704         final_len = atoi(token);
705         final_buff = malloc(final_len);
706         if(!final_buff) goto bad_row;
707  
708         /* The last token is b64 endoded and compressed.
709          * we need to decode it, declare it and then free it.
710          */
711         PROCESS_LAST_FIELD(token, len);
712         /* We can in-place decode this */
713         len = noit_b64_decode((char *)token, len,
714                               (unsigned char *)token, len);
715         if(len <= 0) {
716           noitL(noit_error, "noitd config base64 decoding error.\n");
717           free(final_buff);
718           goto bad_row;
719         }
720         actual_final_len = final_len;
721         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
722                               (unsigned char *)token, len)) {
723           noitL(noit_error, "noitd config decompression failure.\n");
724           free(final_buff);
725           goto bad_row;
726         }
727         if(final_len != actual_final_len) {
728           noitL(noit_error, "noitd config decompression error.\n");
729           free(final_buff);
730           goto bad_row;
731         }
732         DECLARE_PARAM_STR(final_buff, final_len);
733         free(final_buff);
734         break;
735       case 'C':
736         DECLARE_PARAM_STR(raddr, strlen(raddr));
737         PROCESS_NEXT_FIELD(token,len);
738         DECLARE_PARAM_STR(token,len); /* timestamp */
739         d->whence = (time_t)strtoul(token, NULL, 10);
740         PROCESS_NEXT_FIELD(token, len);
741         sid = uuid_to_sid(token, remote_cn);
742         if(sid == 0) goto bad_row;
743         DECLARE_PARAM_INT(sid); /* sid */
744         DECLARE_PARAM_STR(token,len); /* uuid */
745         PROCESS_NEXT_FIELD(token, len);
746         DECLARE_PARAM_STR(token,len); /* target */
747         PROCESS_NEXT_FIELD(token, len);
748         DECLARE_PARAM_STR(token,len); /* module */
749         PROCESS_LAST_FIELD(token, len);
750         DECLARE_PARAM_STR(token,len); /* name */
751         break;
752       case 'M':
753         PROCESS_NEXT_FIELD(token,len);
754         DECLARE_PARAM_STR(token,len); /* timestamp */
755         d->whence = (time_t)strtoul(token, NULL, 10);
756         PROCESS_NEXT_FIELD(token, len);
757         sid = uuid_to_sid(token, remote_cn);
758         if(sid == 0) goto bad_row;
759         DECLARE_PARAM_INT(sid); /* sid */
760         PROCESS_NEXT_FIELD(token, len);
761         DECLARE_PARAM_STR(token,len); /* name */
762         PROCESS_NEXT_FIELD(token,len);
763         d->metric_type = *token;
764         PROCESS_LAST_FIELD(token,len);
765         DECLARE_PARAM_STR(token,len); /* value */
766         break;
767       case 'S':
768         PROCESS_NEXT_FIELD(token,len);
769         DECLARE_PARAM_STR(token,len); /* timestamp */
770         d->whence = (time_t)strtoul(token, NULL, 10);
771         PROCESS_NEXT_FIELD(token, len);
772         sid = uuid_to_sid(token, remote_cn);
773         if(sid == 0) goto bad_row;
774         DECLARE_PARAM_INT(sid); /* sid */
775         PROCESS_NEXT_FIELD(token, len);
776         DECLARE_PARAM_STR(token,len); /* state */
777         PROCESS_NEXT_FIELD(token, len);
778         DECLARE_PARAM_STR(token,len); /* availability */
779         PROCESS_NEXT_FIELD(token, len);
780         DECLARE_PARAM_STR(token,len); /* duration */
781         PROCESS_LAST_FIELD(token,len);
782         DECLARE_PARAM_STR(token,len); /* status */
783         break;
784       default:
785         goto bad_row;
786     }
787
788   }
789
790   /* Now execute the query */
791   switch(type) {
792     case 'n':
793       GET_QUERY(config_insert);
794       PG_EXEC(config_insert);
795       PQclear(d->res);
796       break;
797     case 'C':
798       GET_QUERY(check_insert);
799       PG_TM_EXEC(check_insert, d->whence);
800       PQclear(d->res);
801       break;
802     case 'S':
803       GET_QUERY(status_insert);
804       PG_TM_EXEC(status_insert, d->whence);
805       PQclear(d->res);
806       break;
807     case 'M':
808       switch(d->metric_type) {
809         case METRIC_INT32:
810         case METRIC_UINT32:
811         case METRIC_INT64:
812         case METRIC_UINT64:
813         case METRIC_DOUBLE:
814           GET_QUERY(metric_insert_numeric);
815           PG_TM_EXEC(metric_insert_numeric, d->whence);
816           PQclear(d->res);
817           break;
818         case METRIC_STRING:
819           GET_QUERY(metric_insert_text);
820           PG_TM_EXEC(metric_insert_text, d->whence);
821           PQclear(d->res);
822           break;
823         default:
824           goto bad_row;
825       }
826       break;
827     default:
828       /* should never get here */
829       goto bad_row;
830   }
831   return DS_EXEC_SUCCESS;
832  bad_row:
833   return DS_EXEC_ROW_FAILED;
834 }
835 static int
836 stratcon_database_post_connect(conn_q *cq) {
837   int rv = 0;
838   ds_single_detail _d = { 0 }, *d = &_d;
839   if(cq->fqdn) {
840     char *remote_str, *remote_cn;
841     /* This is the silly way we get null's in through our declare_param_str */
842     remote_str = cq->remote_str ? cq->remote_str : "[[null]]";
843     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]";
844     /* This is a storage node, it gets the storage node post_connect */
845     GET_QUERY(storage_post_connect);
846     rv = -1; /* now we're serious */
847     DECLARE_PARAM_STR(remote_str, strlen(remote_str));
848     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
849     PG_EXEC(storage_post_connect);
850     PQclear(d->res);
851     rv = 0;
852   }
853   else {
854     /* Metanode post_connect */
855     GET_QUERY(metanode_post_connect);
856     rv = -1; /* now we're serious */
857     PG_EXEC(metanode_post_connect);
858     PQclear(d->res);
859     rv = 0;
860   }
861  bad_row:
862   free_params(d);
863   if(rv == -1) {
864     /* Post-connect intentions are serious and fatal */
865     PQfinish(cq->dbh);
866     cq->dbh = NULL;
867   }
868   return rv;
869 }
870 static int
871 stratcon_database_connect(conn_q *cq) {
872   char *dsn, dsn_meta[512];
873   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
874   const char *k, *v;
875   int klen;
876   noit_hash_table *t;
877
878   dsn_meta[0] = '\0';
879   if(!cq->dsn) {
880     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");
881     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {
882       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta));
883       strlcat(dsn_meta, k, sizeof(dsn_meta));
884       strlcat(dsn_meta, "=", sizeof(dsn_meta));
885       strlcat(dsn_meta, v, sizeof(dsn_meta));
886     }
887     noit_hash_destroy(t, free, free);
888     free(t);
889     dsn = dsn_meta;
890   }
891   else dsn = cq->dsn;
892
893   if(cq->dbh) {
894     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
895     PQreset(cq->dbh);
896     if(PQstatus(cq->dbh) != CONNECTION_OK) {
897       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
898             dsn, PQerrorMessage(cq->dbh));
899       return -1;
900     }
901     if(stratcon_database_post_connect(cq)) return -1;
902     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
903     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
904           dsn, PQerrorMessage(cq->dbh));
905     return -1;
906   }
907
908   cq->dbh = PQconnectdb(dsn);
909   if(!cq->dbh) return -1;
910   if(PQstatus(cq->dbh) != CONNECTION_OK) {
911     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n",
912           dsn, PQerrorMessage(cq->dbh));
913     return -1;
914   }
915   if(stratcon_database_post_connect(cq)) return -1;
916   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0;
917   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n",
918         dsn, PQerrorMessage(cq->dbh));
919   return -1;
920 }
921 static int
922 stratcon_datastore_savepoint_op(conn_q *cq, const char *p,
923                                 const char *name) {
924   int rv = -1;
925   PGresult *res;
926   char cmd[128];
927   strlcpy(cmd, p, sizeof(cmd));
928   strlcat(cmd, name, sizeof(cmd));
929   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
930   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
931   PQclear(res);
932   return rv;
933 }
934 static int
935 stratcon_datastore_do(conn_q *cq, const char *cmd) {
936   PGresult *res;
937   int rv = -1;
938   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1;
939   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0;
940   PQclear(res);
941   return rv;
942 }
/*
 * Transaction helpers.  Apart from BUSTED's argument, these expand against
 * names that must exist at the point of use: a conn_q pointer called `cq`,
 * the variables `current` and `last_sp`, and a label named `full_monty`.
 */

/* Close the connection, forget the handle and jump to full_monty. */
#define BUSTED(cq) do { \
  PQfinish((cq)->dbh); \
  (cq)->dbh = NULL; \
  goto full_monty; \
} while(0)

/* Create the named savepoint and remember `current` as its first row. */
#define SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name) != 0) { \
    BUSTED(cq); \
  } \
  last_sp = current; \
} while(0)

/* Undo everything since the named savepoint; no savepoint row remains. */
#define ROLLBACK_TO_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name) != 0) { \
    BUSTED(cq); \
  } \
  last_sp = NULL; \
} while(0)

/* Keep the work done since the named savepoint and drop the savepoint. */
#define RELEASE_SAVEPOINT(name) do { \
  if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name) != 0) { \
    BUSTED(cq); \
  } \
  last_sp = NULL; \
} while(0)
962 int
963 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure,
964                                  struct timeval *now) {
965   ds_rt_detail *dsjd = closure;
966   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
967   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
968
969   assert(dsjd->rt);
970   stratcon_datastore_find(dsjd);
971   if(dsjd->completion_event)
972     eventer_add(dsjd->completion_event);
973
974   free_params((ds_single_detail *)dsjd);
975   free(dsjd);
976   return 0;
977 }
978 static const char *
979 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) {
980   void *vinfo;
981   const char *dsn = NULL, *fqdn = NULL;
982   int found = 0;
983   storagenode_info *info = NULL;
984   pthread_mutex_lock(&storagenode_to_info_cache_lock);
985   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id),
986                         &vinfo)) {
987     found = 1;
988     info = vinfo;
989   }
990   pthread_mutex_unlock(&storagenode_to_info_cache_lock);
991   if(found) {
992     if(fqdn_out) *fqdn_out = info->fqdn;
993     return info->dsn;
994   }
995
996   if(!found && can_use_db) {
997     ds_single_detail *d;
998     conn_q *cq;
999     int row_count;
1000     /* Look it up and store it */
1001     d = calloc(1, sizeof(*d));
1002     cq = get_conn_q_for_metanode();
1003     GET_QUERY(find_storage);
1004     DECLARE_PARAM_INT(id);
1005     PG_EXEC(find_storage);
1006     row_count = PQntuples(d->res);
1007     if(row_count) {
1008       PG_GET_STR_COL(dsn, 0, "dsn");
1009       PG_GET_STR_COL(fqdn, 0, "fqdn");
1010     }
1011     PQclear(d->res);
1012    bad_row:
1013     free_params(d);
1014     free(d);
1015     release_conn_q(cq);
1016   }
1017   if(fqdn) {
1018     info = calloc(1, sizeof(*info));
1019     info->fqdn = strdup(fqdn);
1020     if(fqdn_out) *fqdn_out = info->fqdn;
1021     info->dsn = dsn ? strdup(dsn) : NULL;
1022     info->storagenode_id = id;
1023     pthread_mutex_lock(&storagenode_to_info_cache_lock);
1024     noit_hash_store(&storagenode_to_info_cache,
1025                     (void *)&info->storagenode_id, sizeof(int), info);
1026     pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1027   }
1028   return info ? info->dsn : NULL;
1029 }
1030 static ds_line_detail *
1031 build_insert_batch(interim_journal_t *ij) {
1032   int rv;
1033   off_t len;
1034   const char *buff, *cp, *lcp;
1035   struct stat st;
1036   ds_line_detail *head = NULL, *last = NULL, *next = NULL;
1037
1038   if(ij->fd < 0) {
1039     ij->fd = open(ij->filename, O_RDONLY);
1040     if(ij->fd < 0) {
1041       noitL(noit_error, "Cannot open interim journal '%s': %s\n",
1042             ij->filename, strerror(errno));
1043       assert(ij->fd >= 0);
1044     }
1045   }
1046   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR);
1047   assert(rv != -1);
1048   len = st.st_size;
1049   buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0);
1050   if(buff == (void *)-1) {
1051     noitL(noit_error, "mmap(%s) => %s\n", ij->filename, strerror(errno));
1052     assert(buff != (void *)-1);
1053   }
1054   lcp = buff;
1055   while(lcp < (buff + len) &&
1056         NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) {
1057     next = calloc(1, sizeof(*next));
1058     next->data = malloc(cp - lcp + 1);
1059     memcpy(next->data, lcp, cp - lcp);
1060     next->data[cp - lcp] = '\0';
1061     if(!head) head = next;
1062     if(last) last->next = next;
1063     last = next;
1064     lcp = cp + 1;
1065   }
1066   munmap((void *)buff, len);
1067   close(ij->fd);
1068   return head;
1069 }
1070 static void
1071 interim_journal_remove(interim_journal_t *ij) {
1072   unlink(ij->filename);
1073   if(ij->filename) free(ij->filename);
1074   if(ij->remote_str) free(ij->remote_str);
1075   if(ij->remote_cn) free(ij->remote_cn);
1076   if(ij->fqdn) free(ij->fqdn);
1077 }
1078 int
1079 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure,
1080                                   struct timeval *now) {
1081   int i, total, success, sp_total, sp_success;
1082   interim_journal_t *ij;
1083   ds_line_detail *head, *current, *last_sp;
1084   const char *dsn;
1085   conn_q *cq;
1086   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1087   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1088
1089   ij = closure;
1090   dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn);
1091   if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */
1092   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn,
1093                              ij->fqdn, dsn);
1094   noitL(ds_deb, "stratcon_datastore_asynch_execute[%s,%s,%s]\n",
1095         ij->remote_str, ij->remote_cn, ij->fqdn);
1096  full_monty:
1097   /* Make sure we have a connection */
1098   i = 1;
1099   while(stratcon_database_connect(cq)) {
1100     noitL(noit_error, "Error connecting to database: %s\n",
1101           ij->fqdn ? ij->fqdn : "(null)");
1102     sleep(i);
1103     i *= 2;
1104     i = MIN(i, 16);
1105   }
1106
1107   head = build_insert_batch(ij);
1108   noitL(ds_deb, "Starting batch from %s/%s to %s\n",
1109         ij->remote_str ? ij->remote_str : "(null)",
1110         ij->remote_cn ? ij->remote_cn : "(null)",
1111         ij->fqdn ? ij->fqdn : "(null)");
1112   current = head;
1113   last_sp = NULL;
1114   total = success = sp_total = sp_success = 0;
1115   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq);
1116   while(current) {
1117     execute_outcome_t rv;
1118     if(current->data) {
1119       if(!last_sp) {
1120         SAVEPOINT("batch");
1121         sp_success = success;
1122         sp_total = total;
1123       }
1124  
1125       if(current->problematic) {
1126         RELEASE_SAVEPOINT("batch");
1127         current = current->next;
1128         total++;
1129         continue;
1130       }
1131       rv = stratcon_datastore_execute(cq, cq->remote_str, cq->remote_cn,
1132                                       current);
1133       switch(rv) {
1134         case DS_EXEC_SUCCESS:
1135           total++;
1136           success++;
1137           current = current->next;
1138           break;
1139         case DS_EXEC_ROW_FAILED:
1140           /* rollback to savepoint, mark this record as bad and start again */
1141           noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data);
1142           current->problematic = 1;
1143           current = last_sp;
1144           success = sp_success;
1145           total = sp_total;
1146           ROLLBACK_TO_SAVEPOINT("batch");
1147           break;
1148         case DS_EXEC_TXN_FAILED:
1149           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename);
1150           BUSTED(cq);
1151       }
1152     }
1153   }
1154   if(last_sp) RELEASE_SAVEPOINT("batch");
1155   if(stratcon_datastore_do(cq, "COMMIT")) {
1156     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename);
1157     BUSTED(cq);
1158   }
1159   /* Cleanup the mess */
1160   while(head) {
1161     ds_line_detail *tofree;
1162     tofree = head;
1163     head = head->next;
1164     if(tofree->data) free(tofree->data);
1165     free_params((ds_single_detail *)tofree);
1166     free(tofree);
1167   }
1168   noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n",
1169         ij->remote_str ? ij->remote_str : "(null)",
1170         ij->remote_cn ? ij->remote_cn : "(null)",
1171         ij->fqdn ? ij->fqdn : "(null)", success, total);
1172   interim_journal_remove(ij);
1173   release_conn_q(cq);
1174   return 0;
1175 }
1176 static int
1177 stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure,
1178                                 struct timeval *now) {
1179   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1180   const char *k;
1181   int klen;
1182   void *vij;
1183   interim_journal_t *ij;
1184   syncset_t *syncset = closure;
1185
1186   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
1187   if(mask & EVENTER_ASYNCH_CLEANUP) return 0;
1188
1189   noitL(ds_deb, "Syncing journal sets...\n");
1190   while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) {
1191     eventer_t ingest;
1192     ij = vij;
1193     noitL(ds_deb, "Syncing journal set [%s,%s,%s]\n",
1194           ij->remote_str, ij->remote_cn, ij->fqdn);
1195     fsync(ij->fd);
1196     close(ij->fd);
1197     ij->fd = -1;
1198     ingest = eventer_alloc();
1199     ingest->mask = EVENTER_ASYNCH;
1200     ingest->callback = stratcon_datastore_asynch_execute;
1201     ingest->closure = ij;
1202     eventer_add_asynch(ij->cpool->jobq, ingest);
1203   }
1204   noit_hash_destroy(syncset->ws, free, NULL);
1205   free(syncset->ws);
1206   eventer_add(syncset->completion);
1207   free(syncset);
1208   return 0;
1209 }
1210 static interim_journal_t *
1211 interim_journal_get(struct sockaddr *remote, const char *remote_cn_in,
1212                     int storagenode_id, const char *fqdn_in) {
1213   void *vhash, *vij;
1214   noit_hash_table *working_set;
1215   interim_journal_t *ij;
1216   struct timeval now;
1217   char jpath[PATH_MAX];
1218   char remote_str[128];
1219   const char *remote_cn = remote_cn_in ? remote_cn_in : "default";
1220   const char *fqdn = fqdn_in ? fqdn_in : "default";
1221
1222   convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote);
1223   if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str));
1224
1225   /* Lookup the working set */
1226   if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1227     working_set = calloc(1, sizeof(*working_set));
1228     noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn),
1229                     working_set);
1230   }
1231   else
1232     working_set = vhash;
1233
1234   /* Lookup the interim journal within the working set */
1235   if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) {
1236     ij = calloc(1, sizeof(*ij));
1237     gettimeofday(&now, NULL);
1238     snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x",
1239              basejpath, remote_str, remote_cn, storagenode_id,
1240              (unsigned int)now.tv_sec, (unsigned int)now.tv_usec);
1241     ij->remote_str = strdup(remote_str);
1242     ij->remote_cn = strdup(remote_cn);
1243     ij->fqdn = fqdn_in ? strdup(fqdn_in) : NULL;
1244     ij->storagenode_id = storagenode_id;
1245     ij->filename = strdup(jpath);
1246     ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1247                                          ij->fqdn);
1248     ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1249     if(ij->fd < 0 && errno == ENOENT) {
1250       if(mkdir_for_file(ij->filename, 0750)) {
1251         noitL(noit_error, "Failed to create dir for '%s': %s\n",
1252               ij->filename, strerror(errno));
1253         exit(-1);
1254       }
1255       ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640);
1256     }
1257     if(ij->fd < 0) {
1258       noitL(noit_error, "Failed to open interim journal '%s': %s\n",
1259             ij->filename, strerror(errno));
1260       exit(-1);
1261     }
1262     noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij);
1263   }
1264   else
1265     ij = vij;
1266
1267   return ij;
1268 }
1269 static int
1270 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn,
1271                           int *sid_out, int *storagenode_id_out,
1272                           const char **remote_cn_out,
1273                           const char **fqdn_out, const char **dsn_out) {
1274   /* only called from the main thread -- no safety issues */
1275   void *vuuidinfo, *vinfo;
1276   uuid_info *uuidinfo;
1277   storagenode_info *info = NULL;
1278   char *fqdn = NULL;
1279   char *dsn = NULL;
1280   int storagenode_id = 0, sid = 0;
1281   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str),
1282                          &vuuidinfo)) {
1283     int row_count = 0;
1284     char *tmpint;
1285     ds_single_detail *d;
1286     conn_q *cq;
1287
1288     /* We can't do a database lookup without the remote_cn */
1289     if(!remote_cn) return -1;
1290
1291     d = calloc(1, sizeof(*d));
1292     cq = get_conn_q_for_metanode();
1293     if(stratcon_database_connect(cq) == 0) {
1294       /* Blocking call to service the cache miss */
1295       GET_QUERY(check_map);
1296       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str));
1297       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn));
1298       PG_EXEC(check_map);
1299       row_count = PQntuples(d->res);
1300       if(row_count != 1) {
1301         PQclear(d->res);
1302         goto bad_row;
1303       }
1304       PG_GET_STR_COL(tmpint, 0, "sid");
1305       sid = atoi(tmpint);
1306       PG_GET_STR_COL(tmpint, 0, "storage_node_id");
1307       if(tmpint) storagenode_id = atoi(tmpint);
1308       PG_GET_STR_COL(fqdn, 0, "fqdn");
1309       PG_GET_STR_COL(dsn, 0, "dsn");
1310       PQclear(d->res);
1311     }
1312    bad_row:
1313     free_params((ds_single_detail *)d);
1314     free(d);
1315     release_conn_q(cq);
1316     if(row_count != 1) return -1;
1317     /* Place in cache */
1318     if(fqdn) fqdn = strdup(fqdn);
1319     uuidinfo = calloc(1, sizeof(*uuidinfo));
1320     uuidinfo->sid = sid;
1321     uuidinfo->uuid_str = strdup(uuid_str);
1322     uuidinfo->storagenode_id = storagenode_id;
1323     uuidinfo->remote_cn = strdup(remote_cn);
1324     noit_hash_store(&uuid_to_info_cache,
1325                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1326     /* Also, we may have just witnessed a new storage node, store it */
1327     if(storagenode_id) {
1328       int needs_free = 0;
1329       info = calloc(1, sizeof(*info));
1330       info->storagenode_id = storagenode_id;
1331       info->dsn = dsn ? strdup(dsn) : NULL;
1332       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1333       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1334       if(!noit_hash_retrieve(&storagenode_to_info_cache,
1335                              (void *)&storagenode_id, sizeof(int), &vinfo)) {
1336         /* hack to save memory -- we *never* remove from these caches,
1337            so we can use the same fqdn value in the above cache for the key
1338            in the cache below -- (no strdup) */
1339         noit_hash_store(&storagenode_to_info_cache,
1340                         (void *)&info->storagenode_id, sizeof(int), info);
1341       }
1342       else needs_free = 1;
1343       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1344       if(needs_free) {
1345         if(info->dsn) free(info->dsn);
1346         if(info->fqdn) free(info->fqdn);
1347         free(info);
1348       }
1349     }
1350   }
1351   else
1352     uuidinfo = vuuidinfo;
1353
1354   if(uuidinfo && uuidinfo->storagenode_id) {
1355     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) {
1356       /* we don't have dsn and we actually want it */
1357       pthread_mutex_lock(&storagenode_to_info_cache_lock);
1358       if(noit_hash_retrieve(&storagenode_to_info_cache,
1359                             (void *)&uuidinfo->storagenode_id, sizeof(int),
1360                             &vinfo))
1361         info = vinfo;
1362       pthread_mutex_unlock(&storagenode_to_info_cache_lock);
1363     }
1364   }
1365
1366   if(fqdn_out) *fqdn_out = fqdn ? fqdn : (info ? info->fqdn : NULL);
1367   if(dsn_out) *dsn_out = dsn ? dsn : (info ? info->dsn : NULL);
1368   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn;
1369   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id;
1370   if(sid_out) *sid_out = uuidinfo->sid;
1371   return 0;
1372 }
1373 static int
1374 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) {
1375   char uuid_str[UUID_STR_LEN+1];
1376   int sid = 0;
1377   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str));
1378   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL);
1379   return sid;
1380 }
1381 static void
1382 stratcon_datastore_journal(struct sockaddr *remote,
1383                            const char *remote_cn, const char *line) {
1384   interim_journal_t *ij = NULL;
1385   char uuid_str[UUID_STR_LEN+1], *cp;
1386   const char *fqdn = NULL, *dsn = NULL;
1387   int storagenode_id = 0;
1388   uuid_t checkid;
1389   if(!line) return;
1390   /* if it is a UUID based thing, find the storage node */
1391   switch(*line) {
1392     case 'C':
1393     case 'S':
1394     case 'M':
1395       if(line[1] == '\t' && (cp = strchr(line+2, '\t')) != NULL) {
1396         strlcpy(uuid_str, cp + 1, sizeof(uuid_str));
1397         if(!uuid_parse(uuid_str, checkid)) {
1398           storage_node_quick_lookup(uuid_str, remote_cn, NULL,
1399                                     &storagenode_id, NULL, &fqdn, &dsn);
1400           ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn);
1401         }
1402       }
1403       break;
1404     case 'n':
1405       ij = interim_journal_get(remote,remote_cn,0,NULL);
1406       break;
1407     default:
1408       break;
1409   }
1410   if(!ij) {
1411     noitL(ingest_err, "%d\t%s\n", storagenode_id, line);
1412   }
1413   else {
1414     int len;
1415     len = write(ij->fd, line, strlen(line));
1416     if(len < 0) {
1417       noitL(noit_error, "write to %s failed: %s\n",
1418             ij->filename, strerror(errno));
1419     }
1420   }
1421   return;
1422 }
1423 static noit_hash_table *
1424 stratcon_datastore_journal_remove(struct sockaddr *remote,
1425                                   const char *remote_cn) {
1426   void *vhash = NULL;
1427   if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) {
1428     /* pluck it out */
1429     noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL);
1430   }
1431   else {
1432     noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n",
1433           remote_cn);
1434     abort();
1435   }
1436   return vhash;
1437 }
1438 void
1439 stratcon_datastore_push(stratcon_datastore_op_t op,
1440                         struct sockaddr *remote,
1441                         const char *remote_cn, void *operand,
1442                         eventer_t completion) {
1443   conn_pool *cpool;
1444   syncset_t *syncset;
1445   eventer_t e;
1446   ds_rt_detail *rtdetail;
1447   struct datastore_onlooker_list *nnode;
1448
1449   for(nnode = onlookers; nnode; nnode = nnode->next)
1450     nnode->dispatch(op,remote,remote_cn,operand);
1451
1452   switch(op) {
1453     case DS_OP_INSERT:
1454       stratcon_datastore_journal(remote, remote_cn, (const char *)operand);
1455       break;
1456     case DS_OP_CHKPT:
1457       e = eventer_alloc();
1458       syncset = calloc(1, sizeof(*syncset));
1459       e->mask = EVENTER_ASYNCH;
1460       e->callback = stratcon_datastore_journal_sync;
1461       syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn);
1462       syncset->completion = completion;
1463       e->closure = syncset;
1464       eventer_add(e);
1465       break;
1466     case DS_OP_FIND_COMPLETE:
1467       cpool = get_conn_pool_for_remote(NULL,NULL,NULL);
1468       rtdetail = calloc(1, sizeof(*rtdetail));
1469       rtdetail->rt = operand;
1470       rtdetail->completion_event = completion;
1471       e = eventer_alloc();
1472       e->mask = EVENTER_ASYNCH;
1473       e->callback = stratcon_datastore_asynch_lookup;
1474       e->closure = rtdetail;
1475       eventer_add_asynch(cpool->jobq, e);
1476       break;
1477   }
1478 }
1479
1480 int
1481 stratcon_datastore_saveconfig(void *unused) {
1482   int rv = -1;
1483   char *buff;
1484   ds_single_detail _d = { 0 }, *d = &_d;
1485   conn_q *cq;
1486   cq = get_conn_q_for_metanode();
1487
1488   if(stratcon_database_connect(cq) == 0) {
1489     char time_as_str[20];
1490     size_t len;
1491     buff = noit_conf_xml_in_mem(&len);
1492     if(!buff) goto bad_row;
1493
1494     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL));
1495     DECLARE_PARAM_STR("0.0.0.0", 7);
1496     DECLARE_PARAM_STR("stratcond", 9);
1497     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str));
1498     DECLARE_PARAM_STR(buff, len);
1499     free(buff);
1500
1501     GET_QUERY(config_insert);
1502     PG_EXEC(config_insert);
1503     PQclear(d->res);
1504     rv = 0;
1505
1506     bad_row:
1507       free_params(d);
1508   }
1509   release_conn_q(cq);
1510   return rv;
1511 }
1512
1513 void
1514 stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t,
1515                                                struct sockaddr *,
1516                                                const char *, void *)) {
1517   struct datastore_onlooker_list *nnode;
1518   nnode = calloc(1, sizeof(*nnode));
1519   nnode->dispatch = f;
1520   nnode->next = onlookers;
1521   while(noit_atomic_casptr((volatile void **)&onlookers,
1522                            nnode, nnode->next) != (void *)nnode->next)
1523     nnode->next = onlookers;
1524 }
1525 static void
1526 stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn,
1527                                          char *id_str, char *file) {
1528   char path[PATH_MAX];
1529   interim_journal_t *ij;
1530   eventer_t ingest;
1531
1532   snprintf(path, sizeof(path), "%s/%s/%s/%s/%s",
1533            basejpath, remote_str, remote_cn, id_str, file);
1534   ij = calloc(1, sizeof(*ij));
1535   ij->fd = open(path, O_RDONLY);
1536   if(ij->fd < 0) {
1537     noitL(noit_error, "cannot open journal '%s': %s\n",
1538           path, strerror(errno));
1539     free(ij);
1540     return;
1541   }
1542   close(ij->fd);
1543   ij->fd = -1;
1544   ij->filename = strdup(path);
1545   ij->remote_str = strdup(remote_str);
1546   ij->remote_cn = strdup(remote_cn);
1547   ij->storagenode_id = atoi(id_str);
1548   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn,
1549                                        ij->fqdn);
1550   noitL(noit_error, "ingesting old payload: %s\n", ij->filename);
1551   ingest = eventer_alloc();
1552   ingest->mask = EVENTER_ASYNCH;
1553   ingest->callback = stratcon_datastore_asynch_execute;
1554   ingest->closure = ij;
1555   eventer_add_asynch(ij->cpool->jobq, ingest);
1556 }
1557 static void
1558 stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) {
1559   char path[PATH_MAX];
1560   DIR *root;
1561   struct dirent *de, *entry;
1562   int i = 0, cnt = 0;
1563   char **entries;
1564   int size = 0;
1565
1566   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath,
1567            first ? "/" : "", first ? first : "",
1568            second ? "/" : "", second ? second : "",
1569            third ? "/" : "", third ? third : "");
1570 #ifdef _PC_NAME_MAX
1571   size = pathconf(path, _PC_NAME_MAX);
1572 #endif
1573   size = MIN(size, PATH_MAX + 128);
1574   de = alloca(size);
1575   root = opendir(path);
1576   if(!root) return;
1577   while(portable_readdir_r(root, de, &entry) != 0 && entry != NULL) cnt++;
1578   closedir(root);
1579   root = opendir(path);
1580   if(!root) return;
1581   entries = malloc(sizeof(*entries) * cnt);
1582   while(portable_readdir_r(root, de, &entry) != 0 && entry != NULL) {
1583     if(i < cnt) {
1584       entries[i++] = strdup(entry->d_name);
1585     }
1586   }
1587   closedir(root);
1588   cnt = i; /* could have changed, directories are fickle */
1589   qsort(entries, i, sizeof(*entries),
1590         (int (*)(const void *, const void *))strcasecmp);
1591   for(i=0; i<cnt; i++) {
1592     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue;
1593     noitL(ds_deb, "Processing L%d entry '%s'\n",
1594           third ? 4 : second ? 3 : first ? 2 : 1, entries[i]);
1595     if(!first)
1596       stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL);
1597     else if(!second)
1598       stratcon_datastore_sweep_journals_int(first, entries[i], NULL);
1599     else if(!third)
1600       stratcon_datastore_sweep_journals_int(first, second, entries[i]);
1601     else if(strlen(entries[i]) == 16)
1602       stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]);
1603   }
1604 }
/*
 * Sweep the whole journal tree, starting at basejpath itself (no path
 * components selected yet).
 */
static void
stratcon_datastore_sweep_journals() {
  char *unselected = NULL;

  stratcon_datastore_sweep_journals_int(unselected, unselected, unselected);
}
1609
1610 int
1611 stratcon_datastore_ingest_all_storagenode_info() {
1612   int i, cnt = 0;
1613   ds_single_detail _d = { 0 }, *d = &_d;
1614   conn_q *cq;
1615   cq = get_conn_q_for_metanode();
1616
1617   while(stratcon_database_connect(cq)) {
1618     noitL(noit_error, "Error connecting to database\n");
1619     sleep(1);
1620   }
1621
1622   GET_QUERY(all_storage);
1623   PG_EXEC(all_storage);
1624   cnt = PQntuples(d->res);
1625   for(i=0; i<cnt; i++) {
1626     void *vinfo;
1627     char *tmpint, *fqdn, *dsn;
1628     int storagenode_id;
1629     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1630     storagenode_id = atoi(tmpint);
1631     PG_GET_STR_COL(fqdn, i, "fqdn");
1632     PG_GET_STR_COL(dsn, i, "dsn");
1633     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1634     storagenode_id = tmpint ? atoi(tmpint) : 0;
1635
1636     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1637                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1638       storagenode_info *info;
1639       info = calloc(1, sizeof(*info));
1640       info->storagenode_id = storagenode_id;
1641       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1642       info->dsn = dsn ? strdup(dsn) : NULL;
1643       noit_hash_store(&storagenode_to_info_cache,
1644                       (void *)&info->storagenode_id, sizeof(int), info);
1645       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1646             info->storagenode_id,
1647             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1648     }
1649   }
1650   PQclear(d->res);
1651  bad_row:
1652   free_params(d);
1653
1654   release_conn_q(cq);
1655   noitL(noit_error, "Loaded %d storage nodes\n", cnt);
1656   return cnt;
1657 }
1658 int
1659 stratcon_datastore_ingest_all_check_info() {
1660   int i, cnt, loaded = 0;
1661   ds_single_detail _d = { 0 }, *d = &_d;
1662   conn_q *cq;
1663   cq = get_conn_q_for_metanode();
1664
1665   while(stratcon_database_connect(cq)) {
1666     noitL(noit_error, "Error connecting to database\n");
1667     sleep(1);
1668   }
1669
1670   GET_QUERY(check_mapall);
1671   PG_EXEC(check_mapall);
1672   cnt = PQntuples(d->res);
1673   for(i=0; i<cnt; i++) {
1674     void *vinfo;
1675     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn;
1676     int sid, storagenode_id;
1677     uuid_info *uuidinfo;
1678     PG_GET_STR_COL(uuid_str, i, "id");
1679     if(!uuid_str) continue;
1680     PG_GET_STR_COL(tmpint, i, "sid");
1681     if(!tmpint) continue;
1682     sid = atoi(tmpint);
1683     PG_GET_STR_COL(fqdn, i, "fqdn");
1684     PG_GET_STR_COL(dsn, i, "dsn");
1685     PG_GET_STR_COL(remote_cn, i, "remote_cn");
1686     PG_GET_STR_COL(tmpint, i, "storage_node_id");
1687     storagenode_id = tmpint ? atoi(tmpint) : 0;
1688
1689     uuidinfo = calloc(1, sizeof(*uuidinfo));
1690     uuidinfo->uuid_str = strdup(uuid_str);
1691     uuidinfo->remote_cn = strdup(remote_cn);
1692     uuidinfo->storagenode_id = storagenode_id;
1693     uuidinfo->sid = sid;
1694     noit_hash_store(&uuid_to_info_cache,
1695                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo);
1696     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n",
1697           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id);
1698     loaded++;
1699     if(!noit_hash_retrieve(&storagenode_to_info_cache,
1700                            (void *)&storagenode_id, sizeof(int), &vinfo)) {
1701       storagenode_info *info;
1702       info = calloc(1, sizeof(*info));
1703       info->storagenode_id = storagenode_id;
1704       info->fqdn = fqdn ? strdup(fqdn) : NULL;
1705       info->dsn = dsn ? strdup(dsn) : NULL;
1706       noit_hash_store(&storagenode_to_info_cache,
1707                       (void *)&info->storagenode_id, sizeof(int), info);
1708       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n",
1709             info->storagenode_id,
1710             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : "");
1711     }
1712   }
1713   PQclear(d->res);
1714  bad_row:
1715   free_params(d);
1716
1717   release_conn_q(cq);
1718   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded);
1719   return loaded;
1720 }
1721 void
1722 stratcon_datastore_init() {
1723   pthread_mutex_init(&ds_conns_lock, NULL);
1724   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL);
1725   ds_err = noit_log_stream_find("error/datastore");
1726   ds_deb = noit_log_stream_find("debug/datastore");
1727   ds_pool_deb = noit_log_stream_find("debug/datastore_pool");
1728   ingest_err = noit_log_stream_find("error/ingest");
1729   if(!ds_err) ds_err = noit_error;
1730   if(!ingest_err) ingest_err = noit_error;
1731   if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path",
1732                            &basejpath)) {
1733     noitL(noit_error, "/stratcon/database/journal/path is unspecified\n");
1734     exit(-1);
1735   }
1736   stratcon_datastore_ingest_all_check_info();
1737   stratcon_datastore_ingest_all_storagenode_info();
1738   stratcon_datastore_sweep_journals();
1739 }
Note: See TracBrowser for help on using the browser.