Changeset 5f816fc68a68eaeb73588d831044fdaadbafe5e9

Show
Ignore:
Timestamp:
03/22/11 15:07:59 (3 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1300806479 +0000
git-parent:

[3bcfcdb7b94d3386584d6422fa42ff6d5b0a913a]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1300806479 +0000
Message:

fixes #358

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/Makefile.in

    r2527e31 r5f816fc  
    1010RANLIB=@RANLIB@ 
    1111LIBS=@LIBS@ 
    12 PGLIBS=@PGLIBS@ 
    1312INSTALL=@INSTALL@ 
    1413XML2H=@top_srcdir@/buildtools/xml2h 
     
    166165                -Lnoitedit -lnoitedit \ 
    167166                $(NOWHOLE_ARCHIVE) \ 
    168                 $(LIBS) $(PGLIBS) 
     167                $(LIBS) 
    169168 
    170169stratcon_datastore.o:   stratcon_datastore.c 
  • src/modules/Makefile.in

    rc59d0c2 r5f816fc  
    3030        @BUILD_MODULES@ 
    3131 
    32 SMODULES=stomp_driver.@MODULEEXT@ 
     32SMODULES=stomp_driver.@MODULEEXT@ postgres_ingestor.@MODULEEXT@ 
    3333 
    3434all:    $(MODULES) $(SMODULES) test_abort.@MODULEEXT@ 
     
    6464 
    6565postgres.lo:    postgres.c postgres.xmlh 
     66        @$(CC) $(CPPFLAGS) $(SHCFLAGS) $(PGCFLAGS) -c $< -o $@ 
     67        @echo "- compiling $<" 
     68 
     69postgres_ingestor.@MODULEEXT@:  postgres_ingestor.lo 
     70        @$(MODULELD) $(LDFLAGS) -o $@ postgres_ingestor.lo $(PGLIBS) -lz -lssl -lcrypto 
     71        @echo "- linking $@" 
     72 
     73postgres_ingestor.lo: postgres_ingestor.c 
    6674        @$(CC) $(CPPFLAGS) $(SHCFLAGS) $(PGCFLAGS) -c $< -o $@ 
    6775        @echo "- compiling $<" 
  • src/stratcon.conf.in

    r8c8aea6 r5f816fc  
    1717  <modules directory="%modulesdir%"> 
    1818    <generic image="stomp_driver" name="stomp_driver"/> 
     19    <generic image="postgres_ingestor" name="postgres_ingestor"/> 
    1920  </modules> 
    2021 
  • src/stratcon_datastore.c

    r3bcfcdb r5f816fc  
    5151#include <arpa/inet.h> 
    5252#include <sys/mman.h> 
    53 #include <libpq-fe.h> 
    5453#include <zlib.h> 
    5554#include <assert.h> 
    5655#include <errno.h> 
    57  
    58 #define DECL_STMT(codename,confname) \ 
    59 static char *codename = NULL; \ 
    60 static const char *codename##_conf = "/stratcon/database/statements/" #confname 
    61  
    62 DECL_STMT(storage_post_connect, storagepostconnect); 
    63 DECL_STMT(metanode_post_connect, metanodepostconnect); 
    64 DECL_STMT(find_storage, findstoragenode); 
    65 DECL_STMT(all_storage, allstoragenodes); 
    66 DECL_STMT(check_map, mapchecktostoragenode); 
    67 DECL_STMT(check_mapall, mapallchecks); 
    68 DECL_STMT(check_loadall, allchecks); 
    69 DECL_STMT(check_find, findcheck); 
    70 DECL_STMT(check_insert, check); 
    71 DECL_STMT(status_insert, status); 
    72 DECL_STMT(metric_insert_numeric, metric_numeric); 
    73 DECL_STMT(metric_insert_text, metric_text); 
    74 DECL_STMT(config_insert, config); 
    75 DECL_STMT(config_get, findconfig); 
    7656 
    7757static noit_log_stream_t ds_err = NULL; 
     
    7959static noit_log_stream_t ds_pool_deb = NULL; 
    8060static noit_log_stream_t ingest_err = NULL; 
    81  
     61static char *basejpath = NULL; 
     62 
     63static ingestor_api_t *ingestor = NULL; 
    8264static int ds_system_enabled = 1; 
    8365int stratcon_datastore_get_enabled() { return ds_system_enabled; } 
    8466void stratcon_datastore_set_enabled(int n) { ds_system_enabled = n; } 
     67int stratcon_datastore_set_ingestor(ingestor_api_t *ni) { 
     68  if(ingestor) return -1; 
     69  ingestor = ni; 
     70  return 0; 
     71} 
    8572 
    8673static struct datastore_onlooker_list { 
     
    9077} *onlookers = NULL; 
    9178 
    92 #define GET_QUERY(a) do { \ 
    93   if(a == NULL) \ 
    94     if(!noit_conf_get_string(NULL, a ## _conf, &(a))) \ 
    95       goto bad_row; \ 
    96 } while(0) 
    97  
    98 struct conn_q; 
    99  
    100 typedef struct { 
    101   char            *queue_name; /* the key fqdn+remote_sn */ 
    102   eventer_jobq_t  *jobq; 
    103   struct conn_q   *head; 
    104   pthread_mutex_t  lock; 
    105   pthread_cond_t   cv; 
    106   int              ttl; 
    107   int              in_pool; 
    108   int              outstanding; 
    109   int              max_allocated; 
    110   int              max_in_pool; 
    111 } conn_pool; 
    112 typedef struct conn_q { 
    113   time_t           last_use; 
    114   char            *dsn;        /* Pg connect string */ 
    115   char            *remote_str; /* the IP of the noit*/ 
    116   char            *remote_cn;  /* the Cert CN of the noit */ 
    117   char            *fqdn;       /* the fqdn of the storage node */ 
    118   conn_pool       *pool; 
    119   struct conn_q   *next; 
    120   /* Postgres specific stuff */ 
    121   PGconn          *dbh; 
    122 } conn_q; 
    123  
    124  
    125 #define MAX_PARAMS 8 
    126 #define POSTGRES_PARTS \ 
    127   PGresult *res; \ 
    128   int rv; \ 
    129   time_t whence; \ 
    130   int nparams; \ 
    131   int metric_type; \ 
    132   char *paramValues[MAX_PARAMS]; \ 
    133   int paramLengths[MAX_PARAMS]; \ 
    134   int paramFormats[MAX_PARAMS]; \ 
    135   int paramAllocd[MAX_PARAMS]; 
    136  
    137 typedef struct ds_single_detail { 
    138   POSTGRES_PARTS 
    139 } ds_single_detail; 
    140 typedef struct { 
    141   /* Postgres specific stuff */ 
    142   POSTGRES_PARTS 
    143   struct realtime_tracker *rt; 
    144   conn_q *cq; /* connection on which to perform this job */ 
    145   eventer_t completion_event; /* This event should be registered if non NULL */ 
    146 } ds_rt_detail; 
    147 typedef struct ds_line_detail { 
    148   /* Postgres specific stuff */ 
    149   POSTGRES_PARTS 
    150   char *data; 
    151   int problematic; 
    152   struct ds_line_detail *next; 
    153 } ds_line_detail; 
    154  
    15579typedef struct { 
    15680  noit_hash_table *ws; 
     
    15882} syncset_t; 
    15983 
    160 typedef struct { 
    161   char *remote_str; 
    162   char *remote_cn; 
    163   char *fqdn; 
    164   int storagenode_id; 
    165   int fd; 
    166   char *filename; 
    167   conn_pool *cpool; 
    168 } interim_journal_t; 
    169  
    170 static int stratcon_database_connect(conn_q *cq); 
    171 static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn); 
    172 static int storage_node_quick_lookup(const char *uuid_str, 
    173                                      const char *remote_cn, 
    174                                      int *sid_out, int *storagenode_id_out, 
    175                                      const char **remote_cn_out, 
    176                                      const char **fqdn_out, 
    177                                      const char **dsn_out); 
     84noit_hash_table working_sets; 
    17885 
    17986static void 
    180 free_params(ds_single_detail *d) { 
    181   int i; 
    182   for(i=0; i<d->nparams; i++) 
    183     if(d->paramAllocd[i] && d->paramValues[i]) 
    184       free(d->paramValues[i]); 
    185 
    186  
    187 char *basejpath = NULL; 
    188 pthread_mutex_t ds_conns_lock; 
    189 noit_hash_table ds_conns; 
    190 noit_hash_table working_sets; 
    191  
    192 /* the fqdn cache needs to be thread safe */ 
    193 typedef struct { 
    194   char *uuid_str; 
    195   char *remote_cn; 
    196   int storagenode_id; 
    197   int sid; 
    198 } uuid_info; 
    199 typedef struct { 
    200   int storagenode_id; 
    201   char *fqdn; 
    202   char *dsn; 
    203 } storagenode_info; 
    204 noit_hash_table uuid_to_info_cache; 
    205 pthread_mutex_t storagenode_to_info_cache_lock; 
    206 noit_hash_table storagenode_to_info_cache; 
    207  
    208 /* Thread-safe connection pools */ 
    209  
    210 /* Forcefree -> 1 prevents it from going to the pool and it gets freed */ 
    211 static void 
    212 release_conn_q_forceable(conn_q *cq, int forcefree) { 
    213   int putback = 0; 
    214   cq->last_use = time(NULL); 
    215   pthread_mutex_lock(&cq->pool->lock); 
    216   cq->pool->outstanding--; 
    217   if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) { 
    218     putback = 1; 
    219     cq->next = cq->pool->head; 
    220     cq->pool->head = cq; 
    221     cq->pool->in_pool++; 
    222   } 
    223   pthread_mutex_unlock(&cq->pool->lock); 
    224   noitL(ds_pool_deb, "[%p] release %s [%s]\n", (void *)pthread_self(), 
    225         putback ? "to pool" : "and destroy", cq->pool->queue_name); 
    226   pthread_cond_signal(&cq->pool->cv); 
    227   if(putback) return; 
    228  
    229   /* Not put back, release it */ 
    230   if(cq->dbh) PQfinish(cq->dbh); 
    231   if(cq->remote_str) free(cq->remote_str); 
    232   if(cq->remote_cn) free(cq->remote_cn); 
    233   if(cq->fqdn) free(cq->fqdn); 
    234   if(cq->dsn) free(cq->dsn); 
    235   free(cq); 
    236 
    237 static void 
    238 ttl_purge_conn_pool(conn_pool *pool) { 
    239   int old_cnt, new_cnt; 
    240   time_t now = time(NULL); 
    241   conn_q *cq, *prev = NULL, *iter; 
    242   /* because we always replace on the head and update the last_use time when 
    243      doing so, we know they are ordered LRU on the end.  So, once we hit an 
    244      old one, we know all the others are old too. 
    245    */ 
    246   if(!pool->head) return; /* hack short circuit for no locks */ 
    247   pthread_mutex_lock(&pool->lock); 
    248   old_cnt = pool->in_pool; 
    249   cq = pool->head; 
    250   while(cq) { 
    251     if(cq->last_use + cq->pool->ttl < now) { 
    252       if(prev) prev->next = NULL; 
    253       else pool->head = NULL; 
    254       break; 
    255     } 
    256     prev = cq; 
    257     cq = cq->next; 
    258   } 
    259   /* Now pool->head is a chain of unexpired and cq is a chain of expired */ 
    260   /* Fix accounting */ 
    261   for(iter=cq; iter; iter=iter->next) pool->in_pool--; 
    262   new_cnt = pool->in_pool; 
    263   pthread_mutex_unlock(&pool->lock); 
    264  
    265   /* Force release these without holding the lock */ 
    266   while(cq) { 
    267     cq = cq->next; 
    268     release_conn_q_forceable(cq, 1); 
    269   } 
    270   if(old_cnt != new_cnt) 
    271     noitL(ds_pool_deb, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt, 
    272           pool->queue_name); 
    273 
    274 static void 
    275 release_conn_q(conn_q *cq) { 
    276   ttl_purge_conn_pool(cq->pool); 
    277   release_conn_q_forceable(cq, 0); 
    278 
    279 static conn_pool * 
    280 get_conn_pool_for_remote(const char *remote_str, 
    281                          const char *remote_cn, const char *fqdn) { 
    282   void *vcpool; 
    283   conn_pool *cpool = NULL; 
    284   char queue_name[256] = "datastore_"; 
    285   snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s", 
    286            (remote_str && *remote_str) ? remote_str : "0.0.0.0", 
    287            fqdn ? fqdn : "default", 
    288            remote_cn ? remote_cn : "default"); 
    289   pthread_mutex_lock(&ds_conns_lock); 
    290   if(noit_hash_retrieve(&ds_conns, (const char *)queue_name, 
    291                         strlen(queue_name), &vcpool)) 
    292     cpool = vcpool; 
    293   pthread_mutex_unlock(&ds_conns_lock); 
    294   if(!cpool) { 
    295     vcpool = cpool = calloc(1, sizeof(*cpool)); 
    296     cpool->queue_name = strdup(queue_name); 
    297     pthread_mutex_init(&cpool->lock, NULL); 
    298     pthread_cond_init(&cpool->cv, NULL); 
    299     cpool->in_pool = 0; 
    300     cpool->outstanding = 0; 
    301     cpool->max_in_pool = 1; 
    302     cpool->max_allocated = 1; 
    303     pthread_mutex_lock(&ds_conns_lock); 
    304     if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name), 
    305                         cpool)) { 
    306       noit_hash_retrieve(&ds_conns, (const char *)queue_name, 
    307                          strlen(queue_name), &vcpool); 
    308     } 
    309     pthread_mutex_unlock(&ds_conns_lock); 
    310     if(vcpool != cpool) { 
    311       /* someone beat us to it */ 
    312       free(cpool->queue_name); 
    313       pthread_mutex_destroy(&cpool->lock); 
    314       pthread_cond_destroy(&cpool->cv); 
    315       free(cpool); 
    316     } 
    317     else { 
    318       int i; 
    319       /* Our job to setup the pool */ 
    320       cpool->jobq = calloc(1, sizeof(*cpool->jobq)); 
    321       eventer_jobq_init(cpool->jobq, queue_name); 
    322       cpool->jobq->backq = eventer_default_backq(); 
    323       /* Add one thread */ 
    324       for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++) 
    325         eventer_jobq_increase_concurrency(cpool->jobq); 
    326     } 
    327     cpool = vcpool; 
    328   } 
    329   return cpool; 
    330 
    331 static conn_q * 
    332 get_conn_q_for_remote(const char *remote_str, 
    333                       const char *remote_cn, const char *fqdn, 
    334                       const char *dsn) { 
    335   conn_pool *cpool; 
    336   conn_q *cq; 
    337   cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn); 
    338   noitL(ds_pool_deb, "[%p] requesting [%s]\n", (void *)pthread_self(), 
    339         cpool->queue_name); 
    340   pthread_mutex_lock(&cpool->lock); 
    341  again: 
    342   if(cpool->head) { 
    343     assert(cpool->in_pool > 0); 
    344     cq = cpool->head; 
    345     cpool->head = cq->next; 
    346     cpool->in_pool--; 
    347     cpool->outstanding++; 
    348     cq->next = NULL; 
    349     pthread_mutex_unlock(&cpool->lock); 
    350     return cq; 
    351   } 
    352   if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) { 
    353     noitL(ds_pool_deb, "[%p] over-subscribed, waiting [%s]\n", 
    354           (void *)pthread_self(), cpool->queue_name); 
    355     pthread_cond_wait(&cpool->cv, &cpool->lock); 
    356     noitL(ds_pool_deb, "[%p] waking up and trying again [%s]\n", 
    357           (void *)pthread_self(), cpool->queue_name); 
    358     goto again; 
    359   } 
    360   else { 
    361     cpool->outstanding++; 
    362     pthread_mutex_unlock(&cpool->lock); 
    363   } 
    364   
    365   cq = calloc(1, sizeof(*cq)); 
    366   cq->pool = cpool; 
    367   cq->remote_str = remote_str ? strdup(remote_str) : NULL; 
    368   cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL; 
    369   cq->fqdn = fqdn ? strdup(fqdn) : NULL; 
    370   cq->dsn = dsn ? strdup(dsn) : NULL; 
    371   return cq; 
    372 
    373 static conn_q * 
    374 get_conn_q_for_metanode() { 
    375   return get_conn_q_for_remote(NULL,NULL,NULL,NULL); 
    376 
    377  
    378 typedef enum { 
    379   DS_EXEC_SUCCESS = 0, 
    380   DS_EXEC_ROW_FAILED = 1, 
    381   DS_EXEC_TXN_FAILED = 2, 
    382 } execute_outcome_t; 
    383  
    384 #define DECLARE_PARAM_STR(str, len) do { \ 
    385   d->paramValues[d->nparams] = noit__strndup(str, len); \ 
    386   d->paramLengths[d->nparams] = len; \ 
    387   d->paramFormats[d->nparams] = 0; \ 
    388   d->paramAllocd[d->nparams] = 1; \ 
    389   if(!strcmp(d->paramValues[d->nparams], "[[null]]")) { \ 
    390     free(d->paramValues[d->nparams]); \ 
    391     d->paramValues[d->nparams] = NULL; \ 
    392     d->paramLengths[d->nparams] = 0; \ 
    393     d->paramAllocd[d->nparams] = 0; \ 
    394   } \ 
    395   d->nparams++; \ 
    396 } while(0) 
    397 #define DECLARE_PARAM_INT(i) do { \ 
    398   int buffer__len; \ 
    399   char buffer__[32]; \ 
    400   snprintf(buffer__, sizeof(buffer__), "%d", (i)); \ 
    401   buffer__len = strlen(buffer__); \ 
    402   DECLARE_PARAM_STR(buffer__, buffer__len); \ 
    403 } while(0) 
    404  
    405 #define PG_GET_STR_COL(dest, row, name) do { \ 
    406   int colnum = PQfnumber(d->res, name); \ 
    407   dest = NULL; \ 
    408   if (colnum >= 0) \ 
    409     dest = PQgetisnull(d->res, row, colnum) \ 
    410          ? NULL : PQgetvalue(d->res, row, colnum); \ 
    411 } while(0) 
    412  
    413 #define PG_EXEC(cmd) do { \ 
    414   d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \ 
    415                         (const char * const *)d->paramValues, \ 
    416                         d->paramLengths, d->paramFormats, 0); \ 
    417   d->rv = PQresultStatus(d->res); \ 
    418   if(d->rv != PGRES_COMMAND_OK && \ 
    419      d->rv != PGRES_TUPLES_OK) { \ 
    420     const char *pgerr = PQresultErrorMessage(d->res); \ 
    421     const char *pgerr_end = strchr(pgerr, '\n'); \ 
    422     if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \ 
    423     noitL(ds_err, "[%s] stratcon_datasource.c:%d bad (%d): %.*s\n", \ 
    424           cq->fqdn ? cq->fqdn : "metanode", __LINE__, d->rv, \ 
    425           (int)(pgerr_end - pgerr), pgerr); \ 
    426     PQclear(d->res); \ 
    427     goto bad_row; \ 
    428   } \ 
    429 } while(0) 
    430  
    431 #define PG_TM_EXEC(cmd, whence) do { \ 
    432   time_t __w = whence; \ 
    433   char cmdbuf[4096]; \ 
    434   struct tm tbuf, *tm; \ 
    435   tm = gmtime_r(&__w, &tbuf); \ 
    436   strftime(cmdbuf, sizeof(cmdbuf), cmd, tm); \ 
    437   d->res = PQexecParams(cq->dbh, cmdbuf, d->nparams, NULL, \ 
    438                         (const char * const *)d->paramValues, \ 
    439                         d->paramLengths, d->paramFormats, 0); \ 
    440   d->rv = PQresultStatus(d->res); \ 
    441   if(d->rv != PGRES_COMMAND_OK && \ 
    442      d->rv != PGRES_TUPLES_OK) { \ 
    443     const char *pgerr = PQresultErrorMessage(d->res); \ 
    444     const char *pgerr_end = strchr(pgerr, '\n'); \ 
    445     if(!pgerr_end) pgerr_end = pgerr + strlen(pgerr); \ 
    446     noitL(ds_err, "stratcon_datasource.c:%d bad (%d): %.*s time: %llu\n", \ 
    447           __LINE__, d->rv, (int)(pgerr_end - pgerr), pgerr, \ 
    448           (long long unsigned)whence); \ 
    449     PQclear(d->res); \ 
    450     goto bad_row; \ 
    451   } \ 
    452 } while(0) 
    453  
    454 static void * 
    455 stratcon_datastore_check_loadall(void *vsn) { 
    456   storagenode_info *sn = vsn; 
    457   ds_single_detail *d; 
    458   int i, row_count = 0, good = 0; 
    459   char buff[1024]; 
    460   conn_q *cq = NULL; 
    461  
    462   d = calloc(1, sizeof(*d)); 
    463   GET_QUERY(check_loadall); 
    464   cq = get_conn_q_for_remote(NULL,NULL,sn->fqdn,sn->dsn); 
    465   i = 0; 
    466   while(stratcon_database_connect(cq)) { 
    467     if(i++ > 4) { 
    468       noitL(noit_error, "giving up on storage node: %s\n", sn->fqdn); 
    469       release_conn_q(cq); 
    470       return (void *)(vpsized_int)good; 
    471     } 
    472     sleep(1); 
    473   } 
    474   PG_EXEC(check_loadall); 
    475   row_count = PQntuples(d->res); 
    476    
    477   for(i=0; i<row_count; i++) { 
    478     int rv; 
    479     int8_t family; 
    480     struct sockaddr *sin; 
    481     struct sockaddr_in sin4 = { .sin_family = AF_INET }; 
    482     struct sockaddr_in6 sin6 = { .sin6_family = AF_INET6 }; 
    483     char *remote, *id, *target, *module, *name; 
    484     PG_GET_STR_COL(remote, i, "remote_address"); 
    485     PG_GET_STR_COL(id, i, "id"); 
    486     PG_GET_STR_COL(target, i, "target"); 
    487     PG_GET_STR_COL(module, i, "module"); 
    488     PG_GET_STR_COL(name, i, "name"); 
    489     snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name); 
    490  
    491     family = AF_INET; 
    492     sin = (struct sockaddr *)&sin4; 
    493     rv = inet_pton(family, remote, &sin4.sin_addr); 
    494     if(rv != 1) { 
    495       family = AF_INET6; 
    496       sin = (struct sockaddr *)&sin6; 
    497       rv = inet_pton(family, remote, &sin6.sin6_addr); 
    498       if(rv != 1) { 
    499         noitL(noit_stderr, "Cannot translate '%s' to IP\n", remote); 
    500         sin = NULL; 
    501       } 
    502     } 
    503  
    504     /* stratcon_iep_line_processor takes an allocated operand and frees it */ 
    505     stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL); 
    506     good++; 
    507   } 
    508   noitL(noit_error, "Staged %d/%d remembered checks from %s into IEP\n", 
    509         good, row_count, sn->fqdn); 
    510  bad_row: 
    511   free_params((ds_single_detail *)d); 
    512   free(d); 
    513   if(cq) release_conn_q(cq); 
    514   return (void *)(vpsized_int)good; 
    515 
    516 static int 
    517 stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure, 
    518                                     struct timeval *now) { 
    519   storagenode_info self = { 0, NULL, NULL }, **sns = NULL; 
    520   pthread_t *jobs = NULL; 
    521   int nodes, i = 0, tcnt = 0; 
    522   if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 
    523   if(mask & EVENTER_ASYNCH_CLEANUP) return 0; 
    524  
    525   pthread_mutex_lock(&storagenode_to_info_cache_lock); 
    526   nodes = storagenode_to_info_cache.size; 
    527   jobs = calloc(MAX(1,nodes), sizeof(*jobs)); 
    528   sns = calloc(MAX(1,nodes), sizeof(*sns)); 
    529   if(nodes == 0) sns[nodes++] = &self; 
    530   else { 
    531     noit_hash_iter iter = NOIT_HASH_ITER_ZERO; 
    532     const char *k; 
    533     void *v; 
    534     int klen; 
    535     while(noit_hash_next(&storagenode_to_info_cache, 
    536                          &iter, &k, &klen, &v)) { 
    537       sns[i++] = (storagenode_info *)v; 
    538     } 
    539   } 
    540   pthread_mutex_unlock(&storagenode_to_info_cache_lock); 
    541  
    542   for(i=0; i<nodes; i++) { 
    543     if(pthread_create(&jobs[i], NULL, 
    544                       stratcon_datastore_check_loadall, sns[i]) != 0) { 
    545       noitL(noit_error, "Failed to spawn thread: %s\n", strerror(errno)); 
    546     } 
    547   } 
    548   for(i=0; i<nodes; i++) { 
    549     void *good; 
    550     pthread_join(jobs[i], &good); 
    551     tcnt += (int)(vpsized_int)good; 
    552   } 
    553   noitL(noit_error, "Loaded all %d check states.\n", tcnt); 
    554   return 0; 
    555 
    556 void 
    557 stratcon_datastore_iep_check_preload() { 
    558   eventer_t e; 
    559   conn_pool *cpool; 
    560  
    561   cpool = get_conn_pool_for_remote(NULL,NULL,NULL); 
    562   e = eventer_alloc(); 
    563   e->mask = EVENTER_ASYNCH; 
    564   e->callback = stratcon_datastore_asynch_drive_iep; 
    565   e->closure = NULL; 
    566   eventer_add_asynch(cpool->jobq, e); 
    567 
    568 execute_outcome_t 
    569 stratcon_datastore_find(ds_rt_detail *d) { 
    570   conn_q *cq; 
    571   char *val; 
    572   int row_count; 
    573   struct realtime_tracker *node; 
    574  
    575   for(node = d->rt; node; node = node->next) { 
    576     char uuid_str[UUID_STR_LEN+1]; 
    577     const char *fqdn, *dsn, *remote_cn; 
    578     char remote_ip[32]; 
    579     int storagenode_id; 
    580  
    581     uuid_unparse_lower(node->checkid, uuid_str); 
    582     if(storage_node_quick_lookup(uuid_str, NULL, &node->sid, 
    583                                  &storagenode_id, &remote_cn, &fqdn, &dsn)) 
    584       continue; 
    585  
    586     noitL(noit_debug, "stratcon_datastore_find <- (%d, %s) @ %s\n", 
    587           node->sid, remote_cn ? remote_cn : "(null)", dsn ? dsn : "(null)"); 
    588  
    589     /* We might be able to find the IP from our config if someone has 
    590      * specified the expected cn in the noit definition. 
    591      */ 
    592     if(stratcon_find_noit_ip_by_cn(remote_cn, 
    593                                    remote_ip, sizeof(remote_ip)) == 0) { 
    594       node->noit = strdup(remote_ip); 
    595       noitL(noit_debug, "lookup(cache): %s -> %s\n", remote_cn, node->noit); 
    596       continue; 
    597     } 
    598  
    599     cq = get_conn_q_for_remote(NULL, remote_cn, fqdn, dsn); 
    600     stratcon_database_connect(cq); 
    601  
    602     GET_QUERY(check_find); 
    603     DECLARE_PARAM_INT(node->sid); 
    604     PG_EXEC(check_find); 
    605     row_count = PQntuples(d->res); 
    606     if(row_count != 1) { 
    607       noitL(noit_debug, "lookup (sid:%d): NOT THERE!\n", node->sid); 
    608       PQclear(d->res); 
    609       goto bad_row; 
    610     } 
    611  
    612     /* Get the remote_address (which noit owns this) */ 
    613     PG_GET_STR_COL(val, 0, "remote_address"); 
    614     if(!val) { 
    615       noitL(noit_debug, "lookup: %s -> NOT THERE!\n", remote_cn); 
    616       PQclear(d->res); 
    617       goto bad_row; 
    618     } 
    619     node->noit = strdup(val); 
    620     noitL(noit_debug, "lookup: %s -> %s\n", remote_cn, node->noit); 
    621    bad_row:  
    622     free_params((ds_single_detail *)d); 
    623     d->nparams = 0; 
    624     release_conn_q(cq); 
    625   } 
    626   return DS_EXEC_SUCCESS; 
    627 
    628 execute_outcome_t 
    629 stratcon_datastore_execute(conn_q *cq, const char *r, const char *remote_cn, 
    630                            ds_line_detail *d) { 
    631   int type, len, sid; 
    632   char *final_buff; 
    633   uLong final_len, actual_final_len; 
    634   char *token; 
    635   char raddr_blank[1] = ""; 
    636   const char *raddr; 
    637  
    638   type = d->data[0]; 
    639   raddr = r ? r : raddr_blank; 
    640  
    641   /* Parse the log line, but only if we haven't already */ 
    642   if(!d->nparams) { 
    643     char *scp, *ecp; 
    644  
    645     scp = d->data; 
    646 #define PROCESS_NEXT_FIELD(t,l) do { \ 
    647   if(!*scp) goto bad_row; \ 
    648   ecp = strchr(scp, '\t'); \ 
    649   if(!ecp) goto bad_row; \ 
    650   token = scp; \ 
    651   len = (ecp-scp); \ 
    652   scp = ecp + 1; \ 
    653 } while(0) 
    654 #define PROCESS_LAST_FIELD(t,l) do { \ 
    655   if(!*scp) ecp = scp; \ 
    656   else { \ 
    657     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \ 
    658     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \ 
    659   } \ 
    660   t = scp; \ 
    661   l = (ecp-scp); \ 
    662 } while(0) 
    663  
    664     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */ 
    665     switch(type) { 
    666       /* See noit_check_log.c for log description */ 
    667       case 'n': 
    668         DECLARE_PARAM_STR(raddr, strlen(raddr)); 
    669         DECLARE_PARAM_STR(remote_cn, strlen(remote_cn)); 
    670         DECLARE_PARAM_STR("noitd",5); /* node_type */ 
    671         PROCESS_NEXT_FIELD(token,len); 
    672         d->whence = (time_t)strtoul(token, NULL, 10); 
    673         DECLARE_PARAM_STR(token,len); /* timestamp */ 
    674  
    675         /* This is the expected uncompressed len */ 
    676         PROCESS_NEXT_FIELD(token,len); 
    677         final_len = atoi(token); 
    678         final_buff = malloc(final_len); 
    679         if(!final_buff) goto bad_row; 
    680    
    681         /* The last token is b64 endoded and compressed. 
    682          * we need to decode it, declare it and then free it. 
    683          */ 
    684         PROCESS_LAST_FIELD(token, len); 
    685         /* We can in-place decode this */ 
    686         len = noit_b64_decode((char *)token, len, 
    687                               (unsigned char *)token, len); 
    688         if(len <= 0) { 
    689           noitL(noit_error, "noitd config base64 decoding error.\n"); 
    690           free(final_buff); 
    691           goto bad_row; 
    692         } 
    693         actual_final_len = final_len; 
    694         if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len, 
    695                               (unsigned char *)token, len)) { 
    696           noitL(noit_error, "noitd config decompression failure.\n"); 
    697           free(final_buff); 
    698           goto bad_row; 
    699         } 
    700         if(final_len != actual_final_len) { 
    701           noitL(noit_error, "noitd config decompression error.\n"); 
    702           free(final_buff); 
    703           goto bad_row; 
    704         } 
    705         DECLARE_PARAM_STR(final_buff, final_len); 
    706         free(final_buff); 
    707         break; 
    708       case 'C': 
    709         DECLARE_PARAM_STR(raddr, strlen(raddr)); 
    710         PROCESS_NEXT_FIELD(token,len); 
    711         DECLARE_PARAM_STR(token,len); /* timestamp */ 
    712         d->whence = (time_t)strtoul(token, NULL, 10); 
    713         PROCESS_NEXT_FIELD(token, len); 
    714         /* uuid is last 36 bytes */ 
    715         if(len > 36) { token += (len-36); len = 36; } 
    716         sid = uuid_to_sid(token, remote_cn); 
    717         if(sid == 0) goto bad_row; 
    718         DECLARE_PARAM_INT(sid); /* sid */ 
    719         DECLARE_PARAM_STR(token,len); /* uuid */ 
    720         PROCESS_NEXT_FIELD(token, len); 
    721         DECLARE_PARAM_STR(token,len); /* target */ 
    722         PROCESS_NEXT_FIELD(token, len); 
    723         DECLARE_PARAM_STR(token,len); /* module */ 
    724         PROCESS_LAST_FIELD(token, len); 
    725         DECLARE_PARAM_STR(token,len); /* name */ 
    726         break; 
    727       case 'M': 
    728         PROCESS_NEXT_FIELD(token,len); 
    729         DECLARE_PARAM_STR(token,len); /* timestamp */ 
    730         d->whence = (time_t)strtoul(token, NULL, 10); 
    731         PROCESS_NEXT_FIELD(token, len); 
    732         /* uuid is last 36 bytes */ 
    733         if(len > 36) { token += (len-36); len = 36; } 
    734         sid = uuid_to_sid(token, remote_cn); 
    735         if(sid == 0) goto bad_row; 
    736         DECLARE_PARAM_INT(sid); /* sid */ 
    737         PROCESS_NEXT_FIELD(token, len); 
    738         DECLARE_PARAM_STR(token,len); /* name */ 
    739         PROCESS_NEXT_FIELD(token,len); 
    740         d->metric_type = *token; 
    741         PROCESS_LAST_FIELD(token,len); 
    742         DECLARE_PARAM_STR(token,len); /* value */ 
    743         break; 
    744       case 'S': 
    745         PROCESS_NEXT_FIELD(token,len); 
    746         DECLARE_PARAM_STR(token,len); /* timestamp */ 
    747         d->whence = (time_t)strtoul(token, NULL, 10); 
    748         PROCESS_NEXT_FIELD(token, len); 
    749         /* uuid is last 36 bytes */ 
    750         if(len > 36) { token += (len-36); len = 36; } 
    751         sid = uuid_to_sid(token, remote_cn); 
    752         if(sid == 0) goto bad_row; 
    753         DECLARE_PARAM_INT(sid); /* sid */ 
    754         PROCESS_NEXT_FIELD(token, len); 
    755         DECLARE_PARAM_STR(token,len); /* state */ 
    756         PROCESS_NEXT_FIELD(token, len); 
    757         DECLARE_PARAM_STR(token,len); /* availability */ 
    758         PROCESS_NEXT_FIELD(token, len); 
    759         DECLARE_PARAM_STR(token,len); /* duration */ 
    760         PROCESS_LAST_FIELD(token,len); 
    761         DECLARE_PARAM_STR(token,len); /* status */ 
    762         break; 
    763       default: 
    764         goto bad_row; 
    765     } 
    766  
    767   } 
    768  
    769   /* Now execute the query */ 
    770   switch(type) { 
    771     case 'n': 
    772       GET_QUERY(config_insert); 
    773       PG_EXEC(config_insert); 
    774       PQclear(d->res); 
    775       break; 
    776     case 'C': 
    777       GET_QUERY(check_insert); 
    778       PG_TM_EXEC(check_insert, d->whence); 
    779       PQclear(d->res); 
    780       break; 
    781     case 'S': 
    782       GET_QUERY(status_insert); 
    783       PG_TM_EXEC(status_insert, d->whence); 
    784       PQclear(d->res); 
    785       break; 
    786     case 'M': 
    787       switch(d->metric_type) { 
    788         case METRIC_INT32: 
    789         case METRIC_UINT32: 
    790         case METRIC_INT64: 
    791         case METRIC_UINT64: 
    792         case METRIC_DOUBLE: 
    793           GET_QUERY(metric_insert_numeric); 
    794           PG_TM_EXEC(metric_insert_numeric, d->whence); 
    795           PQclear(d->res); 
    796           break; 
    797         case METRIC_STRING: 
    798           GET_QUERY(metric_insert_text); 
    799           PG_TM_EXEC(metric_insert_text, d->whence); 
    800           PQclear(d->res); 
    801           break; 
    802         default: 
    803           goto bad_row; 
    804       } 
    805       break; 
    806     default: 
    807       /* should never get here */ 
    808       goto bad_row; 
    809   } 
    810   return DS_EXEC_SUCCESS; 
    811  bad_row: 
    812   return DS_EXEC_ROW_FAILED; 
    813 
    814 static int 
    815 stratcon_database_post_connect(conn_q *cq) { 
    816   int rv = 0; 
    817   ds_single_detail _d = { 0 }, *d = &_d; 
    818   if(cq->fqdn) { 
    819     char *remote_str, *remote_cn; 
    820     /* This is the silly way we get null's in through our declare_param_str */ 
    821     remote_str = cq->remote_str ? cq->remote_str : "[[null]]"; 
    822     remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]"; 
    823     /* This is a storage node, it gets the storage node post_connect */ 
    824     GET_QUERY(storage_post_connect); 
    825     rv = -1; /* now we're serious */ 
    826     DECLARE_PARAM_STR(remote_str, strlen(remote_str)); 
    827     DECLARE_PARAM_STR(remote_cn, strlen(remote_cn)); 
    828     PG_EXEC(storage_post_connect); 
    829     PQclear(d->res); 
    830     rv = 0; 
    831   } 
    832   else { 
    833     /* Metanode post_connect */ 
    834     GET_QUERY(metanode_post_connect); 
    835     rv = -1; /* now we're serious */ 
    836     PG_EXEC(metanode_post_connect); 
    837     PQclear(d->res); 
    838     rv = 0; 
    839   } 
    840  bad_row: 
    841   free_params(d); 
    842   if(rv == -1) { 
    843     /* Post-connect intentions are serious and fatal */ 
    844     PQfinish(cq->dbh); 
    845     cq->dbh = NULL; 
    846   } 
    847   return rv; 
    848 
    849 static int 
    850 stratcon_database_connect(conn_q *cq) { 
    851   char *dsn, dsn_meta[512]; 
    852   noit_hash_iter iter = NOIT_HASH_ITER_ZERO; 
    853   const char *k, *v; 
    854   int klen; 
    855   noit_hash_table *t; 
    856  
    857   dsn_meta[0] = '\0'; 
    858   if(!cq->dsn) { 
    859     t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig"); 
    860     while(noit_hash_next_str(t, &iter, &k, &klen, &v)) { 
    861       if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta)); 
    862       strlcat(dsn_meta, k, sizeof(dsn_meta)); 
    863       strlcat(dsn_meta, "=", sizeof(dsn_meta)); 
    864       strlcat(dsn_meta, v, sizeof(dsn_meta)); 
    865     } 
    866     noit_hash_destroy(t, free, free); 
    867     free(t); 
    868     dsn = dsn_meta; 
    869   } 
    870   else { 
    871     char options[32]; 
    872     strlcpy(dsn_meta, cq->dsn, sizeof(dsn_meta)); 
    873     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/user", 
    874                                options, sizeof(options))) { 
    875       strlcat(dsn_meta, " ", sizeof(dsn_meta)); 
    876       strlcat(dsn_meta, "user", sizeof(dsn_meta)); 
    877       strlcat(dsn_meta, "=", sizeof(dsn_meta)); 
    878       strlcat(dsn_meta, options, sizeof(dsn_meta)); 
    879     } 
    880     if(noit_conf_get_stringbuf(NULL, "/stratcon/database/dbconfig/password", 
    881                                options, sizeof(options))) { 
    882       strlcat(dsn_meta, " ", sizeof(dsn_meta)); 
    883       strlcat(dsn_meta, "password", sizeof(dsn_meta)); 
    884       strlcat(dsn_meta, "=", sizeof(dsn_meta)); 
    885       strlcat(dsn_meta, options, sizeof(dsn_meta)); 
    886     } 
    887     dsn = dsn_meta; 
    888   } 
    889  
    890   if(cq->dbh) { 
    891     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; 
    892     PQreset(cq->dbh); 
    893     if(PQstatus(cq->dbh) != CONNECTION_OK) { 
    894       noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n", 
    895             dsn, PQerrorMessage(cq->dbh)); 
    896       return -1; 
    897     } 
    898     if(stratcon_database_post_connect(cq)) return -1; 
    899     if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; 
    900     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n", 
    901           dsn, PQerrorMessage(cq->dbh)); 
    902     return -1; 
    903   } 
    904  
    905   cq->dbh = PQconnectdb(dsn); 
    906   if(!cq->dbh) return -1; 
    907   if(PQstatus(cq->dbh) != CONNECTION_OK) { 
    908     noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n", 
    909           dsn, PQerrorMessage(cq->dbh)); 
    910     return -1; 
    911   } 
    912   if(stratcon_database_post_connect(cq)) return -1; 
    913   if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; 
    914   noitL(noit_error, "Error connection to database: '%s'\nError: %s\n", 
    915         dsn, PQerrorMessage(cq->dbh)); 
    916   return -1; 
    917 
    918 static int 
    919 stratcon_datastore_savepoint_op(conn_q *cq, const char *p, 
    920                                 const char *name) { 
    921   int rv = -1; 
    922   PGresult *res; 
    923   char cmd[128]; 
    924   strlcpy(cmd, p, sizeof(cmd)); 
    925   strlcat(cmd, name, sizeof(cmd)); 
    926   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1; 
    927   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0; 
    928   PQclear(res); 
    929   return rv; 
    930 
    931 static int 
    932 stratcon_datastore_do(conn_q *cq, const char *cmd) { 
    933   PGresult *res; 
    934   int rv = -1; 
    935   if((res = PQexec(cq->dbh, cmd)) == NULL) return -1; 
    936   if(PQresultStatus(res) == PGRES_COMMAND_OK) rv = 0; 
    937   PQclear(res); 
    938   return rv; 
    939 
    940 #define BUSTED(cq) do { \ 
    941   PQfinish((cq)->dbh); \ 
    942   (cq)->dbh = NULL; \ 
    943   goto full_monty; \ 
    944 } while(0) 
    945 #define SAVEPOINT(name) do { \ 
    946   if(stratcon_datastore_savepoint_op(cq, "SAVEPOINT ", name)) BUSTED(cq); \ 
    947   last_sp = current; \ 
    948 } while(0) 
    949 #define ROLLBACK_TO_SAVEPOINT(name) do { \ 
    950   if(stratcon_datastore_savepoint_op(cq, "ROLLBACK TO SAVEPOINT ", name)) \ 
    951     BUSTED(cq); \ 
    952   last_sp = NULL; \ 
    953 } while(0) 
    954 #define RELEASE_SAVEPOINT(name) do { \ 
    955   if(stratcon_datastore_savepoint_op(cq, "RELEASE SAVEPOINT ", name)) \ 
    956     BUSTED(cq); \ 
    957   last_sp = NULL; \ 
    958 } while(0) 
    959 int 
    960 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure, 
    961                                  struct timeval *now) { 
    962   ds_rt_detail *dsjd = closure; 
    963   if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 
    964   if(mask & EVENTER_ASYNCH_CLEANUP) return 0; 
    965  
    966   assert(dsjd->rt); 
    967   stratcon_datastore_find(dsjd); 
    968   if(dsjd->completion_event) 
    969     eventer_add(dsjd->completion_event); 
    970  
    971   free_params((ds_single_detail *)dsjd); 
    972   free(dsjd); 
    973   return 0; 
    974 
    975 static const char * 
    976 get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) { 
    977   void *vinfo; 
    978   char *dsn = NULL, *fqdn = NULL; 
    979   int found = 0; 
    980   storagenode_info *info = NULL; 
    981   pthread_mutex_lock(&storagenode_to_info_cache_lock); 
    982   if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id), 
    983                         &vinfo)) { 
    984     found = 1; 
    985     info = vinfo; 
    986   } 
    987   pthread_mutex_unlock(&storagenode_to_info_cache_lock); 
    988   if(found) { 
    989     if(fqdn_out) *fqdn_out = info->fqdn; 
    990     return info->dsn; 
    991   } 
    992  
    993   if(!found && can_use_db) { 
    994     ds_single_detail *d; 
    995     conn_q *cq; 
    996     int row_count; 
    997     /* Look it up and store it */ 
    998     d = calloc(1, sizeof(*d)); 
    999     cq = get_conn_q_for_metanode(); 
    1000     GET_QUERY(find_storage); 
    1001     DECLARE_PARAM_INT(id); 
    1002     PG_EXEC(find_storage); 
    1003     row_count = PQntuples(d->res); 
    1004     if(row_count) { 
    1005       PG_GET_STR_COL(dsn, 0, "dsn"); 
    1006       PG_GET_STR_COL(fqdn, 0, "fqdn"); 
    1007       fqdn = fqdn ? strdup(fqdn) : NULL; 
    1008       dsn = dsn ? strdup(dsn) : NULL; 
    1009     } 
    1010     PQclear(d->res); 
    1011    bad_row: 
    1012     free_params(d); 
    1013     free(d); 
    1014     release_conn_q(cq); 
    1015   } 
    1016   if(fqdn) { 
    1017     info = calloc(1, sizeof(*info)); 
    1018     info->fqdn = fqdn; 
    1019     if(fqdn_out) *fqdn_out = info->fqdn; 
    1020     info->dsn = dsn; 
    1021     info->storagenode_id = id; 
    1022     pthread_mutex_lock(&storagenode_to_info_cache_lock); 
    1023     noit_hash_store(&storagenode_to_info_cache, 
    1024                     (void *)&info->storagenode_id, sizeof(int), info); 
    1025     pthread_mutex_unlock(&storagenode_to_info_cache_lock); 
    1026   } 
    1027   return info ? info->dsn : NULL; 
    1028 
    1029 static ds_line_detail * 
    1030 build_insert_batch(interim_journal_t *ij) { 
    1031   int rv; 
    1032   off_t len; 
    1033   const char *buff, *cp, *lcp; 
    1034   struct stat st; 
    1035   ds_line_detail *head = NULL, *last = NULL, *next = NULL; 
    1036  
    1037   if(ij->fd < 0) { 
    1038     ij->fd = open(ij->filename, O_RDONLY); 
    1039     if(ij->fd < 0) { 
    1040       noitL(noit_error, "Cannot open interim journal '%s': %s\n", 
    1041             ij->filename, strerror(errno)); 
    1042       assert(ij->fd >= 0); 
    1043     } 
    1044   } 
    1045   while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR); 
    1046   if(rv == -1) { 
    1047       noitL(noit_error, "Cannot stat interim journal '%s': %s\n", 
    1048             ij->filename, strerror(errno)); 
    1049     assert(rv != -1); 
    1050   } 
    1051   len = st.st_size; 
    1052   if(len > 0) { 
    1053     buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0); 
    1054     if(buff == (void *)-1) { 
    1055       noitL(noit_error, "mmap(%d, %d)(%s) => %s\n", (int)len, ij->fd, 
    1056             ij->filename, strerror(errno)); 
    1057       assert(buff != (void *)-1); 
    1058     } 
    1059     lcp = buff; 
    1060     while(lcp < (buff + len) && 
    1061           NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) { 
    1062       next = calloc(1, sizeof(*next)); 
    1063       next->data = malloc(cp - lcp + 1); 
    1064       memcpy(next->data, lcp, cp - lcp); 
    1065       next->data[cp - lcp] = '\0'; 
    1066       if(!head) head = next; 
    1067       if(last) last->next = next; 
    1068       last = next; 
    1069       lcp = cp + 1; 
    1070     } 
    1071     munmap((void *)buff, len); 
    1072   } 
    1073   close(ij->fd); 
    1074   return head; 
    1075 
    1076 static void 
    1077 interim_journal_remove(interim_journal_t *ij) { 
    1078   unlink(ij->filename); 
     87interim_journal_free(void *vij) { 
     88  interim_journal_t *ij = vij; 
    107989  if(ij->filename) free(ij->filename); 
    108090  if(ij->remote_str) free(ij->remote_str); 
     
    108292  if(ij->fqdn) free(ij->fqdn); 
    108393  free(ij); 
    1084 } 
    1085 int 
    1086 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure, 
    1087                                   struct timeval *now) { 
    1088   int i, total, success, sp_total, sp_success; 
    1089   interim_journal_t *ij; 
    1090   ds_line_detail *head = NULL, *current, *last_sp; 
    1091   const char *dsn; 
    1092   conn_q *cq; 
    1093   if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 
    1094   if(mask & EVENTER_ASYNCH_CLEANUP) return 0; 
    1095  
    1096   ij = closure; 
    1097   if(ij->fqdn == NULL) { 
    1098     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn); 
    1099     if(ij->fqdn) ij->fqdn = strdup(ij->fqdn); /* fqdn is now ours */ 
    1100   } 
    1101   else { 
    1102     dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, NULL); 
    1103   } 
    1104   cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn, 
    1105                              ij->fqdn, dsn); 
    1106   noitL(ds_deb, "stratcon_datastore_asynch_execute[%s,%s,%s]\n", 
    1107         ij->remote_str, ij->remote_cn, ij->fqdn); 
    1108  full_monty: 
    1109   /* Make sure we have a connection */ 
    1110   i = 1; 
    1111   while(stratcon_database_connect(cq)) { 
    1112     noitL(noit_error, "Error connecting to database: %s\n", 
    1113           ij->fqdn ? ij->fqdn : "(null)"); 
    1114     sleep(i); 
    1115     i *= 2; 
    1116     i = MIN(i, 16); 
    1117   } 
    1118  
    1119   if(head == NULL) head = build_insert_batch(ij); 
    1120   noitL(ds_deb, "Starting batch from %s/%s to %s\n", 
    1121         ij->remote_str ? ij->remote_str : "(null)", 
    1122         ij->remote_cn ? ij->remote_cn : "(null)", 
    1123         ij->fqdn ? ij->fqdn : "(null)"); 
    1124   current = head;  
    1125   last_sp = NULL; 
    1126   total = success = sp_total = sp_success = 0; 
    1127   if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq); 
    1128   while(current) { 
    1129     execute_outcome_t rv; 
    1130     if(current->data) { 
    1131       if(!last_sp) { 
    1132         SAVEPOINT("batch"); 
    1133         sp_success = success; 
    1134         sp_total = total; 
    1135       } 
    1136   
    1137       if(current->problematic) { 
    1138         RELEASE_SAVEPOINT("batch"); 
    1139         current = current->next; 
    1140         total++; 
    1141         continue; 
    1142       }  
    1143       rv = stratcon_datastore_execute(cq, cq->remote_str, cq->remote_cn, 
    1144                                       current); 
    1145       switch(rv) { 
    1146         case DS_EXEC_SUCCESS: 
    1147           total++; 
    1148           success++; 
    1149           current = current->next; 
    1150           break; 
    1151         case DS_EXEC_ROW_FAILED: 
    1152           /* rollback to savepoint, mark this record as bad and start again */ 
    1153           if(current->data[0] != 'n') 
    1154             noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data); 
    1155           current->problematic = 1; 
    1156           current = last_sp; 
    1157           success = sp_success; 
    1158           total = sp_total; 
    1159           ROLLBACK_TO_SAVEPOINT("batch"); 
    1160           break; 
    1161         case DS_EXEC_TXN_FAILED: 
    1162           noitL(noit_error, "txn failed '%s', retrying\n", ij->filename); 
    1163           BUSTED(cq); 
    1164       } 
    1165     } 
    1166   } 
    1167   if(last_sp) RELEASE_SAVEPOINT("batch"); 
    1168   if(stratcon_datastore_do(cq, "COMMIT")) { 
    1169     noitL(noit_error, "txn commit failed '%s', retrying\n", ij->filename); 
    1170     BUSTED(cq); 
    1171   } 
    1172   /* Cleanup the mess */ 
    1173   while(head) { 
    1174     ds_line_detail *tofree; 
    1175     tofree = head; 
    1176     head = head->next; 
    1177     if(tofree->data) free(tofree->data); 
    1178     free_params((ds_single_detail *)tofree); 
    1179     free(tofree); 
    1180   } 
    1181   noitL(ds_deb, "Finished batch %s/%s to %s [%d/%d]\n", 
    1182         ij->remote_str ? ij->remote_str : "(null)", 
    1183         ij->remote_cn ? ij->remote_cn : "(null)", 
    1184         ij->fqdn ? ij->fqdn : "(null)", success, total); 
    1185   interim_journal_remove(ij); 
    1186   release_conn_q(cq); 
    1187   return 0; 
    118894} 
    118995static int 
     
    1209115  noitL(ds_deb, "Syncing journal sets...\n"); 
    1210116  while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) { 
    1211     char tmppath[PATH_MAX]
     117    char tmppath[PATH_MAX], id_str[32]
    1212118    int suffix_idx; 
    1213     eventer_t ingest; 
    1214119    ij = vij; 
    1215120    noitL(ds_deb, "Syncing journal set [%s,%s,%s]\n", 
     
    1233138    close(ij->fd); 
    1234139    ij->fd = -1; 
    1235     ingest = eventer_alloc(); 
    1236     ingest->mask = EVENTER_ASYNCH; 
    1237     ingest->callback = stratcon_datastore_asynch_execute; 
    1238     ingest->closure = ij; 
    1239     eventer_add_asynch(ij->cpool->jobq, ingest); 
    1240   } 
    1241   noit_hash_destroy(syncset->ws, free, NULL); 
     140    snprintf(id_str, sizeof(id_str), "%d", ij->storagenode_id); 
     141    ingestor->launch_file_ingestion(ij->filename, ij->remote_str, 
     142                                    ij->remote_cn, id_str); 
     143  } 
     144  noit_hash_destroy(syncset->ws, free, interim_journal_free); 
    1242145  free(syncset->ws); 
    1243146  return 0; 
     
    1279182    ij->storagenode_id = storagenode_id; 
    1280183    ij->filename = strdup(jpath); 
    1281     ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn, 
    1282                                          ij->fqdn); 
    1283184    ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640); 
    1284185    if(ij->fd < 0 && errno == ENOENT) { 
     
    1306207 
    1307208  return ij; 
    1308 } 
    1309 static int 
    1310 storage_node_quick_lookup(const char *uuid_str, const char *remote_cn, 
    1311                           int *sid_out, int *storagenode_id_out, 
    1312                           const char **remote_cn_out, 
    1313                           const char **fqdn_out, const char **dsn_out) { 
    1314   /* only called from the main thread -- no safety issues */ 
    1315   void *vuuidinfo, *vinfo; 
    1316   uuid_info *uuidinfo; 
    1317   storagenode_info *info = NULL; 
    1318   char *fqdn = NULL; 
    1319   char *dsn = NULL; 
    1320   char *new_remote_cn = NULL; 
    1321   int storagenode_id = 0, sid = 0; 
    1322   if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str), 
    1323                          &vuuidinfo)) { 
    1324     int row_count = 0; 
    1325     char *tmpint; 
    1326     ds_single_detail *d; 
    1327     conn_q *cq; 
    1328  
    1329     /* We can't do a database lookup without the remote_cn */ 
    1330     if(!remote_cn) { 
    1331       if(stratcon_datastore_get_enabled()) { 
    1332         /* We have an authoritatively maintained cache, we don't do lookups */ 
    1333         return -1; 
    1334       } 
    1335       else 
    1336         remote_cn = "[[null]]"; 
    1337     } 
    1338  
    1339     d = calloc(1, sizeof(*d)); 
    1340     cq = get_conn_q_for_metanode(); 
    1341     if(stratcon_database_connect(cq) == 0) { 
    1342       /* Blocking call to service the cache miss */ 
    1343       GET_QUERY(check_map); 
    1344       DECLARE_PARAM_STR(uuid_str, strlen(uuid_str)); 
    1345       DECLARE_PARAM_STR(remote_cn, strlen(remote_cn)); 
    1346       PG_EXEC(check_map); 
    1347       row_count = PQntuples(d->res); 
    1348       if(row_count != 1) { 
    1349         PQclear(d->res); 
    1350         goto bad_row; 
    1351       } 
    1352       PG_GET_STR_COL(tmpint, 0, "sid"); 
    1353       if(!tmpint) { 
    1354         row_count = 0; 
    1355         PQclear(d->res); 
    1356         goto bad_row; 
    1357       } 
    1358       sid = atoi(tmpint); 
    1359       PG_GET_STR_COL(tmpint, 0, "storage_node_id"); 
    1360       if(tmpint) storagenode_id = atoi(tmpint); 
    1361       PG_GET_STR_COL(fqdn, 0, "fqdn"); 
    1362       PG_GET_STR_COL(dsn, 0, "dsn"); 
    1363       PG_GET_STR_COL(new_remote_cn, 0, "remote_cn"); 
    1364       fqdn = fqdn ? strdup(fqdn) : NULL; 
    1365       dsn = dsn ? strdup(dsn) : NULL; 
    1366       new_remote_cn = new_remote_cn ? strdup(new_remote_cn) : NULL; 
    1367       PQclear(d->res); 
    1368     } 
    1369    bad_row: 
    1370     free_params((ds_single_detail *)d); 
    1371     free(d); 
    1372     release_conn_q(cq); 
    1373     if(row_count != 1) { 
    1374       return -1; 
    1375     } 
    1376     /* Place in cache */ 
    1377     uuidinfo = calloc(1, sizeof(*uuidinfo)); 
    1378     uuidinfo->sid = sid; 
    1379     uuidinfo->uuid_str = strdup(uuid_str); 
    1380     uuidinfo->storagenode_id = storagenode_id; 
    1381     uuidinfo->remote_cn = new_remote_cn ? strdup(new_remote_cn) : strdup(remote_cn); 
    1382     noit_hash_store(&uuid_to_info_cache, 
    1383                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo); 
    1384     /* Also, we may have just witnessed a new storage node, store it */ 
    1385     if(storagenode_id) { 
    1386       int needs_free = 0; 
    1387       info = calloc(1, sizeof(*info)); 
    1388       info->storagenode_id = storagenode_id; 
    1389       info->dsn = dsn ? strdup(dsn) : NULL; 
    1390       info->fqdn = fqdn ? strdup(fqdn) : NULL; 
    1391       pthread_mutex_lock(&storagenode_to_info_cache_lock); 
    1392       if(!noit_hash_retrieve(&storagenode_to_info_cache, 
    1393                              (void *)&storagenode_id, sizeof(int), &vinfo)) { 
    1394         /* hack to save memory -- we *never* remove from these caches, 
    1395            so we can use the same fqdn value in the above cache for the key 
    1396            in the cache below -- (no strdup) */ 
    1397         noit_hash_store(&storagenode_to_info_cache, 
    1398                         (void *)&info->storagenode_id, sizeof(int), info); 
    1399       } 
    1400       else needs_free = 1; 
    1401       pthread_mutex_unlock(&storagenode_to_info_cache_lock); 
    1402       if(needs_free) { 
    1403         if(info->dsn) free(info->dsn); 
    1404         if(info->fqdn) free(info->fqdn); 
    1405         free(info); 
    1406       } 
    1407     } 
    1408   } 
    1409   else 
    1410     uuidinfo = vuuidinfo; 
    1411  
    1412   if(uuidinfo && uuidinfo->storagenode_id) { 
    1413     if((!dsn && dsn_out) || (!fqdn && fqdn_out)) { 
    1414       /* we don't have dsn and we actually want it */ 
    1415       pthread_mutex_lock(&storagenode_to_info_cache_lock); 
    1416       if(noit_hash_retrieve(&storagenode_to_info_cache, 
    1417                             (void *)&uuidinfo->storagenode_id, sizeof(int), 
    1418                             &vinfo)) 
    1419         info = vinfo; 
    1420       pthread_mutex_unlock(&storagenode_to_info_cache_lock); 
    1421     } 
    1422   } 
    1423  
    1424   if(fqdn_out) *fqdn_out = info ? info->fqdn : NULL; 
    1425   if(dsn_out) *dsn_out = info ? info->dsn : NULL; 
    1426   assert(uuidinfo); 
    1427   if(remote_cn_out) *remote_cn_out = uuidinfo->remote_cn; 
    1428   if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id; 
    1429   if(sid_out) *sid_out = uuidinfo->sid; 
    1430   if(fqdn) free(fqdn); 
    1431   if(dsn) free(dsn); 
    1432   if(new_remote_cn) free(new_remote_cn); 
    1433   return 0; 
    1434 } 
    1435 static int 
    1436 uuid_to_sid(const char *uuid_str_in, const char *remote_cn) { 
    1437   char uuid_str[UUID_STR_LEN+1]; 
    1438   int sid = 0; 
    1439   strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str)); 
    1440   storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL, NULL); 
    1441   return sid; 
    1442209} 
    1443210static void 
     
    1460227        strlcpy(uuid_str, cp2 - UUID_STR_LEN, sizeof(uuid_str)); 
    1461228        if(!uuid_parse(uuid_str, checkid)) { 
    1462           storage_node_quick_lookup(uuid_str, remote_cn, NULL, 
    1463                                     &storagenode_id, NULL, &fqdn, &dsn); 
     229          ingestor->storage_node_lookup(uuid_str, remote_cn, NULL, 
     230                                        &storagenode_id, NULL, 
     231                                        &fqdn, &dsn); 
    1464232          ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn); 
    1465233        } 
     
    1506274                        const char *remote_cn, void *operand, 
    1507275                        eventer_t completion) { 
    1508   conn_pool *cpool; 
    1509276  syncset_t *syncset; 
    1510277  eventer_t e; 
    1511   ds_rt_detail *rtdetail
     278  struct realtime_tracker *rt
    1512279  struct datastore_onlooker_list *nnode; 
    1513280 
     
    1530297      break; 
    1531298    case DS_OP_FIND_COMPLETE: 
    1532       cpool = get_conn_pool_for_remote(NULL,NULL,NULL); 
    1533       rtdetail = calloc(1, sizeof(*rtdetail)); 
    1534       rtdetail->rt = operand; 
    1535       rtdetail->completion_event = completion; 
    1536       e = eventer_alloc(); 
    1537       e->mask = EVENTER_ASYNCH; 
    1538       e->callback = stratcon_datastore_asynch_lookup; 
    1539       e->closure = rtdetail; 
    1540       eventer_add_asynch(cpool->jobq, e); 
    1541       break; 
    1542   } 
    1543 
    1544  
    1545 int 
    1546 stratcon_datastore_saveconfig(void *unused) { 
    1547   int rv = -1; 
    1548   char *buff; 
    1549   ds_single_detail _d = { 0 }, *d = &_d; 
    1550   conn_q *cq; 
    1551   char ipv4_str[32]; 
    1552   struct in_addr r, l; 
    1553  
    1554   r.s_addr = htonl((4 << 24) || (2 << 16) || (2 << 8) || 1); 
    1555   memset(&l, 0, sizeof(l)); 
    1556   noit_getip_ipv4(r, &l); 
    1557   /* Ignore the error.. what are we going to do anyway */ 
    1558   if(inet_ntop(AF_INET, &l, ipv4_str, sizeof(ipv4_str)) == NULL) 
    1559     strlcpy(ipv4_str, "0.0.0.0", sizeof(ipv4_str)); 
    1560  
    1561   cq = get_conn_q_for_metanode(); 
    1562  
    1563   if(stratcon_database_connect(cq) == 0) { 
    1564     char time_as_str[20]; 
    1565     size_t len; 
    1566     buff = noit_conf_xml_in_mem(&len); 
    1567     if(!buff) goto bad_row; 
    1568  
    1569     snprintf(time_as_str, sizeof(time_as_str), "%lu", (long unsigned int)time(NULL)); 
    1570     DECLARE_PARAM_STR(ipv4_str, strlen(ipv4_str)); 
    1571     DECLARE_PARAM_STR("", 0); 
    1572     DECLARE_PARAM_STR("stratcond", 9); 
    1573     DECLARE_PARAM_STR(time_as_str, strlen(time_as_str)); 
    1574     DECLARE_PARAM_STR(buff, len); 
    1575     free(buff); 
    1576  
    1577     GET_QUERY(config_insert); 
    1578     PG_EXEC(config_insert); 
    1579     PQclear(d->res); 
    1580     rv = 0; 
    1581  
    1582     bad_row: 
    1583       free_params(d); 
    1584   } 
    1585   release_conn_q(cq); 
    1586   return rv; 
     299      rt = operand; 
     300      ingestor->submit_realtime_lookup(rt, completion); 
     301      break; 
     302  } 
    1587303} 
    1588304 
     
    1600316    nnode->next = onlookers; 
    1601317} 
    1602 static void 
    1603 stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn, 
    1604                                          char *id_str, char *file) { 
    1605   char path[PATH_MAX]; 
    1606   interim_journal_t *ij; 
    1607   eventer_t ingest; 
    1608  
    1609   snprintf(path, sizeof(path), "%s/%s/%s/%s/%s", 
    1610            basejpath, remote_str, remote_cn, id_str, file); 
    1611   ij = calloc(1, sizeof(*ij)); 
    1612   ij->fd = open(path, O_RDONLY); 
    1613   if(ij->fd < 0) { 
    1614     noitL(noit_error, "cannot open journal '%s': %s\n", 
    1615           path, strerror(errno)); 
    1616     free(ij); 
    1617     return; 
    1618   } 
    1619   close(ij->fd); 
    1620   ij->fd = -1; 
    1621   ij->filename = strdup(path); 
    1622   ij->remote_str = strdup(remote_str); 
    1623   ij->remote_cn = strdup(remote_cn); 
    1624   ij->storagenode_id = atoi(id_str); 
    1625   ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn, 
    1626                                        ij->fqdn); 
    1627   noitL(noit_error, "ingesting old payload: %s\n", ij->filename); 
    1628   ingest = eventer_alloc(); 
    1629   ingest->mask = EVENTER_ASYNCH; 
    1630   ingest->callback = stratcon_datastore_asynch_execute; 
    1631   ingest->closure = ij; 
    1632   eventer_add_asynch(ij->cpool->jobq, ingest); 
    1633 } 
    1634 static void 
    1635 stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) { 
    1636   char path[PATH_MAX]; 
    1637   DIR *root; 
    1638   struct dirent *de, *entry; 
    1639   int i = 0, cnt = 0; 
    1640   char **entries; 
    1641   int size = 0; 
    1642  
    1643   snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath, 
    1644            first ? "/" : "", first ? first : "", 
    1645            second ? "/" : "", second ? second : "", 
    1646            third ? "/" : "", third ? third : ""); 
    1647 #ifdef _PC_NAME_MAX 
    1648   size = pathconf(path, _PC_NAME_MAX); 
    1649 #endif 
    1650   size = MIN(size, PATH_MAX + 128); 
    1651   de = alloca(size); 
    1652   root = opendir(path); 
    1653   if(!root) return; 
    1654   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) cnt++; 
    1655   closedir(root); 
    1656   root = opendir(path); 
    1657   if(!root) return; 
    1658   entries = malloc(sizeof(*entries) * cnt); 
    1659   while(portable_readdir_r(root, de, &entry) == 0 && entry != NULL) { 
    1660     if(i < cnt) { 
    1661       entries[i++] = strdup(entry->d_name); 
    1662     } 
    1663   } 
    1664   closedir(root); 
    1665   cnt = i; /* could have changed, directories are fickle */ 
    1666   qsort(entries, i, sizeof(*entries), 
    1667         (int (*)(const void *, const void *))strcasecmp); 
    1668   for(i=0; i<cnt; i++) { 
    1669     if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue; 
    1670     noitL(ds_deb, "Processing L%d entry '%s'\n", 
    1671           third ? 4 : second ? 3 : first ? 2 : 1, entries[i]); 
    1672     if(!first) 
    1673       stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL); 
    1674     else if(!second) 
    1675       stratcon_datastore_sweep_journals_int(first, entries[i], NULL); 
    1676     else if(!third) 
    1677       stratcon_datastore_sweep_journals_int(first, second, entries[i]); 
    1678     else if(strlen(entries[i]) == 16) 
    1679       stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]); 
    1680   } 
    1681   for(i=0; i<cnt; i++) 
    1682     free(entries[i]); 
    1683   free(entries); 
    1684 } 
    1685 static void 
    1686 stratcon_datastore_sweep_journals() { 
    1687   stratcon_datastore_sweep_journals_int(NULL,NULL,NULL); 
    1688 } 
    1689  
    1690 int 
    1691 stratcon_datastore_ingest_all_storagenode_info() { 
    1692   int i, cnt = 0; 
    1693   ds_single_detail _d = { 0 }, *d = &_d; 
    1694   conn_q *cq; 
    1695   cq = get_conn_q_for_metanode(); 
    1696  
    1697   while(stratcon_database_connect(cq)) { 
    1698     noitL(noit_error, "Error connecting to database\n"); 
    1699     sleep(1); 
    1700   } 
    1701  
    1702   GET_QUERY(all_storage); 
    1703   PG_EXEC(all_storage); 
    1704   cnt = PQntuples(d->res); 
    1705   for(i=0; i<cnt; i++) { 
    1706     void *vinfo; 
    1707     char *tmpint, *fqdn, *dsn; 
    1708     int storagenode_id; 
    1709     PG_GET_STR_COL(tmpint, i, "storage_node_id"); 
    1710     storagenode_id = atoi(tmpint); 
    1711     PG_GET_STR_COL(fqdn, i, "fqdn"); 
    1712     PG_GET_STR_COL(dsn, i, "dsn"); 
    1713     PG_GET_STR_COL(tmpint, i, "storage_node_id"); 
    1714     storagenode_id = tmpint ? atoi(tmpint) : 0; 
    1715  
    1716     if(!noit_hash_retrieve(&storagenode_to_info_cache, 
    1717                            (void *)&storagenode_id, sizeof(int), &vinfo)) { 
    1718       storagenode_info *info; 
    1719       info = calloc(1, sizeof(*info)); 
    1720       info->storagenode_id = storagenode_id; 
    1721       info->fqdn = fqdn ? strdup(fqdn) : NULL; 
    1722       info->dsn = dsn ? strdup(dsn) : NULL; 
    1723       noit_hash_store(&storagenode_to_info_cache, 
    1724                       (void *)&info->storagenode_id, sizeof(int), info); 
    1725       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n", 
    1726             info->storagenode_id, 
    1727             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : ""); 
    1728     } 
    1729   } 
    1730   PQclear(d->res); 
    1731  bad_row: 
    1732   free_params(d); 
    1733  
    1734   release_conn_q(cq); 
    1735   noitL(noit_error, "Loaded %d storage nodes\n", cnt); 
    1736   return cnt; 
    1737 } 
    1738 int 
    1739 stratcon_datastore_ingest_all_check_info() { 
    1740   int i, cnt, loaded = 0; 
    1741   ds_single_detail _d = { 0 }, *d = &_d; 
    1742   conn_q *cq; 
    1743   cq = get_conn_q_for_metanode(); 
    1744  
    1745   while(stratcon_database_connect(cq)) { 
    1746     noitL(noit_error, "Error connecting to database\n"); 
    1747     sleep(1); 
    1748   } 
    1749  
    1750   GET_QUERY(check_mapall); 
    1751   PG_EXEC(check_mapall); 
    1752   cnt = PQntuples(d->res); 
    1753   for(i=0; i<cnt; i++) { 
    1754     void *vinfo; 
    1755     char *tmpint, *fqdn, *dsn, *uuid_str, *remote_cn; 
    1756     int sid, storagenode_id; 
    1757     uuid_info *uuidinfo; 
    1758     PG_GET_STR_COL(uuid_str, i, "id"); 
    1759     if(!uuid_str) continue; 
    1760     PG_GET_STR_COL(tmpint, i, "sid"); 
    1761     if(!tmpint) continue; 
    1762     sid = atoi(tmpint); 
    1763     PG_GET_STR_COL(fqdn, i, "fqdn"); 
    1764     PG_GET_STR_COL(dsn, i, "dsn"); 
    1765     PG_GET_STR_COL(remote_cn, i, "remote_cn"); 
    1766     PG_GET_STR_COL(tmpint, i, "storage_node_id"); 
    1767     storagenode_id = tmpint ? atoi(tmpint) : 0; 
    1768  
    1769     uuidinfo = calloc(1, sizeof(*uuidinfo)); 
    1770     uuidinfo->uuid_str = strdup(uuid_str); 
    1771     uuidinfo->remote_cn = strdup(remote_cn); 
    1772     uuidinfo->storagenode_id = storagenode_id; 
    1773     uuidinfo->sid = sid; 
    1774     noit_hash_store(&uuid_to_info_cache, 
    1775                     uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo); 
    1776     noitL(ds_deb, "CHECK[%s] -> { remote_cn: '%s', storagenode_id: '%d' }\n", 
    1777           uuidinfo->uuid_str, uuidinfo->remote_cn, uuidinfo->storagenode_id); 
    1778     loaded++; 
    1779     if(!noit_hash_retrieve(&storagenode_to_info_cache, 
    1780                            (void *)&storagenode_id, sizeof(int), &vinfo)) { 
    1781       storagenode_info *info; 
    1782       info = calloc(1, sizeof(*info)); 
    1783       info->storagenode_id = storagenode_id; 
    1784       info->fqdn = fqdn ? strdup(fqdn) : NULL; 
    1785       info->dsn = dsn ? strdup(dsn) : NULL; 
    1786       noit_hash_store(&storagenode_to_info_cache, 
    1787                       (void *)&info->storagenode_id, sizeof(int), info); 
    1788       noitL(ds_deb, "SN[%d] -> { fqdn: '%s', dsn: '%s' }\n", 
    1789             info->storagenode_id, 
    1790             info->fqdn ? info->fqdn : "", info->dsn ? info->dsn : ""); 
    1791     } 
    1792   } 
    1793   PQclear(d->res); 
    1794  bad_row: 
    1795   free_params(d); 
    1796  
    1797   release_conn_q(cq); 
    1798   noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded); 
    1799   return loaded; 
    1800 } 
    1801318 
    1802319static int 
     
    1804321                     int npats, char **pats) { 
    1805322  noit_http_session_ctx *ctx = restc->http_ctx; 
    1806   ds_single_detail *d; 
    1807   int row_count = 0; 
    1808   const char *xml = NULL; 
    1809   conn_q *cq = NULL; 
     323  char *xml = NULL; 
    1810324 
    1811325  if(npats != 0) { 
     
    1814328    return 0; 
    1815329  } 
    1816   d = calloc(1, sizeof(*d)); 
    1817   GET_QUERY(config_get); 
    1818   cq = get_conn_q_for_metanode(); 
    1819   if(!cq) { 
    1820     noit_http_response_server_error(ctx, "text/xml"); 
    1821     goto bad_row; 
    1822   } 
    1823  
    1824   DECLARE_PARAM_STR(restc->remote_cn, 
    1825                     restc->remote_cn ? strlen(restc->remote_cn) : 0); 
    1826   PG_EXEC(config_get); 
    1827   row_count = PQntuples(d->res); 
    1828   if(row_count == 1) PG_GET_STR_COL(xml, 0, "config"); 
     330 
     331  xml = ingestor->get_noit_config(restc->remote_cn); 
    1829332 
    1830333  if(xml == NULL) { 
     
    1832335    snprintf(buff, sizeof(buff), "<error><remote_cn>%s</remote_cn>" 
    1833336                                 "<row_count>%d</row_count></error>\n", 
    1834              restc->remote_cn, row_count); 
     337             restc->remote_cn, 0); 
    1835338    noit_http_response_append(ctx, buff, strlen(buff)); 
    1836339    noit_http_response_not_found(ctx, "text/xml"); 
     
    1840343    noit_http_response_ok(ctx, "text/xml"); 
    1841344  } 
    1842  bad_row: 
    1843   free_params((ds_single_detail *)d); 
    1844   d->nparams = 0; 
    1845   if(cq) release_conn_q(cq); 
    1846  
     345 
     346  if(xml) free(xml); 
    1847347  noit_http_response_end(ctx); 
    1848348  return 0; 
     
    1850350 
    1851351void 
     352stratcon_datastore_iep_check_preload() { 
     353  ingestor->iep_check_preload(); 
     354} 
     355 
     356int 
     357stratcon_datastore_saveconfig(void *unused) { 
     358  return ingestor->save_config(); 
     359} 
     360 
     361void 
    1852362stratcon_datastore_init() { 
    1853   pthread_mutex_init(&ds_conns_lock, NULL); 
    1854   pthread_mutex_init(&storagenode_to_info_cache_lock, NULL); 
    1855363  ds_err = noit_log_stream_find("error/datastore"); 
    1856364  ds_deb = noit_log_stream_find("debug/datastore"); 
     
    1864372    exit(-1); 
    1865373  } 
    1866   stratcon_datastore_ingest_all_check_info(); 
    1867   stratcon_datastore_ingest_all_storagenode_info(); 
    1868   stratcon_datastore_sweep_journals(); 
    1869374 
    1870375  assert(noit_http_rest_register_auth( 
  • src/stratcon_datastore.h

    r35d80a0 r5f816fc  
    3737#include "eventer/eventer.h" 
    3838#include "utils/noit_hash.h" 
     39#include "stratcon_realtime_http.h" 
    3940 
    4041#include <sys/types.h> 
    4142#include <sys/socket.h> 
     43 
     44typedef struct { 
     45  void (*launch_file_ingestion)(const char *file, const char *ip, 
     46                                const char *cn, const char *store); 
     47  void (*iep_check_preload)(); 
     48  int (*storage_node_lookup)(const char *uuid_str, const char *remote_cn, 
     49                             int *sid_out, int *storagenode_id_out, 
     50                             const char **remote_cn_out, 
     51                             const char **fqdn_out, const char **dsn_out); 
     52  void (*submit_realtime_lookup)(struct realtime_tracker *rt, 
     53                                 eventer_t completion); 
     54  char *(*get_noit_config)(const char *cn); 
     55  int (*save_config)(); 
     56} ingestor_api_t; 
     57 
     58API_EXPORT(int) stratcon_datastore_set_ingestor(ingestor_api_t *ni); 
     59 
     60typedef struct { 
     61  char *remote_str; 
     62  char *remote_cn; 
     63  char *fqdn; 
     64  int storagenode_id; 
     65  int fd;  
     66  char *filename; 
     67} interim_journal_t; 
    4268 
    4369typedef enum { 
  • test/t/testconfig.pm

    r09ecd28 r5f816fc  
    463463  my $options = shift; 
    464464  $options->{cwd} ||= cwd(); 
    465   $options->{generics} ||= { 'stomp_driver' => { image => 'stomp_driver' } }; 
     465  $options->{generics} ||= { 'stomp_driver' => { image => 'stomp_driver' }, 
     466                             'postgres_ingestor' => { image => 'postgres_ingestor' } }; 
    466467  $options->{rest_acls} ||= [ { type => 'deny', rules => [ { type => 'allow' } ] } ]; 
    467468  $options->{iep}->{mq} ||= { 'stomp' => {} };