Changeset 5360a1ee7f4ade3bd6299e566204ee1820aeffa3

Show
Ignore:
Timestamp:
10/19/09 01:22:39 (4 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1255915359 +0000
git-parent:

[08e498f88b47f53edf390c85d08577fa1d7686ab]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1255915359 +0000
Message:

hefty patch. First part of stratcond support for horizontal partitioning of storage. refs #150

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/stratcon.conf.in

    r8504a3b r5360a1e  
    99      <log name="error/iep"/> 
    1010      <log name="error/eventer" disabled="true"/> 
     11      <log name="error/datastore" disabled="true"/> 
    1112      <log name="debug/eventer" disabled="true"/> 
    1213    </console_output> 
     
    3132  </noits> 
    3233 
    33   <iep disabled="false"> <!-- false the default --> 
     34  <iep disabled="true"> <!-- false the default --> 
    3435    <start directory="%iepdbdir%" 
    3536           command="%iepbindir%/run-iep.sh" /> 
     
    6263 
    6364  <database> 
     65    <journal> 
     66      <path>/var/log/stratcon.persist</path> 
     67    </journal> 
    6468    <dbconfig> 
    6569      <host>localhost</host> 
     
    6973    </dbconfig> 
    7074    <statements> 
     75      <!-- These are optional and used for stuff like setting search paths --> 
     76      <!-- 
     77      <metanodepostconnect><![CDATA[ 
     78        SELECT do_some_magic(); 
     79      ]]></metanodepostconnect> 
     80      <storagepostconnect><![CDATA[ 
     81        SELECT do_some_magic($1,$2); 
     82      ]]></storagepostconnect> 
     83      --> 
    7184      <allchecks><![CDATA[ 
    7285        SELECT remote_address, id, target, module, name 
    73           FROM stratcon.mv_loading_dock_check_s 
     86          FROM noit.get_checks() 
    7487      ]]></allchecks> 
    7588      <findcheck><![CDATA[ 
    76         SELECT remote_address, id 
    77           FROM stratcon.mv_loading_dock_check_s 
    78          WHERE sid = $1 
     89        SELECT remote_address, id, target, module, name 
     90          FROM noit.get_check($1,$2) 
    7991      ]]></findcheck> 
     92      <allstoragenodes><![CDATA[ 
     93        SELECT storage_node_id, fqdn, dsn 
     94          FROM stratcon.storage_node 
     95      ]]></allstoragenodes> 
     96      <findstoragenode><![CDATA[ 
     97        SELECT fqdn, dsn 
     98          FROM stratcon.storage_node 
     99         WHERE storage_node_id = $1 
     100      ]]></findstoragenode> 
     101      <mapallchecks><![CDATA[ 
     102        SELECT id, sid, storage_node_id, fqdn, dsn 
     103          FROM stratcon.map_uuid_to_sid LEFT JOIN stratcon.storage_node USING (storage_node_id) 
     104      ]]></mapallchecks> 
     105      <mapchecktostoragenode><![CDATA[ 
     106        SELECT o_storage_node_id as storage_node_id, o_sid as sid, o_fqdn as fqdn, o_dsn as dsn from stratcon.map_uuid_to_sid($1,$2) 
     107      ]]></mapchecktostoragenode> 
    80108      <check><![CDATA[ 
    81109        INSERT INTO stratcon.loading_dock_check_s 
  • src/stratcon_datastore.c

    r4201a42 r5360a1e  
    11/* 
    2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc. 
     2 * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc. 
    33 * All rights reserved. 
    44 * 
     
    3535#include "utils/noit_log.h" 
    3636#include "utils/noit_b64.h" 
     37#include "utils/noit_str.h" 
     38#include "utils/noit_mkdir.h" 
    3739#include "stratcon_datastore.h" 
    3840#include "stratcon_realtime_http.h" 
     
    4143#include "noit_check.h" 
    4244#include <unistd.h> 
     45#include <fcntl.h> 
    4346#include <netinet/in.h> 
    4447#include <sys/un.h> 
     48#include <dirent.h> 
    4549#include <arpa/inet.h> 
     50#include <sys/mman.h> 
    4651#include <libpq-fe.h> 
    4752#include <zlib.h> 
    4853#include <assert.h> 
    49  
    50 static char *check_loadall = NULL; 
    51 static const char *check_loadall_conf = "/stratcon/database/statements/allchecks"; 
    52 static char *check_find = NULL; 
    53 static const char *check_find_conf = "/stratcon/database/statements/findcheck"; 
    54 static char *check_insert = NULL; 
    55 static const char *check_insert_conf = "/stratcon/database/statements/check"; 
    56 static char *status_insert = NULL; 
    57 static const char *status_insert_conf = "/stratcon/database/statements/status"; 
    58 static char *metric_insert_numeric = NULL; 
    59 static const char *metric_insert_numeric_conf = "/stratcon/database/statements/metric_numeric"; 
    60 static char *metric_insert_text = NULL; 
    61 static const char *metric_insert_text_conf = "/stratcon/database/statements/metric_text"; 
    62 static char *config_insert = NULL; 
    63 static const char *config_insert_conf = "/stratcon/database/statements/config"; 
     54#include <errno.h> 
     55 
     56#define DECL_STMT(codename,confname) \ 
     57static char *codename = NULL; \ 
     58static const char *codename##_conf = "/stratcon/database/statements/" #confname 
     59 
     60DECL_STMT(storage_post_connect, storagepostconnect); 
     61DECL_STMT(metanode_post_connect, metanodepostconnect); 
     62DECL_STMT(find_storage, findstoragenode); 
     63DECL_STMT(all_storage, allstoragenodes); 
     64DECL_STMT(check_map, mapchecktostoragenode); 
     65DECL_STMT(check_mapall, mapallchecks); 
     66DECL_STMT(check_loadall, allchecks); 
     67DECL_STMT(check_find, findcheck); 
     68DECL_STMT(check_insert, check); 
     69DECL_STMT(status_insert, status); 
     70DECL_STMT(metric_insert_numeric, metric_numeric); 
     71DECL_STMT(metric_insert_text, metric_text); 
     72DECL_STMT(config_insert, config); 
     73 
     74static noit_log_stream_t ds_err = NULL; 
     75static noit_log_stream_t ingest_err = NULL; 
    6476 
    6577static struct datastore_onlooker_list { 
    66   void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *, void *); 
     78  void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *, 
     79                   const char *, void *); 
    6780  struct datastore_onlooker_list *next; 
    6881} *onlookers = NULL; 
     
    7386      goto bad_row; \ 
    7487} while(0) 
     88 
     89struct conn_q; 
     90 
     91typedef struct { 
     92  char            *queue_name; /* the key fqdn+remote_sn */ 
     93  eventer_jobq_t  *jobq; 
     94  struct conn_q   *head; 
     95  pthread_mutex_t  lock; 
     96  pthread_cond_t   cv; 
     97  int              ttl; 
     98  int              in_pool; 
     99  int              outstanding; 
     100  int              max_allocated; 
     101  int              max_in_pool; 
     102} conn_pool; 
     103typedef struct conn_q { 
     104  time_t           last_use; 
     105  char            *dsn;        /* Pg connect string */ 
     106  char            *remote_str; /* the IP of the noit*/ 
     107  char            *remote_cn;  /* the Cert CN of the noit */ 
     108  char            *fqdn;       /* the fqdn of the storage node */ 
     109  conn_pool       *pool; 
     110  struct conn_q   *next; 
     111  /* Postgres specific stuff */ 
     112  PGconn          *dbh; 
     113} conn_q; 
     114 
    75115 
    76116#define MAX_PARAMS 8 
     
    89129  POSTGRES_PARTS 
    90130} ds_single_detail; 
    91 typedef struct ds_job_detail
     131typedef struct
    92132  /* Postgres specific stuff */ 
    93133  POSTGRES_PARTS 
    94  
    95   char *data;  /* The raw string, NULL means the stream is done -- commit. */ 
    96134  struct realtime_tracker *rt; 
    97  
     135  conn_q *cq; /* connection on which to perform this job */ 
     136  eventer_t completion_event; /* This event should be registered if non NULL */ 
     137} ds_rt_detail; 
     138typedef struct ds_line_detail { 
     139  /* Postgres specific stuff */ 
     140  POSTGRES_PARTS 
     141  char *data; 
    98142  int problematic; 
    99   eventer_t completion_event; /* This event should be registered if non NULL */ 
    100   struct ds_job_detail *next; 
    101 } ds_job_detail; 
     143  struct ds_line_detail *next; 
     144} ds_line_detail; 
    102145 
    103146typedef struct { 
    104   struct sockaddr *remote; 
    105   eventer_jobq_t  *jobq; 
    106   /* Postgres specific stuff */ 
    107   PGconn          *dbh; 
    108   ds_job_detail   *head; 
    109   ds_job_detail   *tail; 
    110 } conn_q; 
     147  noit_hash_table *ws; 
     148  eventer_t completion; 
     149} syncset_t; 
     150 
     151typedef struct { 
     152  char *remote_str; 
     153  char *remote_cn; 
     154  char *fqdn; 
     155  int storagenode_id; 
     156  int fd; 
     157  char *filename; 
     158  conn_pool *cpool; 
     159} interim_journal_t; 
    111160 
    112161static int stratcon_database_connect(conn_q *cq); 
     
    120169} 
    121170 
    122 static void 
    123 __append(conn_q *q, ds_job_detail *d) { 
    124   d->next = NULL; 
    125   if(!q->head) q->head = q->tail = d; 
    126   else { 
    127     q->tail->next = d; 
    128     q->tail = d; 
    129   } 
    130 
    131 static void 
    132 __remove_until(conn_q *q, ds_job_detail *d) { 
    133   ds_job_detail *next; 
    134   while(q->head && q->head != d) { 
    135     next = q->head; 
    136     q->head = q->head->next; 
    137     free_params((ds_single_detail *)next); 
    138     if(next->data) free(next->data); 
    139     free(next); 
    140   } 
    141   if(!q->head) q->tail = NULL; 
    142 
    143  
     171char *basejpath = NULL; 
     172pthread_mutex_t ds_conns_lock; 
    144173noit_hash_table ds_conns; 
    145  
    146 conn_q * 
    147 __get_conn_q_for_remote(struct sockaddr *remote) { 
    148   void *vcq; 
    149   conn_q *cq; 
    150   char queue_name[128] = "datastore_"; 
    151   static const char __zeros[4] = { 0 }; 
    152   int len = 0; 
     174noit_hash_table working_sets; 
     175 
     176/* the fqdn cache needs to be thread safe */ 
     177typedef struct { 
     178  char *uuid_str; 
     179  int storagenode_id; 
     180  int sid; 
     181} uuid_info; 
     182typedef struct { 
     183  int storagenode_id; 
     184  char *fqdn; 
     185  char *dsn; 
     186} storagenode_info; 
     187noit_hash_table uuid_to_info_cache; 
     188pthread_mutex_t fqdn_to_info_cache_lock; 
     189noit_hash_table fqdn_to_info_cache; 
     190 
     191int 
     192convert_sockaddr_to_buff(char *buff, int blen, struct sockaddr *remote) { 
     193  char name[128] = ""; 
     194  buff[0] = '\0'; 
    153195  if(remote) { 
     196    int len = 0; 
    154197    switch(remote->sa_family) { 
    155198      case AF_INET: 
    156199        len = sizeof(struct sockaddr_in); 
    157200        inet_ntop(remote->sa_family, &((struct sockaddr_in *)remote)->sin_addr, 
    158                   queue_name + strlen("datastore_"), len); 
     201                  name, len); 
    159202        break; 
    160203      case AF_INET6: 
    161204       len = sizeof(struct sockaddr_in6); 
    162205        inet_ntop(remote->sa_family, &((struct sockaddr_in6 *)remote)->sin6_addr, 
    163                   queue_name + strlen("datastore_"), len); 
     206                  name, len); 
    164207       break; 
    165208      case AF_UNIX: 
    166209        len = SUN_LEN(((struct sockaddr_un *)remote)); 
    167         snprintf(queue_name, sizeof(queue_name), "datastore_%s", ((struct sockaddr_un *)remote)->sun_path); 
     210        snprintf(name, sizeof(name), "%s", ((struct sockaddr_un *)remote)->sun_path); 
    168211        break; 
    169       default: return NULL; 
    170     } 
     212      default: return 0; 
     213    } 
     214  } 
     215  strlcpy(buff, name, blen); 
     216  return strlen(buff); 
     217
     218 
     219/* Thread-safe connection pools */ 
     220 
     221/* Forcefree -> 1 prevents it from going to the pool and it gets freed */ 
     222static void 
     223release_conn_q_forceable(conn_q *cq, int forcefree) { 
     224  int putback = 0; 
     225  cq->last_use = time(NULL); 
     226  pthread_mutex_lock(&cq->pool->lock); 
     227  cq->pool->outstanding--; 
     228  if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) { 
     229    putback = 1; 
     230    cq->next = cq->pool->head; 
     231    cq->pool->head = cq; 
     232    cq->pool->in_pool++; 
     233  } 
     234  pthread_mutex_unlock(&cq->pool->lock); 
     235  noitL(noit_debug, "[%p] release %s [%s]\n", (void *)pthread_self(), 
     236        putback ? "to pool" : "and destroy", cq->pool->queue_name); 
     237  pthread_cond_signal(&cq->pool->cv); 
     238  if(putback) return; 
     239 
     240  /* Not put back, release it */ 
     241  if(cq->dbh) PQfinish(cq->dbh); 
     242  if(cq->remote_str) free(cq->remote_str); 
     243  if(cq->remote_cn) free(cq->remote_cn); 
     244  if(cq->fqdn) free(cq->fqdn); 
     245  if(cq->dsn) free(cq->dsn); 
     246  free(cq); 
     247
     248static void 
     249ttl_purge_conn_pool(conn_pool *pool) { 
     250  int old_cnt, new_cnt; 
     251  time_t now = time(NULL); 
     252  conn_q *cq, *prev = NULL, *iter; 
     253  /* because we always replace on the head and update the last_use time when 
     254     doing so, we know they are ordered LRU on the end.  So, once we hit an 
     255     old one, we know all the others are old too. 
     256   */ 
     257  if(!pool->head) return; /* hack short circuit for no locks */ 
     258  pthread_mutex_lock(&pool->lock); 
     259  old_cnt = pool->in_pool; 
     260  cq = pool->head; 
     261  while(cq) { 
     262    if(cq->last_use + cq->pool->ttl < now) { 
     263      if(prev) prev->next = NULL; 
     264      else pool->head = NULL; 
     265      break; 
     266    } 
     267    prev = cq; 
     268    cq = cq->next; 
     269  } 
     270  /* Now pool->head is a chain of unexpired and cq is a chain of expired */ 
     271  /* Fix accounting */ 
     272  for(iter=cq; iter; iter=iter->next) pool->in_pool--; 
     273  new_cnt = pool->in_pool; 
     274  pthread_mutex_unlock(&pool->lock); 
     275 
     276  /* Force release these without holding the lock */ 
     277  while(cq) { 
     278    prev = cq; 
     279    cq = cq->next; 
     280    release_conn_q_forceable(cq, 1); 
     281  } 
     282  if(old_cnt != new_cnt) 
     283    noitL(noit_debug, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt, 
     284          pool->queue_name); 
     285
     286static void 
     287release_conn_q(conn_q *cq) { 
     288  ttl_purge_conn_pool(cq->pool); 
     289  release_conn_q_forceable(cq, 0); 
     290
     291static conn_pool * 
     292get_conn_pool_for_remote(const char *remote_str, 
     293                         const char *remote_cn, const char *fqdn) { 
     294  void *vcpool; 
     295  conn_pool *cpool = NULL; 
     296  char queue_name[256] = "datastore_"; 
     297  snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s", 
     298           (remote_str && *remote_str) ? remote_str : "0.0.0.0", 
     299           fqdn ? fqdn : "default", 
     300           remote_cn ? remote_cn : "default"); 
     301  pthread_mutex_lock(&ds_conns_lock); 
     302  if(noit_hash_retrieve(&ds_conns, (const char *)queue_name, 
     303                        strlen(queue_name), &vcpool)) 
     304    cpool = vcpool; 
     305  pthread_mutex_unlock(&ds_conns_lock); 
     306  if(!cpool) { 
     307    vcpool = cpool = calloc(1, sizeof(*cpool)); 
     308    cpool->queue_name = strdup(queue_name); 
     309    pthread_mutex_init(&cpool->lock, NULL); 
     310    pthread_cond_init(&cpool->cv, NULL); 
     311    cpool->in_pool = 0; 
     312    cpool->outstanding = 0; 
     313    cpool->max_in_pool = 1; 
     314    cpool->max_allocated = 1; 
     315    pthread_mutex_lock(&ds_conns_lock); 
     316    if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name), 
     317                        cpool)) { 
     318      noit_hash_retrieve(&ds_conns, (const char *)queue_name, 
     319                         strlen(queue_name), &vcpool); 
     320    } 
     321    pthread_mutex_unlock(&ds_conns_lock); 
     322    if(vcpool != cpool) { 
     323      /* someone beat us to it */ 
     324      free(cpool->queue_name); 
     325      pthread_mutex_destroy(&cpool->lock); 
     326      pthread_cond_destroy(&cpool->cv); 
     327      free(cpool); 
     328    } 
     329    else { 
     330      int i; 
     331      /* Our job to setup the pool */ 
     332      cpool->jobq = calloc(1, sizeof(*cpool->jobq)); 
     333      eventer_jobq_init(cpool->jobq, queue_name); 
     334      cpool->jobq->backq = eventer_default_backq(); 
     335      /* Add one thread */ 
     336      for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++) 
     337        eventer_jobq_increase_concurrency(cpool->jobq); 
     338    } 
     339    cpool = vcpool; 
     340  } 
     341  return cpool; 
     342
     343static conn_q * 
     344get_conn_q_for_remote(const char *remote_str, 
     345                      const char *remote_cn, const char *fqdn, 
     346                      const char *dsn) { 
     347  conn_pool *cpool; 
     348  conn_q *cq; 
     349  cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn); 
     350 again: 
     351  noitL(noit_debug, "[%p] requesting [%s]\n", (void *)pthread_self(), 
     352        cpool->queue_name); 
     353  pthread_mutex_lock(&cpool->lock); 
     354  if(cpool->head) { 
     355    assert(cpool->in_pool > 0); 
     356    cq = cpool->head; 
     357    cpool->head = cq->next; 
     358    cpool->in_pool--; 
     359    cpool->outstanding++; 
     360    cq->next = NULL; 
     361    pthread_mutex_unlock(&cpool->lock); 
     362    return cq; 
     363  } 
     364  if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) { 
     365    noitL(noit_debug, "[%p] over-subscribed, waiting [%s]\n", 
     366          (void *)pthread_self(), cpool->queue_name); 
     367    pthread_cond_wait(&cpool->cv, &cpool->lock); 
     368    goto again; 
    171369  } 
    172370  else { 
    173     /* This is a dummy connection */ 
    174     remote = (struct sockaddr *)__zeros; 
    175     snprintf(queue_name, sizeof(queue_name), "datastore_default"); 
    176     len = 4; 
    177   } 
    178   if(noit_hash_retrieve(&ds_conns, (const char *)remote, len, &vcq)) 
    179     return vcq; 
     371    cpool->outstanding++; 
     372    pthread_mutex_unlock(&cpool->lock); 
     373  } 
     374  
    180375  cq = calloc(1, sizeof(*cq)); 
    181   cq->remote = malloc(len); 
    182   memcpy(cq->remote, remote, len); 
    183   cq->jobq = calloc(1, sizeof(*cq->jobq)); 
    184   eventer_jobq_init(cq->jobq, queue_name); 
    185   cq->jobq->backq = eventer_default_backq(); 
    186   /* Add one thread */ 
    187   eventer_jobq_increase_concurrency(cq->jobq); 
    188   noit_hash_store(&ds_conns, (const char *)cq->remote, len, cq); 
     376  cq->pool = cpool; 
     377  cq->remote_str = remote_str ? strdup(remote_str) : NULL; 
     378  cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL; 
     379  cq->fqdn = fqdn ? strdup(fqdn) : NULL; 
     380  cq->dsn = dsn ? strdup(dsn) : NULL; 
    189381  return cq; 
     382} 
     383static conn_q * 
     384get_conn_q_for_metanode() { 
     385  return get_conn_q_for_remote(NULL,NULL,NULL,NULL); 
    190386} 
    191387 
     
    243439  if(d->rv != PGRES_COMMAND_OK && \ 
    244440     d->rv != PGRES_TUPLES_OK) { \ 
    245     noitL(noit_error, "stratcon datasource bad (%d): %s\n'%s'\n", \ 
     441    noitL(ds_err, "stratcon datasource bad (%d): %s\n'%s'\n", \ 
    246442          d->rv, PQresultErrorMessage(d->res), cmd); \ 
    247443    PQclear(d->res); \ 
     
    262458  if(d->rv != PGRES_COMMAND_OK && \ 
    263459     d->rv != PGRES_TUPLES_OK) { \ 
    264     noitL(noit_error, "stratcon datasource bad (%d): %s\n'%s' time: %llu\n", \ 
     460    noitL(ds_err, "stratcon datasource bad (%d): %s\n'%s' time: %llu\n", \ 
    265461          d->rv, PQresultErrorMessage(d->res), cmdbuf, \ 
    266462          (long long unsigned)whence); \ 
     
    274470                                    struct timeval *now) { 
    275471  conn_q *cq = closure; 
    276   ds_job_detail *d; 
     472  ds_single_detail *d; 
    277473  int i, row_count = 0, good = 0; 
    278474  char buff[1024]; 
     
    315511 
    316512    /* stratcon_iep_line_processor takes an allocated operand and frees it */ 
    317     stratcon_iep_line_processor(DS_OP_INSERT, sin, strdup(buff)); 
     513    stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL); 
    318514    good++; 
    319515  } 
    320516  noitL(noit_error, "Staged %d/%d remembered checks into IEP\n", good, row_count); 
    321517 bad_row: 
    322   PQclear(d->res); 
     518  free_params((ds_single_detail *)d); 
    323519  free(d); 
     520  release_conn_q(cq); 
    324521  return 0; 
    325522} 
     
    328525  eventer_t e; 
    329526  conn_q *cq; 
    330   cq = __get_conn_q_for_remote(NULL); 
     527  cq = get_conn_q_for_metanode(); 
    331528 
    332529  e = eventer_alloc(); 
     
    334531  e->callback = stratcon_datastore_asynch_drive_iep; 
    335532  e->closure = cq; 
    336   eventer_add_asynch(cq->jobq, e); 
     533  eventer_add_asynch(cq->pool->jobq, e); 
    337534} 
    338535execute_outcome_t 
    339 stratcon_datastore_find(conn_q *cq, ds_job_detail *d) { 
     536stratcon_datastore_find(conn_q *cq, ds_rt_detail *d) { 
    340537  char *val; 
    341538  int row_count; 
    342  
    343   if(!d->nparams) DECLARE_PARAM_INT(d->rt->sid); 
     539  struct realtime_tracker *node; 
     540 
    344541  GET_QUERY(check_find); 
    345   PG_EXEC(check_find); 
    346   row_count = PQntuples(d->res); 
    347   if(row_count != 1) goto bad_row; 
    348  
    349   /* Get the check uuid */ 
    350   PG_GET_STR_COL(val, 0, "id"); 
    351   if(!val) goto bad_row; 
    352   if(uuid_parse(val, d->rt->checkid)) goto bad_row; 
    353  
    354   /* Get the remote_address (which noit owns this) */ 
    355   PG_GET_STR_COL(val, 0, "remote_address"); 
    356   if(!val) goto bad_row; 
    357   d->rt->noit = strdup(val); 
    358  
    359   PQclear(d->res); 
     542  for(node = d->rt; node; node = node->next) { 
     543    DECLARE_PARAM_INT(node->sid); 
     544    PG_EXEC(check_find); 
     545    row_count = PQntuples(d->res); 
     546    if(row_count != 1) { 
     547      PQclear(d->res); 
     548      goto bad_row; 
     549    } 
     550 
     551    /* Get the check uuid */ 
     552    PG_GET_STR_COL(val, 0, "id"); 
     553    if(!val) { 
     554      PQclear(d->res); 
     555      goto bad_row; 
     556    } 
     557    if(uuid_parse(val, node->checkid)) { 
     558      PQclear(d->res); 
     559      goto bad_row; 
     560    } 
     561   
     562    /* Get the remote_address (which noit owns this) */ 
     563    PG_GET_STR_COL(val, 0, "remote_address"); 
     564    if(!val) { 
     565      PQclear(d->res); 
     566      goto bad_row; 
     567    } 
     568    node->noit = strdup(val); 
     569  
     570   bad_row:  
     571    free_params((ds_single_detail *)d); 
     572    d->nparams = 0; 
     573  } 
    360574  return DS_EXEC_SUCCESS; 
    361  bad_row: 
    362   return DS_EXEC_ROW_FAILED; 
    363575} 
    364576execute_outcome_t 
    365 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) { 
     577stratcon_datastore_execute(conn_q *cq, const char *r, ds_line_detail *d) { 
    366578  int type, len; 
    367579  char *final_buff; 
    368   uLong final_len, actual_final_len;; 
     580  uLong final_len, actual_final_len; 
    369581  char *token; 
     582  char raddr_blank[1] = ""; 
     583  const char *raddr; 
    370584 
    371585  type = d->data[0]; 
     586  raddr = r ? r : raddr_blank; 
    372587 
    373588  /* Parse the log line, but only if we haven't already */ 
    374589  if(!d->nparams) { 
    375     char raddr[128]; 
    376590    char *scp, *ecp; 
    377591 
    378     /* setup our remote address */ 
    379     raddr[0] = '\0'; 
    380     switch(r->sa_family) { 
    381       case AF_INET: 
    382         inet_ntop(AF_INET, &(((struct sockaddr_in *)r)->sin_addr), 
    383                   raddr, sizeof(raddr)); 
    384         break; 
    385       case AF_INET6: 
    386         inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)r)->sin6_addr), 
    387                   raddr, sizeof(raddr)); 
    388         break; 
    389       default: 
    390         noitL(noit_error, "remote address of family %d\n", r->sa_family); 
    391     } 
    392   
    393592    scp = d->data; 
    394593#define PROCESS_NEXT_FIELD(t,l) do { \ 
     
    547746} 
    548747static int 
     748stratcon_database_post_connect(conn_q *cq) { 
     749  int rv = 0; 
     750  ds_single_detail _d = { 0 }, *d = &_d; 
     751  if(cq->remote_str) { 
     752    char *remote_str, *remote_cn; 
     753    /* This is the silly way we get null's in through our declare_param_str */ 
     754    remote_str = cq->remote_str ? cq->remote_str : "[[null]]"; 
     755    remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]"; 
     756    /* This is a storage node, it gets the storage node post_connect */ 
     757    GET_QUERY(storage_post_connect); 
     758    rv = -1; /* now we're serious */ 
     759    DECLARE_PARAM_STR(remote_str, strlen(remote_str)); 
     760    DECLARE_PARAM_STR(remote_cn, strlen(remote_cn)); 
     761    PG_EXEC(storage_post_connect); 
     762    PQclear(d->res); 
     763    rv = 0; 
     764  } 
     765  else { 
     766    /* Metanode post_connect */ 
     767    GET_QUERY(metanode_post_connect); 
     768    rv = -1; /* now we're serious */ 
     769    PG_EXEC(metanode_post_connect); 
     770    PQclear(d->res); 
     771    rv = 0; 
     772  } 
     773 bad_row: 
     774  free_params(d); 
     775  if(rv == -1) { 
     776    /* Post-connect intentions are serious and fatal */ 
     777    PQfinish(cq->dbh); 
     778    cq->dbh = NULL; 
     779  } 
     780  return rv; 
     781} 
     782static int 
    549783stratcon_database_connect(conn_q *cq) { 
    550   char dsn[512]; 
     784  char *dsn, dsn_meta[512]; 
    551785  noit_hash_iter iter = NOIT_HASH_ITER_ZERO; 
    552786  const char *k, *v; 
     
    554788  noit_hash_table *t; 
    555789 
    556   dsn[0] = '\0'; 
    557   t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig"); 
    558   while(noit_hash_next_str(t, &iter, &k, &klen, &v)) { 
    559     if(dsn[0]) strlcat(dsn, " ", sizeof(dsn)); 
    560     strlcat(dsn, k, sizeof(dsn)); 
    561     strlcat(dsn, "=", sizeof(dsn)); 
    562     strlcat(dsn, v, sizeof(dsn)); 
    563   } 
    564   noit_hash_destroy(t, free, free); 
    565   free(t); 
     790  dsn_meta[0] = '\0'; 
     791  if(!cq->dsn) { 
     792    t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig"); 
     793    while(noit_hash_next_str(t, &iter, &k, &klen, &v)) { 
     794      if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta)); 
     795      strlcat(dsn_meta, k, sizeof(dsn_meta)); 
     796      strlcat(dsn_meta, "=", sizeof(dsn_meta)); 
     797      strlcat(dsn_meta, v, sizeof(dsn_meta)); 
     798    } 
     799    noit_hash_destroy(t, free, free); 
     800    free(t); 
     801    dsn = dsn_meta; 
     802  } 
     803  else dsn = cq->dsn; 
    566804 
    567805  if(cq->dbh) { 
    568806    if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; 
    569807    PQreset(cq->dbh); 
     808    if(stratcon_database_post_connect(cq)) return -1; 
    570809    if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; 
    571810    noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n", 
     
    576815  cq->dbh = PQconnectdb(dsn); 
    577816  if(!cq->dbh) return -1; 
     817  if(stratcon_database_post_connect(cq)) return -1; 
    578818  if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; 
    579819  noitL(noit_error, "Error connection to database: '%s'\nError: %s\n", 
     
    625865stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure, 
    626866                                 struct timeval *now) { 
    627   conn_q *cq = closure; 
    628   ds_job_detail *current, *next
     867  ds_rt_detail *dsjd = closure; 
     868  conn_q *cq = dsjd->cq
    629869  if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 
    630870 
    631   if(!cq->head) return 0;  
    632  
    633871  stratcon_database_connect(cq); 
    634  
    635   current = cq->head;  
    636   while(current) { 
    637     if(current->rt) { 
    638       next = current->next; 
    639       stratcon_datastore_find(cq, current); 
    640       current = next; 
    641     } 
    642     else if(current->completion_event) { 
    643       next = current->next; 
    644       eventer_add(current->completion_event); 
    645       current = next; 
    646       __remove_until(cq, current); 
    647     } 
    648     else current = current->next; 
    649   } 
     872  assert(dsjd->rt); 
     873  stratcon_datastore_find(cq, dsjd); 
     874  if(dsjd->completion_event) 
     875    eventer_add(dsjd->completion_event); 
     876 
     877  free_params((ds_single_detail *)dsjd); 
     878  free(dsjd); 
     879  release_conn_q(cq); 
    650880  return 0; 
     881} 
     882static const char * 
     883get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) { 
     884  void *vinfo; 
     885  const char *dsn = NULL, *fqdn = NULL; 
     886  int found = 0; 
     887  storagenode_info *info = NULL; 
     888  pthread_mutex_lock(&fqdn_to_info_cache_lock); 
     889  if(noit_hash_retrieve(&fqdn_to_info_cache, (void *)&id, sizeof(id), 
     890                        &vinfo)) { 
     891    found = 1; 
     892    info = vinfo; 
     893  } 
     894  pthread_mutex_unlock(&fqdn_to_info_cache_lock); 
     895  if(found) { 
     896    if(fqdn_out) *fqdn_out = info->fqdn; 
     897    return info->dsn; 
     898  } 
     899 
     900  if(!found && can_use_db) { 
     901    ds_single_detail *d; 
     902    conn_q *cq; 
     903    int row_count; 
     904    /* Look it up and store it */ 
     905    d = calloc(1, sizeof(*d)); 
     906    cq = get_conn_q_for_metanode(); 
     907    GET_QUERY(find_storage); 
     908    DECLARE_PARAM_INT(id); 
     909    PG_EXEC(find_storage); 
     910    row_count = PQntuples(d->res); 
     911    if(row_count) { 
     912      PG_GET_STR_COL(dsn, 0, "dsn"); 
     913      PG_GET_STR_COL(fqdn, 0, "fqdn"); 
     914    } 
     915    PQclear(d->res); 
     916   bad_row: 
     917    free_params(d); 
     918    free(d); 
     919  } 
     920  if(fqdn) { 
     921    info = calloc(1, sizeof(*info)); 
     922    info->fqdn = strdup(fqdn); 
     923    if(fqdn_out) *fqdn_out = info->fqdn; 
     924    info->dsn = dsn ? strdup(dsn) : NULL; 
     925    info->storagenode_id = id; 
     926    pthread_mutex_lock(&fqdn_to_info_cache_lock); 
     927    noit_hash_store(&fqdn_to_info_cache, 
     928                    (void *)&info->storagenode_id, sizeof(int), info); 
     929    pthread_mutex_unlock(&fqdn_to_info_cache_lock); 
     930  } 
     931  return info ? info->dsn : NULL; 
     932} 
     933static ds_line_detail * 
     934build_insert_batch(interim_journal_t *ij) { 
     935  int rv; 
     936  off_t len; 
     937  const char *buff, *cp, *lcp; 
     938  struct stat st; 
     939  ds_line_detail *head = NULL, *last = NULL, *next = NULL; 
     940 
     941  while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR); 
     942  assert(rv != -1); 
     943  len = st.st_size; 
     944  buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0); 
     945  if(buff == (void *)-1) { 
     946    noitL(noit_error, "mmap(%s) => %s\n", ij->filename, strerror(errno)); 
     947    assert(buff != (void *)-1); 
     948  } 
     949  lcp = buff; 
     950  while(lcp < (buff + len) && 
     951        NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) { 
     952    next = calloc(1, sizeof(*next)); 
     953    next->data = malloc(cp - lcp + 1); 
     954    memcpy(next->data, lcp, cp - lcp); 
     955    next->data[cp - lcp] = '\0'; 
     956    if(!head) head = next; 
     957    if(last) last->next = next; 
     958    last = next; 
     959    lcp = cp + 1; 
     960  } 
     961  munmap((void *)buff, len); 
     962  return head; 
     963} 
     964static void 
     965interim_journal_remove(interim_journal_t *ij) { 
     966  unlink(ij->filename); 
     967  if(ij->filename) free(ij->filename); 
     968  if(ij->remote_str) free(ij->remote_str); 
     969  if(ij->remote_cn) free(ij->remote_cn); 
     970  if(ij->fqdn) free(ij->fqdn); 
    651971} 
    652972int 
     
    654974                                  struct timeval *now) { 
    655975  int i; 
    656   conn_q *cq = closure; 
    657   ds_job_detail *current, *last_sp; 
     976  interim_journal_t *ij; 
     977  ds_line_detail *head, *current, *last_sp; 
     978  const char *dsn; 
     979  conn_q *cq; 
    658980  if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 
    659981  if(mask & EVENTER_ASYNCH_CLEANUP) return 0; 
    660   if(!cq->head) return 0;  
    661  
     982 
     983  ij = closure; 
     984  dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn); 
     985  cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn, 
     986                             ij->fqdn, dsn); 
     987  noitL(noit_debug, "stratcon_datastore_asynch_execute[%s,%s,%s]\n", 
     988        ij->remote_str, ij->remote_cn, ij->fqdn); 
    662989 full_monty: 
    663990  /* Make sure we have a connection */ 
     
    670997  } 
    671998 
    672   current = cq->head;  
     999  head = build_insert_batch(ij); 
     1000  current = head;  
    6731001  last_sp = NULL; 
    6741002  if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq); 
     
    6791007  
    6801008      if(current->problematic) { 
    681         noitL(noit_error, "[%s] Failed noit line: %s", cq->jobq->queue_name, current->data); 
     1009        noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data); 
    6821010        RELEASE_SAVEPOINT("batch"); 
    6831011        current = current->next; 
    6841012        continue; 
    6851013      }  
    686       rv = stratcon_datastore_execute(cq, cq->remote, current); 
     1014      rv = stratcon_datastore_execute(cq, cq->remote_str, current); 
    6871015      switch(rv) { 
    6881016        case DS_EXEC_SUCCESS: 
     
    6991027      } 
    7001028    } 
    701     if(current->completion_event) { 
    702       if(last_sp) RELEASE_SAVEPOINT("batch"); 
    703       if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq); 
    704       eventer_add(current->completion_event); 
    705       current = current->next; 
    706       __remove_until(cq, current); 
    707     } 
    708   } 
     1029  } 
     1030  if(last_sp) RELEASE_SAVEPOINT("batch"); 
     1031  if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq); 
     1032  /* Cleanup the mess */ 
     1033  while(head) { 
     1034    ds_line_detail *tofree; 
     1035    tofree = head; 
     1036    head = head->next; 
     1037    if(tofree->data) free(tofree->data); 
     1038    free_params((ds_single_detail *)tofree); 
     1039    free(tofree); 
     1040  } 
     1041  interim_journal_remove(ij); 
     1042  release_conn_q(cq); 
    7091043  return 0; 
     1044} 
     1045static int 
     1046stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure, 
     1047                                struct timeval *now) { 
     1048  noit_hash_iter iter = NOIT_HASH_ITER_ZERO; 
     1049  const char *k; 
     1050  int klen; 
     1051  void *vij; 
     1052  interim_journal_t *ij; 
     1053  syncset_t *syncset = closure; 
     1054 
     1055  if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 
     1056  if(mask & EVENTER_ASYNCH_CLEANUP) return 0; 
     1057 
     1058  noitL(noit_debug, "Syncing journal sets...\n"); 
     1059  while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) { 
     1060    eventer_t ingest; 
     1061    ij = vij; 
     1062    noitL(noit_debug, "Syncing journal set [%s,%s,%s]\n", 
     1063          ij->remote_str, ij->remote_cn, ij->fqdn); 
     1064    fsync(ij->fd); 
     1065    ingest = eventer_alloc(); 
     1066    ingest->mask = EVENTER_ASYNCH; 
     1067    ingest->callback = stratcon_datastore_asynch_execute; 
     1068    ingest->closure = ij; 
     1069    eventer_add_asynch(ij->cpool->jobq, ingest); 
     1070  } 
     1071  noit_hash_destroy(syncset->ws, free, NULL); 
     1072  free(syncset->ws); 
     1073  eventer_add(syncset->completion); 
     1074  free(syncset); 
     1075  return 0; 
     1076} 
     1077static interim_journal_t * 
     1078interim_journal_get(struct sockaddr *remote, const char *remote_cn_in, 
     1079                    int storagenode_id, const char *fqdn_in) { 
     1080  void *vhash, *vij; 
     1081  noit_hash_table *working_set; 
     1082  interim_journal_t *ij; 
     1083  struct timeval now; 
     1084  char jpath[PATH_MAX]; 
     1085  char remote_str[128]; 
     1086  const char *remote_cn = remote_cn_in ? remote_cn_in : "default"; 
     1087  const char *fqdn = fqdn_in ? fqdn_in : "default"; 
     1088 
     1089  convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote); 
     1090  if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str)); 
     1091 
     1092  /* Lookup the working set */ 
     1093  if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) { 
     1094    working_set = calloc(1, sizeof(*working_set)); 
     1095    noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn), 
     1096                    working_set); 
     1097  } 
     1098  else 
     1099    working_set = vhash; 
     1100 
     1101  /* Lookup the interim journal within the working set */ 
     1102  if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) { 
     1103    ij = calloc(1, sizeof(*ij)); 
     1104    gettimeofday(&now, NULL); 
     1105    snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x", 
     1106             basejpath, remote_str, remote_cn, storagenode_id, 
     1107             (unsigned int)now.tv_sec, (unsigned int)now.tv_usec); 
     1108    ij->remote_str = strdup(remote_str); 
     1109    ij->remote_cn = strdup(remote_cn); 
     1110    ij->fqdn = strdup(fqdn); 
     1111    ij->storagenode_id = storagenode_id; 
     1112    ij->filename = strdup(jpath); 
     1113    ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn, 
     1114                                         ij->fqdn); 
     1115    ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640); 
     1116    if(ij->fd < 0 && errno == ENOENT) { 
     1117      if(mkdir_for_file(ij->filename, 0750)) { 
     1118        noitL(noit_error, "Failed to create dir for '%s': %s\n", 
     1119              ij->filename, strerror(errno)); 
     1120        exit(-1); 
     1121      } 
     1122      ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640); 
     1123    } 
     1124    if(ij->fd < 0) { 
     1125      noitL(noit_error, "Failed to open interim journal '%s': %s\n", 
     1126            ij->filename, strerror(errno)); 
     1127      exit(-1); 
     1128    } 
     1129    noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij); 
     1130  } 
     1131  else 
     1132    ij = vij; 
     1133 
     1134  return ij; 
     1135} 
     1136static void 
     1137storage_node_quick_lookup(const char *uuid_str, const char *remote_cn, 
     1138                          int *sid_out, int *storagenode_id_out, 
     1139                          char **fqdn_out, char **dsn_out) { 
     1140  /* only called from the main thread -- no safety issues */ 
     1141  void *vuuidinfo, *vinfo; 
     1142  uuid_info *uuidinfo; 
     1143  storagenode_info *info = NULL; 
     1144  char *fqdn = NULL; 
     1145  char *dsn = NULL; 
     1146  int storagenode_id = 0, sid = 0; 
     1147  if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str), 
     1148                         &vuuidinfo)) { 
     1149    int row_count; 
     1150    char *tmpint; 
     1151    ds_single_detail *d; 
     1152    conn_q *cq; 
     1153    d = calloc(1, sizeof(*d)); 
     1154    cq = get_conn_q_for_metanode(); 
     1155    if(stratcon_database_connect(cq) == 0) { 
     1156      /* Blocking call to service the cache miss */ 
     1157      GET_QUERY(check_map); 
     1158      DECLARE_PARAM_STR(uuid_str, strlen(uuid_str)); 
     1159      DECLARE_PARAM_STR(remote_cn, strlen(remote_cn)); 
     1160      PG_EXEC(check_map); 
     1161      row_count = PQntuples(d->res); 
     1162      if(row_count != 1) { 
     1163        PQclear(d->res); 
     1164        goto bad_row; 
     1165      } 
     1166      PG_GET_STR_COL(tmpint, 0, "sid"); 
     1167      sid = atoi(tmpint); 
     1168      PG_GET_STR_COL(tmpint, 0, "storage_node_id"); 
     1169      if(tmpint) storagenode_id = atoi(tmpint); 
     1170      PG_GET_STR_COL(fqdn, 0, "fqdn"); 
     1171      PG_GET_STR_COL(dsn, 0, "dsn"); 
     1172      PQclear(d->res); 
     1173    } 
     1174   bad_row: 
     1175    free_params((ds_single_detail *)d); 
     1176    free(d); 
     1177    release_conn_q(cq); 
     1178    /* Place in cache */ 
     1179    if(fqdn) fqdn = strdup(fqdn); 
     1180    uuidinfo = calloc(1, sizeof(*uuidinfo)); 
     1181    uuidinfo->sid = sid; 
     1182    uuidinfo->uuid_str = strdup(uuid_str); 
     1183    noit_hash_store(&uuid_to_info_cache, 
     1184                    uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo); 
     1185    /* Also, we may have just witnessed a new storage node, store it */ 
     1186    if(storagenode_id) { 
     1187      int needs_free = 0; 
     1188      info = calloc(1, sizeof(*info)); 
     1189      info->storagenode_id = storagenode_id; 
     1190      info->dsn = dsn ? strdup(dsn) : NULL; 
     1191      info->fqdn = fqdn ? strdup(fqdn) : NULL; 
     1192      pthread_mutex_lock(&fqdn_to_info_cache_lock); 
     1193      if(!noit_hash_retrieve(&fqdn_to_info_cache, 
     1194                             (void *)&storagenode_id, sizeof(int), &vinfo)) { 
     1195        /* hack to save memory -- we *never* remove from these caches, 
     1196           so we can use the same fqdn value in the above cache for the key 
     1197           in the cache below -- (no strdup) */ 
     1198        noit_hash_store(&fqdn_to_info_cache, 
     1199                        (void *)&info->storagenode_id, sizeof(int), info); 
     1200      } 
     1201      else needs_free = 1; 
     1202      pthread_mutex_unlock(&fqdn_to_info_cache_lock); 
     1203      if(needs_free) { 
     1204        if(info->dsn) free(info->dsn); 
     1205        if(info->fqdn) free(info->fqdn); 
     1206        free(info); 
     1207      } 
     1208    } 
     1209  } 
     1210  else 
     1211    uuidinfo = vuuidinfo; 
     1212 
     1213  if(storagenode_id) { 
     1214    if(uuidinfo && 
     1215       ((!dsn && dsn_out) || (!fqdn && fqdn_out))) { 
     1216      /* we don't have dsn and we actually want it */ 
     1217      pthread_mutex_lock(&fqdn_to_info_cache_lock); 
     1218      if(noit_hash_retrieve(&fqdn_to_info_cache, 
     1219                            (void *)&storagenode_id, sizeof(int), &vinfo)) 
     1220        info = vinfo; 
     1221      pthread_mutex_unlock(&fqdn_to_info_cache_lock); 
     1222    } 
     1223  } 
     1224 
     1225  if(fqdn_out) *fqdn_out = fqdn ? fqdn : (info ? info->fqdn : NULL); 
     1226  if(dsn_out) *dsn_out = dsn ? dsn : (info ? info->dsn : NULL); 
     1227  if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id; 
     1228  if(sid_out) *sid_out = uuidinfo->sid; 
     1229} 
     1230static void 
     1231stratcon_datastore_journal(struct sockaddr *remote, 
     1232                           const char *remote_cn, const char *line) { 
     1233  interim_journal_t *ij = NULL; 
     1234  char uuid_str[UUID_STR_LEN+1], *cp, *fqdn, *dsn; 
     1235  int storagenode_id = 0; 
     1236  uuid_t checkid; 
     1237  if(!line) return; 
     1238  /* if it is a UUID based thing, find the storage node */ 
     1239  switch(*line) { 
     1240    case 'C': 
     1241    case 'S': 
     1242    case 'M': 
     1243      if(line[1] == '\t' && (cp = strchr(line+2, '\t')) != NULL) { 
     1244        strlcpy(uuid_str, cp + 1, sizeof(uuid_str)); 
     1245        if(!uuid_parse(uuid_str, checkid)) { 
     1246          storage_node_quick_lookup(uuid_str, remote_cn, NULL, 
     1247                                    &storagenode_id, &fqdn, &dsn); 
     1248          ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn); 
     1249        } 
     1250      } 
     1251      break; 
     1252    case 'n': 
     1253      ij = interim_journal_get(remote,remote_cn,0,NULL); 
     1254      break; 
     1255    default: 
     1256      break; 
     1257  } 
     1258  if(!ij && fqdn) { 
     1259    noitL(ingest_err, "%d\t%s\n", storagenode_id, line); 
     1260  } 
     1261  else { 
     1262    int len; 
     1263    len = write(ij->fd, line, strlen(line)); 
     1264    if(len < 0) { 
     1265      noitL(noit_error, "write to %s failed: %s\n", 
     1266            ij->filename, strerror(errno)); 
     1267    } 
     1268  } 
     1269  return; 
     1270} 
     1271static noit_hash_table * 
     1272stratcon_datastore_journal_remove(struct sockaddr *remote, 
     1273                                  const char *remote_cn) { 
     1274  void *vhash = NULL; 
     1275  if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) { 
     1276    /* pluck it out */ 
     1277    noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL); 
     1278  } 
     1279  else { 
     1280    noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n", 
     1281          remote_cn); 
     1282    abort(); 
     1283  } 
     1284  return vhash; 
    7101285} 
    7111286void 
    7121287stratcon_datastore_push(stratcon_datastore_op_t op, 
    713                         struct sockaddr *remote, void *operand) { 
     1288                        struct sockaddr *remote, 
     1289                        const char *remote_cn, void *operand, 
     1290                        eventer_t completion) { 
    7141291  conn_q *cq; 
     1292  syncset_t *syncset; 
    7151293  eventer_t e; 
    716   ds_job_detail *dsjd
     1294  ds_rt_detail *rtdetail
    7171295  struct datastore_onlooker_list *nnode; 
    7181296 
    7191297  for(nnode = onlookers; nnode; nnode = nnode->next) 
    720     nnode->dispatch(op,remote,operand); 
    721  
    722   cq = __get_conn_q_for_remote(remote); 
    723   dsjd = calloc(1, sizeof(*dsjd)); 
     1298    nnode->dispatch(op,remote,remote_cn,operand); 
     1299 
    7241300  switch(op) { 
    725     case DS_OP_FIND: 
    726       dsjd->rt = operand; 
    727       __append(cq, dsjd); 
     1301    case DS_OP_INSERT: 
     1302      stratcon_datastore_journal(remote, remote_cn, (const char *)operand); 
    7281303      break; 
    729     case DS_OP_INSERT: 
    730       dsjd->data = operand; 
    731       __append(cq, dsjd); 
     1304    case DS_OP_CHKPT: 
     1305      e = eventer_alloc(); 
     1306      syncset = calloc(1, sizeof(*syncset)); 
     1307      e->mask = EVENTER_ASYNCH; 
     1308      e->callback = stratcon_datastore_journal_sync; 
     1309      syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn); 
     1310      syncset->completion = completion; 
     1311      e->closure = syncset; 
     1312      eventer_add(e); 
    7321313      break; 
    7331314    case DS_OP_FIND_COMPLETE: 
    734     case DS_OP_CHKPT: 
    735       dsjd->completion_event = operand; 
    736       __append(cq,dsjd); 
     1315      cq = get_conn_q_for_metanode(); 
     1316      rtdetail = calloc(1, sizeof(*rtdetail)); 
     1317      rtdetail->rt = operand; 
     1318      rtdetail->completion_event = completion; 
    7371319      e = eventer_alloc(); 
    7381320      e->mask = EVENTER_ASYNCH; 
    739       if(op == DS_OP_FIND_COMPLETE) 
    740         e->callback = stratcon_datastore_asynch_lookup; 
    741       else if(op == DS_OP_CHKPT) 
    742         e->callback = stratcon_datastore_asynch_execute; 
    743       e->closure = cq; 
    744       eventer_add_asynch(cq->jobq, e); 
     1321      e->callback = stratcon_datastore_asynch_lookup; 
     1322      e->closure = rtdetail; 
     1323      eventer_add_asynch(cq->pool->jobq, e); 
    7451324      break; 
    7461325  } 
     
    7501329stratcon_datastore_saveconfig(void *unused) { 
    7511330  int rv = -1; 
    752   conn_q _cq = { 0 }, *cq = &_cq; 
    7531331  char *buff; 
    7541332  ds_single_detail _d = { 0 }, *d = &_d; 
     1333  conn_q *cq; 
     1334  cq = get_conn_q_for_metanode(); 
    7551335 
    7561336  if(stratcon_database_connect(cq) == 0) { 
     
    7751355      free_params(d); 
    7761356  } 
    777   if(cq->dbh) PQfinish(cq->dbh); 
     1357  release_conn_q(cq); 
    7781358  return rv; 
    7791359} 
     
    7811361void 
    7821362stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t, 
    783                                                struct sockaddr *, void *)) { 
     1363                                               struct sockaddr *, 
     1364                                               const char *, void *)) { 
    7841365  struct datastore_onlooker_list *nnode; 
    7851366  nnode = calloc(1, sizeof(*nnode)); 
     
    7891370    nnode->next = onlookers; 
    7901371} 
     1372static void 
     1373stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn, 
     1374                                         char *id_str, char *file) { 
     1375  char path[PATH_MAX]; 
     1376  interim_journal_t *ij; 
     1377  eventer_t ingest; 
     1378 
     1379  snprintf(path, sizeof(path), "%s/%s/%s/%s/%s", 
     1380           basejpath, remote_str, remote_cn, id_str, file); 
     1381  ij = calloc(1, sizeof(*ij)); 
     1382  ij->fd = open(path, O_RDONLY); 
     1383  if(ij->fd < 0) { 
     1384    noitL(noit_error, "cannot open journal '%s': %s\n", 
     1385          path, strerror(errno)); 
     1386    free(ij); 
     1387    return; 
     1388  } 
     1389  ij->filename = strdup(path); 
     1390  ij->remote_str = strdup(remote_str); 
     1391  ij->remote_cn = strdup(remote_cn); 
     1392  ij->storagenode_id = atoi(id_str); 
     1393  ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn, 
     1394                                       ij->fqdn); 
     1395 
     1396  noitL(noit_error, "ingesting old payload: %s\n", ij->filename); 
     1397  ingest = eventer_alloc(); 
     1398  ingest->mask = EVENTER_ASYNCH; 
     1399  ingest->callback = stratcon_datastore_asynch_execute; 
     1400  ingest->closure = ij; 
     1401  eventer_add_asynch(ij->cpool->jobq, ingest); 
     1402} 
     1403static void 
     1404stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) { 
     1405  char path[PATH_MAX]; 
     1406  DIR *root; 
     1407  struct dirent de, *entry; 
     1408  int i = 0, cnt = 0; 
     1409  char **entries; 
     1410 
     1411  snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath, 
     1412           first ? "/" : "", first ? first : "", 
     1413           second ? "/" : "", second ? second : "", 
     1414           third ? "/" : "", third ? third : ""); 
     1415  root = opendir(path); 
     1416  if(!root) return; 
     1417  while(readdir_r(root, &de, &entry) == 0 && entry != NULL) cnt++; 
     1418  rewinddir(root); 
     1419  entries = malloc(sizeof(*entries) * cnt); 
     1420  while(readdir_r(root, &de, &entry) == 0 && entry != NULL) { 
     1421    if(i < cnt) { 
     1422      entries[i++] = strdup(entry->d_name); 
     1423    } 
     1424  } 
     1425  closedir(root); 
     1426  cnt = i; /* could have changed, directories are fickle */ 
     1427  qsort(entries, i, sizeof(*entries), 
     1428        (int (*)(const void *, const void *))strcasecmp); 
     1429  for(i=0; i<cnt; i++) { 
     1430    if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue; 
     1431    if(!first) 
     1432      stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL); 
     1433    else if(!second) 
     1434      stratcon_datastore_sweep_journals_int(first, entries[i], NULL); 
     1435    else if(!third) 
     1436      stratcon_datastore_sweep_journals_int(first, second, entries[i]); 
     1437    else if(strlen(entries[i]) == 16) 
     1438      stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]); 
     1439  } 
     1440} 
     1441static void 
     1442stratcon_datastore_sweep_journals() { 
     1443  stratcon_datastore_sweep_journals_int(NULL,NULL,NULL); 
     1444} 
     1445 
     1446int 
     1447stratcon_datastore_ingest_all_storagenode_info() { 
     1448  int i, cnt; 
     1449  ds_single_detail _d = { 0 }, *d = &_d; 
     1450  conn_q *cq; 
     1451  cq = get_conn_q_for_metanode(); 
     1452 
     1453  while(stratcon_database_connect(cq)) { 
     1454    noitL(noit_error, "Error connecting to database\n"); 
     1455    sleep(1); 
     1456  } 
     1457 
     1458  GET_QUERY(all_storage); 
     1459  PG_EXEC(all_storage); 
     1460  cnt = PQntuples(d->res); 
     1461  for(i=0; i<cnt; i++) { 
     1462    void *vinfo; 
     1463    char *tmpint, *fqdn, *dsn; 
     1464    int storagenode_id; 
     1465    PG_GET_STR_COL(tmpint, i, "storage_node_id"); 
     1466    storagenode_id = atoi(tmpint); 
     1467    PG_GET_STR_COL(fqdn, i, "fqdn"); 
     1468    PG_GET_STR_COL(dsn, i, "dsn"); 
     1469    PG_GET_STR_COL(tmpint, i, "storage_node_id"); 
     1470    storagenode_id = tmpint ? atoi(tmpint) : 0; 
     1471 
     1472    if(!noit_hash_retrieve(&fqdn_to_info_cache, 
     1473                           (void *)&storagenode_id, sizeof(int), &vinfo)) { 
     1474      storagenode_info *info; 
     1475      info = calloc(1, sizeof(*info)); 
     1476      info->storagenode_id = storagenode_id; 
     1477      info->fqdn = fqdn ? strdup(fqdn) : NULL; 
     1478      info->dsn = dsn ? strdup(dsn) : NULL; 
     1479      noit_hash_store(&fqdn_to_info_cache, 
     1480                      (void *)&info->storagenode_id, sizeof(int), info); 
     1481    } 
     1482  } 
     1483  PQclear(d->res); 
     1484 bad_row: 
     1485  free_params(d); 
     1486 
     1487  release_conn_q(cq); 
     1488  noitL(noit_error, "Loaded %d storage nodes\n", cnt); 
     1489  return cnt; 
     1490} 
     1491int 
     1492stratcon_datastore_ingest_all_check_info() { 
     1493  int i, cnt, loaded = 0; 
     1494  ds_single_detail _d = { 0 }, *d = &_d; 
     1495  conn_q *cq; 
     1496  cq = get_conn_q_for_metanode(); 
     1497 
     1498  while(stratcon_database_connect(cq)) { 
     1499    noitL(noit_error, "Error connecting to database\n"); 
     1500    sleep(1); 
     1501  } 
     1502 
     1503  GET_QUERY(check_mapall); 
     1504  PG_EXEC(check_mapall); 
     1505  cnt = PQntuples(d->res); 
     1506  for(i=0; i<cnt; i++) { 
     1507    void *vinfo; 
     1508    char *tmpint, *fqdn, *dsn, *uuid_str; 
     1509    int sid, storagenode_id; 
     1510    uuid_info *uuidinfo; 
     1511    PG_GET_STR_COL(uuid_str, i, "id"); 
     1512    if(!uuid_str) continue; 
     1513    PG_GET_STR_COL(tmpint, i, "sid"); 
     1514    if(!tmpint) continue; 
     1515    sid = atoi(tmpint); 
     1516    PG_GET_STR_COL(fqdn, i, "fqdn"); 
     1517    PG_GET_STR_COL(dsn, i, "dsn"); 
     1518    PG_GET_STR_COL(tmpint, i, "storage_node_id"); 
     1519    storagenode_id = tmpint ? atoi(tmpint) : 0; 
     1520 
     1521    uuidinfo = calloc(1, sizeof(*uuidinfo)); 
     1522    uuidinfo->uuid_str = strdup(uuid_str); 
     1523    uuidinfo->sid = sid; 
     1524    noit_hash_store(&uuid_to_info_cache, 
     1525                    uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo); 
     1526    loaded++; 
     1527    if(!noit_hash_retrieve(&fqdn_to_info_cache, 
     1528                           (void *)&storagenode_id, sizeof(int), &vinfo)) { 
     1529      storagenode_info *info; 
     1530      info = calloc(1, sizeof(*info)); 
     1531      info->storagenode_id = storagenode_id; 
     1532      info->fqdn = fqdn ? strdup(fqdn) : NULL; 
     1533      info->dsn = dsn ? strdup(dsn) : NULL; 
     1534      noit_hash_store(&fqdn_to_info_cache, 
     1535                      (void *)&info->storagenode_id, sizeof(int), info); 
     1536    } 
     1537  } 
     1538  PQclear(d->res); 
     1539 bad_row: 
     1540  free_params(d); 
     1541 
     1542  release_conn_q(cq); 
     1543  noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded); 
     1544  return loaded; 
     1545} 
     1546void 
     1547stratcon_datastore_init() { 
     1548  pthread_mutex_init(&ds_conns_lock, NULL); 
     1549  pthread_mutex_init(&fqdn_to_info_cache_lock, NULL); 
     1550  ds_err = noit_log_stream_find("error/datastore"); 
     1551  ingest_err = noit_log_stream_find("error/ingest"); 
     1552  if(!ds_err) ds_err = noit_error; 
     1553  if(!ingest_err) ingest_err = noit_error; 
     1554  if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path", 
     1555                           &basejpath)) { 
     1556    noitL(noit_error, "/stratcon/database/journal/path is unspecified\n"); 
     1557    exit(-1); 
     1558  } 
     1559  stratcon_datastore_ingest_all_check_info(); 
     1560  stratcon_datastore_ingest_all_storagenode_info(); 
     1561  stratcon_datastore_sweep_journals(); 
     1562} 
  • src/stratcon_datastore.h

    r88a7178 r5360a1e  
    4444 DS_OP_INSERT = 1, 
    4545 DS_OP_CHKPT = 2, 
    46  DS_OP_FIND = 3, 
    47  DS_OP_FIND_COMPLETE = 4 
     46 DS_OP_FIND_COMPLETE = 3 
    4847} stratcon_datastore_op_t; 
    4948 
    5049API_EXPORT(void) 
    5150  stratcon_datastore_push(stratcon_datastore_op_t, 
    52                           struct sockaddr *, void *); 
     51                          struct sockaddr *, const char *, void *, eventer_t); 
    5352 
    5453API_EXPORT(void) 
    5554  stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t, 
    56                                                  struct sockaddr *, void *)); 
     55                                                 struct sockaddr *, 
     56                                                 const char *, void *)); 
     57 
     58API_EXPORT(void) 
     59  stratcon_datastore_init(); 
    5760 
    5861API_EXPORT(int) 
  • src/stratcon_iep.c

    r0335d9d r5360a1e  
    142142  } 
    143143  noitL(noit_error, "submitting statement: %s\n", line); 
    144   stratcon_iep_line_processor(DS_OP_INSERT, NULL, line); 
     144  stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, line, NULL); 
    145145  stmt->marked = 1; 
    146146} 
     
    163163  /* Phase 1: sweep in all the statements */ 
    164164  for(i=0; i<cnt; i++) { 
    165     char id[UUID_STR_LEN]; 
     165    char id[UUID_STR_LEN+1]; 
    166166    char provides[256]; 
    167167    char *statement; 
     
    202202  /* Phase 2: load the requires graph */ 
    203203  for(i=0; i<cnt; i++) { 
    204     char id[UUID_STR_LEN]; 
     204    char id[UUID_STR_LEN+1]; 
    205205    int rcnt, j; 
    206206    char *requires; 
     
    280280  noitL(noit_debug, "Found %d %s stanzas\n", cnt, path); 
    281281  for(i=0; i<cnt; i++) { 
    282     char id[UUID_STR_LEN]; 
     282    char id[UUID_STR_LEN+1]; 
    283283    char topic[256]; 
    284284    char *query; 
     
    314314      query++; 
    315315    } 
    316     stratcon_iep_line_processor(DS_OP_INSERT, NULL, line); 
     316    stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, line, NULL); 
    317317  } 
    318318  free(query_configs); 
     
    419419setup_iep_connection_callback(eventer_t e, int mask, void *closure, 
    420420                              struct timeval *now) { 
    421   stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL); 
     421  stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, NULL, NULL); 
    422422  return 0; 
    423423} 
     
    526526void 
    527527stratcon_iep_line_processor(stratcon_datastore_op_t op, 
    528                             struct sockaddr *remote, void *operand) { 
     528                            struct sockaddr *remote, const char *remote_cn, 
     529                            void *operand, eventer_t completion) { 
    529530  int len; 
    530531  char remote_str[128]; 
  • src/stratcon_iep.h

    r8ad126b r5360a1e  
    4848API_EXPORT(void) 
    4949  stratcon_iep_line_processor(stratcon_datastore_op_t op, 
    50                               struct sockaddr *remote, void *operand); 
     50                              struct sockaddr *remote, const char *remote_cn, 
     51                              void *operand, eventer_t completion); 
    5152 
    5253API_EXPORT(jlog_streamer_ctx_t *) 
  • src/stratcon_jlog_streamer.c

    ra060696 r5360a1e  
    371371        FULLREAD(e, ctx, (unsigned long)ctx->header.message_len); 
    372372        if(ctx->header.message_len > 0) 
    373           ctx->push(DS_OP_INSERT, &nctx->r.remote, ctx->buffer); 
     373          ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn, 
     374                    ctx->buffer, NULL); 
    374375        else if(ctx->buffer) 
    375376          free(ctx->buffer); 
     
    385386          completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION; 
    386387          ctx->state = JLOG_STREAMER_IS_ASYNC; 
    387           ctx->push(DS_OP_CHKPT, &nctx->r.remote, completion_e); 
     388          ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn, 
     389                    NULL, completion_e); 
    388390          noitL(noit_debug, "Pushing batch asynch...\n"); 
    389391          return 0; 
  • src/stratcon_jlog_streamer.h

    rbe29b6f r5360a1e  
    9191  u_int64_t total_bytes_read; 
    9292 
    93   void (*push)(stratcon_datastore_op_t, struct sockaddr *, void *); 
     93  void (*push)(stratcon_datastore_op_t, struct sockaddr *, const char *, void *, eventer_t); 
    9494} jlog_streamer_ctx_t; 
    9595 
  • src/stratcon_realtime_http.c

    r3c56016 r5360a1e  
    427427    for(node = rc->checklist; node; node = node->next) { 
    428428      noit_atomic_inc32(&ctx->ref_cnt); 
    429       stratcon_datastore_push(DS_OP_FIND, NULL, node); 
    430429      noitL(noit_error, "Resolving sid: %d\n", node->sid); 
    431430    } 
     
    435434    completion->closure = ctx; 
    436435    gettimeofday(&completion->whence, NULL); 
    437     stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, completion); 
     436    stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, NULL, 
     437                            rc->checklist, completion); 
    438438  } 
    439439  return EVENTER_EXCEPTION; 
  • src/stratcond.c

    r8ad126b r5360a1e  
    171171  noit_watchdog_child_eventer_heartbeat(); 
    172172 
     173  stratcon_datastore_init(); 
    173174  noit_console_init(APPNAME); 
    174175  noit_console_conf_init();