Changeset 06c1b7054cfec11099d48bb3a27833d5e8ab1974

Show
Ignore:
Timestamp:
10/23/09 22:13:47 (5 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1256336027 +0000
git-parent:

[ecb7471db3eae36779832e1ef85f3365d57b9d63], [3b7392c035db2b897f4df6be372b3d1a6f14d62c]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1256336027 +0000
Message:

merge skeksis back into trunk... we're sharded

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • sql/Makefile.in

    r0a06b2c rfe61445  
    2626MODULES_DIR=@MODULES_DIR@ 
    2727 
    28 all: 
     28all:   reconnoiter_ddl_dump.sql 
    2929 
    30 install: 
     30reconnoiter_ddl_dump.sql: 
     31        ./build_ddl.pl scaffolding.sql > reconnoiter_ddl_dump.sql 
     32 
     33install:        reconnoiter_ddl_dump.sql 
    3134        $(top_srcdir)/buildtools/mkinstalldirs $(DESTDIR)$(datarootdir)/reconnoiter 
    3235        $(INSTALL) -m 0644 reconnoiter_ddl_dump.sql $(DESTDIR)$(datarootdir)/reconnoiter/schema.sql 
  • sql/README

    r6adb5a7 rfe61445  
    1 == Snap the schema off a live machine == 
     1== Management == 
    22 
    3 pg_dump -s -n stratcon -n prism -U postgres reconnoiter  > reconnoiter_ddl_dump.sql 
     3Tables are in tables/, stored procedures/functions are in sprocs/ 
     4 
     5The scaffolding.sql file is used to glue them all together in the right order? 
     6Convoluted? perhaps.  It allows us to manipulate small sections of the schema 
     7as code, stored in version control, and reconstitute the database easily for 
     8testing. 
  • src/stratcon.conf.in

    r8504a3b rf742d08  
    99      <log name="error/iep"/> 
    1010      <log name="error/eventer" disabled="true"/> 
     11      <log name="error/datastore" disabled="true"/> 
    1112      <log name="debug/eventer" disabled="true"/> 
    1213    </console_output> 
     
    3132  </noits> 
    3233 
    33   <iep disabled="false"> <!-- false the default --> 
     34  <iep disabled="true"> <!-- false the default --> 
    3435    <start directory="%iepdbdir%" 
    3536           command="%iepbindir%/run-iep.sh" /> 
     
    6263 
    6364  <database> 
     65    <journal> 
     66      <path>/var/log/stratcon.persist</path> 
     67    </journal> 
    6468    <dbconfig> 
    6569      <host>localhost</host> 
     
    6973    </dbconfig> 
    7074    <statements> 
     75      <!-- These are optional and used for stuff like setting search paths --> 
     76      <!-- 
     77      <metanodepostconnect><![CDATA[ 
     78        SELECT do_some_magic(); 
     79      ]]></metanodepostconnect> 
     80      <storagepostconnect><![CDATA[ 
     81        SELECT do_some_magic($1,$2); 
     82      ]]></storagepostconnect> 
     83      --> 
    7184      <allchecks><![CDATA[ 
    7285        SELECT remote_address, id, target, module, name 
    73           FROM stratcon.mv_loading_dock_check_s 
     86          FROM noit.get_checks() 
    7487      ]]></allchecks> 
    7588      <findcheck><![CDATA[ 
    76         SELECT remote_address, id 
    77           FROM stratcon.mv_loading_dock_check_s 
    78          WHERE sid = $1 
     89        SELECT remote_address, id, target, module, name 
     90          FROM noit.get_check($1,$2) 
    7991      ]]></findcheck> 
     92      <allstoragenodes><![CDATA[ 
     93        SELECT storage_node_id, fqdn, dsn 
     94          FROM stratcon.storage_node 
     95      ]]></allstoragenodes> 
     96      <findstoragenode><![CDATA[ 
     97        SELECT fqdn, dsn 
     98          FROM stratcon.storage_node 
     99         WHERE storage_node_id = $1 
     100      ]]></findstoragenode> 
     101      <mapallchecks><![CDATA[ 
     102        SELECT id, sid, storage_node_id, fqdn, dsn 
     103          FROM stratcon.map_uuid_to_sid LEFT JOIN stratcon.storage_node USING (storage_node_id) 
     104      ]]></mapallchecks> 
     105      <mapchecktostoragenode><![CDATA[ 
     106        SELECT o_storage_node_id as storage_node_id, o_sid as sid, 
     107               o_fqdn as fqdn, o_dsn as dsn 
     108          FROM stratcon.map_uuid_to_sid($1,$2) 
     109      ]]></mapchecktostoragenode> 
    80110      <check><![CDATA[ 
    81         INSERT INTO stratcon.loading_dock_check_s 
     111        INSERT INTO check_archive 
    82112                    (remote_address, whence, sid, id, target, module, name) 
    83113             VALUES ($1, 'epoch'::timestamptz + ($2 || ' seconds')::interval, 
    84                      stratcon.generate_sid_from_id($3), $3, $4, $5, $6
     114                     $3, $4, $5, $6, $7
    85115      ]]></check> 
    86116      <status><![CDATA[ 
    87         INSERT INTO stratcon.loading_dock_status_archive_%Y%m 
    88                     ( whence,sid, state, availability, 
    89                      duration, status) 
     117        INSERT INTO check_status_archive 
     118                    (whence, sid, state, availability, duration, status) 
    90119             VALUES ('epoch'::timestamptz + ($1 || ' seconds')::interval, 
    91                      stratcon.generate_sid_from_id($2), $3, $4, $5, $6) 
     120                     $2, $3, $4, $5, $6) 
    92121      ]]></status> 
    93122      <metric_numeric><![CDATA[ 
    94         INSERT INTO stratcon.loading_dock_metric_numeric_archive_%Y%m 
     123        INSERT INTO metric_numeric_archive 
    95124                    (whence, sid, name, value) 
    96              VALUES ( 'epoch'::timestamptz + ($1 || ' seconds')::interval, 
    97                      stratcon.generate_sid_from_id($2), $3, $4) 
     125             VALUES ('epoch'::timestamptz + ($1 || ' seconds')::interval, 
     126                     $2, $3, $4) 
    98127      ]]></metric_numeric> 
    99128      <metric_text><![CDATA[ 
    100         INSERT INTO stratcon.loading_dock_metric_text_archive_%Y%m 
     129        INSERT INTO metric_text_archive 
    101130                    ( whence, sid, name,value) 
    102131             VALUES ('epoch'::timestamptz + ($1 || ' seconds')::interval, 
    103                      stratcon.generate_sid_from_id($2), $3, $4) 
     132                     $2, $3, $4) 
    104133      ]]></metric_text> 
    105134      <config><![CDATA[ 
  • src/stratcon_datastore.c

    r4201a42 rcdb26b4  
    11/* 
    2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc. 
     2 * Copyright (c) 2007-2009, OmniTI Computer Consulting, Inc. 
    33 * All rights reserved. 
    44 * 
     
    3535#include "utils/noit_log.h" 
    3636#include "utils/noit_b64.h" 
     37#include "utils/noit_str.h" 
     38#include "utils/noit_mkdir.h" 
    3739#include "stratcon_datastore.h" 
    3840#include "stratcon_realtime_http.h" 
     
    4143#include "noit_check.h" 
    4244#include <unistd.h> 
     45#include <fcntl.h> 
    4346#include <netinet/in.h> 
    4447#include <sys/un.h> 
     48#include <dirent.h> 
    4549#include <arpa/inet.h> 
     50#include <sys/mman.h> 
    4651#include <libpq-fe.h> 
    4752#include <zlib.h> 
    4853#include <assert.h> 
    49  
    50 static char *check_loadall = NULL; 
    51 static const char *check_loadall_conf = "/stratcon/database/statements/allchecks"; 
    52 static char *check_find = NULL; 
    53 static const char *check_find_conf = "/stratcon/database/statements/findcheck"; 
    54 static char *check_insert = NULL; 
    55 static const char *check_insert_conf = "/stratcon/database/statements/check"; 
    56 static char *status_insert = NULL; 
    57 static const char *status_insert_conf = "/stratcon/database/statements/status"; 
    58 static char *metric_insert_numeric = NULL; 
    59 static const char *metric_insert_numeric_conf = "/stratcon/database/statements/metric_numeric"; 
    60 static char *metric_insert_text = NULL; 
    61 static const char *metric_insert_text_conf = "/stratcon/database/statements/metric_text"; 
    62 static char *config_insert = NULL; 
    63 static const char *config_insert_conf = "/stratcon/database/statements/config"; 
     54#include <errno.h> 
     55 
     56#define DECL_STMT(codename,confname) \ 
     57static char *codename = NULL; \ 
     58static const char *codename##_conf = "/stratcon/database/statements/" #confname 
     59 
     60DECL_STMT(storage_post_connect, storagepostconnect); 
     61DECL_STMT(metanode_post_connect, metanodepostconnect); 
     62DECL_STMT(find_storage, findstoragenode); 
     63DECL_STMT(all_storage, allstoragenodes); 
     64DECL_STMT(check_map, mapchecktostoragenode); 
     65DECL_STMT(check_mapall, mapallchecks); 
     66DECL_STMT(check_loadall, allchecks); 
     67DECL_STMT(check_find, findcheck); 
     68DECL_STMT(check_insert, check); 
     69DECL_STMT(status_insert, status); 
     70DECL_STMT(metric_insert_numeric, metric_numeric); 
     71DECL_STMT(metric_insert_text, metric_text); 
     72DECL_STMT(config_insert, config); 
     73 
     74static noit_log_stream_t ds_err = NULL; 
     75static noit_log_stream_t ingest_err = NULL; 
    6476 
    6577static struct datastore_onlooker_list { 
    66   void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *, void *); 
     78  void (*dispatch)(stratcon_datastore_op_t, struct sockaddr *, 
     79                   const char *, void *); 
    6780  struct datastore_onlooker_list *next; 
    6881} *onlookers = NULL; 
     
    7386      goto bad_row; \ 
    7487} while(0) 
     88 
     89struct conn_q; 
     90 
     91typedef struct { 
     92  char            *queue_name; /* the key fqdn+remote_sn */ 
     93  eventer_jobq_t  *jobq; 
     94  struct conn_q   *head; 
     95  pthread_mutex_t  lock; 
     96  pthread_cond_t   cv; 
     97  int              ttl; 
     98  int              in_pool; 
     99  int              outstanding; 
     100  int              max_allocated; 
     101  int              max_in_pool; 
     102} conn_pool; 
     103typedef struct conn_q { 
     104  time_t           last_use; 
     105  char            *dsn;        /* Pg connect string */ 
     106  char            *remote_str; /* the IP of the noit*/ 
     107  char            *remote_cn;  /* the Cert CN of the noit */ 
     108  char            *fqdn;       /* the fqdn of the storage node */ 
     109  conn_pool       *pool; 
     110  struct conn_q   *next; 
     111  /* Postgres specific stuff */ 
     112  PGconn          *dbh; 
     113} conn_q; 
     114 
    75115 
    76116#define MAX_PARAMS 8 
     
    89129  POSTGRES_PARTS 
    90130} ds_single_detail; 
    91 typedef struct ds_job_detail
     131typedef struct
    92132  /* Postgres specific stuff */ 
    93133  POSTGRES_PARTS 
    94  
    95   char *data;  /* The raw string, NULL means the stream is done -- commit. */ 
    96134  struct realtime_tracker *rt; 
    97  
     135  conn_q *cq; /* connection on which to perform this job */ 
     136  eventer_t completion_event; /* This event should be registered if non NULL */ 
     137} ds_rt_detail; 
     138typedef struct ds_line_detail { 
     139  /* Postgres specific stuff */ 
     140  POSTGRES_PARTS 
     141  char *data; 
    98142  int problematic; 
    99   eventer_t completion_event; /* This event should be registered if non NULL */ 
    100   struct ds_job_detail *next; 
    101 } ds_job_detail; 
     143  struct ds_line_detail *next; 
     144} ds_line_detail; 
    102145 
    103146typedef struct { 
    104   struct sockaddr *remote; 
    105   eventer_jobq_t  *jobq; 
    106   /* Postgres specific stuff */ 
    107   PGconn          *dbh; 
    108   ds_job_detail   *head; 
    109   ds_job_detail   *tail; 
    110 } conn_q; 
     147  noit_hash_table *ws; 
     148  eventer_t completion; 
     149} syncset_t; 
     150 
     151typedef struct { 
     152  char *remote_str; 
     153  char *remote_cn; 
     154  char *fqdn; 
     155  int storagenode_id; 
     156  int fd; 
     157  char *filename; 
     158  conn_pool *cpool; 
     159} interim_journal_t; 
    111160 
    112161static int stratcon_database_connect(conn_q *cq); 
     162static int uuid_to_sid(const char *uuid_str_in, const char *remote_cn); 
    113163 
    114164static void 
     
    120170} 
    121171 
    122 static void 
    123 __append(conn_q *q, ds_job_detail *d) { 
    124   d->next = NULL; 
    125   if(!q->head) q->head = q->tail = d; 
    126   else { 
    127     q->tail->next = d; 
    128     q->tail = d; 
    129   } 
    130 
    131 static void 
    132 __remove_until(conn_q *q, ds_job_detail *d) { 
    133   ds_job_detail *next; 
    134   while(q->head && q->head != d) { 
    135     next = q->head; 
    136     q->head = q->head->next; 
    137     free_params((ds_single_detail *)next); 
    138     if(next->data) free(next->data); 
    139     free(next); 
    140   } 
    141   if(!q->head) q->tail = NULL; 
    142 
    143  
     172char *basejpath = NULL; 
     173pthread_mutex_t ds_conns_lock; 
    144174noit_hash_table ds_conns; 
    145  
    146 conn_q * 
    147 __get_conn_q_for_remote(struct sockaddr *remote) { 
    148   void *vcq; 
    149   conn_q *cq; 
    150   char queue_name[128] = "datastore_"; 
    151   static const char __zeros[4] = { 0 }; 
    152   int len = 0; 
     175noit_hash_table working_sets; 
     176 
     177/* the fqdn cache needs to be thread safe */ 
     178typedef struct { 
     179  char *uuid_str; 
     180  int storagenode_id; 
     181  int sid; 
     182} uuid_info; 
     183typedef struct { 
     184  int storagenode_id; 
     185  char *fqdn; 
     186  char *dsn; 
     187} storagenode_info; 
     188noit_hash_table uuid_to_info_cache; 
     189pthread_mutex_t storagenode_to_info_cache_lock; 
     190noit_hash_table storagenode_to_info_cache; 
     191 
     192int 
     193convert_sockaddr_to_buff(char *buff, int blen, struct sockaddr *remote) { 
     194  char name[128] = ""; 
     195  buff[0] = '\0'; 
    153196  if(remote) { 
     197    int len = 0; 
    154198    switch(remote->sa_family) { 
    155199      case AF_INET: 
    156200        len = sizeof(struct sockaddr_in); 
    157201        inet_ntop(remote->sa_family, &((struct sockaddr_in *)remote)->sin_addr, 
    158                   queue_name + strlen("datastore_"), len); 
     202                  name, len); 
    159203        break; 
    160204      case AF_INET6: 
    161205       len = sizeof(struct sockaddr_in6); 
    162206        inet_ntop(remote->sa_family, &((struct sockaddr_in6 *)remote)->sin6_addr, 
    163                   queue_name + strlen("datastore_"), len); 
     207                  name, len); 
    164208       break; 
    165209      case AF_UNIX: 
    166210        len = SUN_LEN(((struct sockaddr_un *)remote)); 
    167         snprintf(queue_name, sizeof(queue_name), "datastore_%s", ((struct sockaddr_un *)remote)->sun_path); 
     211        snprintf(name, sizeof(name), "%s", ((struct sockaddr_un *)remote)->sun_path); 
    168212        break; 
    169       default: return NULL; 
    170     } 
     213      default: return 0; 
     214    } 
     215  } 
     216  strlcpy(buff, name, blen); 
     217  return strlen(buff); 
     218
     219 
     220/* Thread-safe connection pools */ 
     221 
     222/* Forcefree -> 1 prevents it from going to the pool and it gets freed */ 
     223static void 
     224release_conn_q_forceable(conn_q *cq, int forcefree) { 
     225  int putback = 0; 
     226  cq->last_use = time(NULL); 
     227  pthread_mutex_lock(&cq->pool->lock); 
     228  cq->pool->outstanding--; 
     229  if(!forcefree && (cq->pool->in_pool < cq->pool->max_in_pool)) { 
     230    putback = 1; 
     231    cq->next = cq->pool->head; 
     232    cq->pool->head = cq; 
     233    cq->pool->in_pool++; 
     234  } 
     235  pthread_mutex_unlock(&cq->pool->lock); 
     236  noitL(noit_debug, "[%p] release %s [%s]\n", (void *)pthread_self(), 
     237        putback ? "to pool" : "and destroy", cq->pool->queue_name); 
     238  pthread_cond_signal(&cq->pool->cv); 
     239  if(putback) return; 
     240 
     241  /* Not put back, release it */ 
     242  if(cq->dbh) PQfinish(cq->dbh); 
     243  if(cq->remote_str) free(cq->remote_str); 
     244  if(cq->remote_cn) free(cq->remote_cn); 
     245  if(cq->fqdn) free(cq->fqdn); 
     246  if(cq->dsn) free(cq->dsn); 
     247  free(cq); 
     248
     249static void 
     250ttl_purge_conn_pool(conn_pool *pool) { 
     251  int old_cnt, new_cnt; 
     252  time_t now = time(NULL); 
     253  conn_q *cq, *prev = NULL, *iter; 
     254  /* because we always replace on the head and update the last_use time when 
     255     doing so, we know they are ordered LRU on the end.  So, once we hit an 
     256     old one, we know all the others are old too. 
     257   */ 
     258  if(!pool->head) return; /* hack short circuit for no locks */ 
     259  pthread_mutex_lock(&pool->lock); 
     260  old_cnt = pool->in_pool; 
     261  cq = pool->head; 
     262  while(cq) { 
     263    if(cq->last_use + cq->pool->ttl < now) { 
     264      if(prev) prev->next = NULL; 
     265      else pool->head = NULL; 
     266      break; 
     267    } 
     268    prev = cq; 
     269    cq = cq->next; 
     270  } 
     271  /* Now pool->head is a chain of unexpired and cq is a chain of expired */ 
     272  /* Fix accounting */ 
     273  for(iter=cq; iter; iter=iter->next) pool->in_pool--; 
     274  new_cnt = pool->in_pool; 
     275  pthread_mutex_unlock(&pool->lock); 
     276 
     277  /* Force release these without holding the lock */ 
     278  while(cq) { 
     279    prev = cq; 
     280    cq = cq->next; 
     281    release_conn_q_forceable(cq, 1); 
     282  } 
     283  if(old_cnt != new_cnt) 
     284    noitL(noit_debug, "reduced db pool %d -> %d [%s]\n", old_cnt, new_cnt, 
     285          pool->queue_name); 
     286
     287static void 
     288release_conn_q(conn_q *cq) { 
     289  ttl_purge_conn_pool(cq->pool); 
     290  release_conn_q_forceable(cq, 0); 
     291
     292static conn_pool * 
     293get_conn_pool_for_remote(const char *remote_str, 
     294                         const char *remote_cn, const char *fqdn) { 
     295  void *vcpool; 
     296  conn_pool *cpool = NULL; 
     297  char queue_name[256] = "datastore_"; 
     298  snprintf(queue_name, sizeof(queue_name), "datastore_%s_%s_%s", 
     299           (remote_str && *remote_str) ? remote_str : "0.0.0.0", 
     300           fqdn ? fqdn : "default", 
     301           remote_cn ? remote_cn : "default"); 
     302  pthread_mutex_lock(&ds_conns_lock); 
     303  if(noit_hash_retrieve(&ds_conns, (const char *)queue_name, 
     304                        strlen(queue_name), &vcpool)) 
     305    cpool = vcpool; 
     306  pthread_mutex_unlock(&ds_conns_lock); 
     307  if(!cpool) { 
     308    vcpool = cpool = calloc(1, sizeof(*cpool)); 
     309    cpool->queue_name = strdup(queue_name); 
     310    pthread_mutex_init(&cpool->lock, NULL); 
     311    pthread_cond_init(&cpool->cv, NULL); 
     312    cpool->in_pool = 0; 
     313    cpool->outstanding = 0; 
     314    cpool->max_in_pool = 1; 
     315    cpool->max_allocated = 1; 
     316    pthread_mutex_lock(&ds_conns_lock); 
     317    if(!noit_hash_store(&ds_conns, cpool->queue_name, strlen(cpool->queue_name), 
     318                        cpool)) { 
     319      noit_hash_retrieve(&ds_conns, (const char *)queue_name, 
     320                         strlen(queue_name), &vcpool); 
     321    } 
     322    pthread_mutex_unlock(&ds_conns_lock); 
     323    if(vcpool != cpool) { 
     324      /* someone beat us to it */ 
     325      free(cpool->queue_name); 
     326      pthread_mutex_destroy(&cpool->lock); 
     327      pthread_cond_destroy(&cpool->cv); 
     328      free(cpool); 
     329    } 
     330    else { 
     331      int i; 
     332      /* Our job to setup the pool */ 
     333      cpool->jobq = calloc(1, sizeof(*cpool->jobq)); 
     334      eventer_jobq_init(cpool->jobq, queue_name); 
     335      cpool->jobq->backq = eventer_default_backq(); 
     336      /* Add one thread */ 
     337      for(i=0; i<MAX(cpool->max_allocated - cpool->max_in_pool, 1); i++) 
     338        eventer_jobq_increase_concurrency(cpool->jobq); 
     339    } 
     340    cpool = vcpool; 
     341  } 
     342  return cpool; 
     343
     344static conn_q * 
     345get_conn_q_for_remote(const char *remote_str, 
     346                      const char *remote_cn, const char *fqdn, 
     347                      const char *dsn) { 
     348  conn_pool *cpool; 
     349  conn_q *cq; 
     350  cpool = get_conn_pool_for_remote(remote_str, remote_cn, fqdn); 
     351  noitL(noit_debug, "[%p] requesting [%s]\n", (void *)pthread_self(), 
     352        cpool->queue_name); 
     353  pthread_mutex_lock(&cpool->lock); 
     354 again: 
     355  if(cpool->head) { 
     356    assert(cpool->in_pool > 0); 
     357    cq = cpool->head; 
     358    cpool->head = cq->next; 
     359    cpool->in_pool--; 
     360    cpool->outstanding++; 
     361    cq->next = NULL; 
     362    pthread_mutex_unlock(&cpool->lock); 
     363    return cq; 
     364  } 
     365  if(cpool->in_pool + cpool->outstanding >= cpool->max_allocated) { 
     366    noitL(noit_debug, "[%p] over-subscribed, waiting [%s]\n", 
     367          (void *)pthread_self(), cpool->queue_name); 
     368    pthread_cond_wait(&cpool->cv, &cpool->lock); 
     369    noitL(noit_debug, "[%p] waking up and trying again [%s]\n", 
     370          (void *)pthread_self(), cpool->queue_name); 
     371    goto again; 
    171372  } 
    172373  else { 
    173     /* This is a dummy connection */ 
    174     remote = (struct sockaddr *)__zeros; 
    175     snprintf(queue_name, sizeof(queue_name), "datastore_default"); 
    176     len = 4; 
    177   } 
    178   if(noit_hash_retrieve(&ds_conns, (const char *)remote, len, &vcq)) 
    179     return vcq; 
     374    cpool->outstanding++; 
     375    pthread_mutex_unlock(&cpool->lock); 
     376  } 
     377  
    180378  cq = calloc(1, sizeof(*cq)); 
    181   cq->remote = malloc(len); 
    182   memcpy(cq->remote, remote, len); 
    183   cq->jobq = calloc(1, sizeof(*cq->jobq)); 
    184   eventer_jobq_init(cq->jobq, queue_name); 
    185   cq->jobq->backq = eventer_default_backq(); 
    186   /* Add one thread */ 
    187   eventer_jobq_increase_concurrency(cq->jobq); 
    188   noit_hash_store(&ds_conns, (const char *)cq->remote, len, cq); 
     379  cq->pool = cpool; 
     380  cq->remote_str = remote_str ? strdup(remote_str) : NULL; 
     381  cq->remote_cn = remote_cn ? strdup(remote_cn) : NULL; 
     382  cq->fqdn = fqdn ? strdup(fqdn) : NULL; 
     383  cq->dsn = dsn ? strdup(dsn) : NULL; 
    189384  return cq; 
     385} 
     386static conn_q * 
     387get_conn_q_for_metanode() { 
     388  return get_conn_q_for_remote(NULL,NULL,NULL,NULL); 
    190389} 
    191390 
     
    243442  if(d->rv != PGRES_COMMAND_OK && \ 
    244443     d->rv != PGRES_TUPLES_OK) { \ 
    245     noitL(noit_error, "stratcon datasource bad (%d): %s\n'%s'\n", \ 
     444    noitL(ds_err, "stratcon datasource bad (%d): %s\n'%s'\n", \ 
    246445          d->rv, PQresultErrorMessage(d->res), cmd); \ 
    247446    PQclear(d->res); \ 
     
    262461  if(d->rv != PGRES_COMMAND_OK && \ 
    263462     d->rv != PGRES_TUPLES_OK) { \ 
    264     noitL(noit_error, "stratcon datasource bad (%d): %s\n'%s' time: %llu\n", \ 
     463    noitL(ds_err, "stratcon datasource bad (%d): %s\n'%s' time: %llu\n", \ 
    265464          d->rv, PQresultErrorMessage(d->res), cmdbuf, \ 
    266465          (long long unsigned)whence); \ 
     
    273472stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure, 
    274473                                    struct timeval *now) { 
    275   conn_q *cq = closure; 
    276   ds_job_detail *d; 
     474  ds_single_detail *d; 
    277475  int i, row_count = 0, good = 0; 
    278476  char buff[1024]; 
     477  conn_q *cq; 
     478  cq = get_conn_q_for_metanode(); 
    279479 
    280480  if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 
     
    315515 
    316516    /* stratcon_iep_line_processor takes an allocated operand and frees it */ 
    317     stratcon_iep_line_processor(DS_OP_INSERT, sin, strdup(buff)); 
     517    stratcon_iep_line_processor(DS_OP_INSERT, sin, NULL, strdup(buff), NULL); 
    318518    good++; 
    319519  } 
    320520  noitL(noit_error, "Staged %d/%d remembered checks into IEP\n", good, row_count); 
    321521 bad_row: 
    322   PQclear(d->res); 
     522  free_params((ds_single_detail *)d); 
    323523  free(d); 
     524  release_conn_q(cq); 
    324525  return 0; 
    325526} 
     
    327528stratcon_datastore_iep_check_preload() { 
    328529  eventer_t e; 
    329   conn_q *cq
    330   cq = __get_conn_q_for_remote(NULL); 
    331  
     530  conn_pool *cpool
     531 
     532  cpool = get_conn_pool_for_remote(NULL,NULL,NULL); 
    332533  e = eventer_alloc(); 
    333534  e->mask = EVENTER_ASYNCH; 
    334535  e->callback = stratcon_datastore_asynch_drive_iep; 
    335   e->closure = cq
    336   eventer_add_asynch(cq->jobq, e); 
     536  e->closure = NULL
     537  eventer_add_asynch(cpool->jobq, e); 
    337538} 
    338539execute_outcome_t 
    339 stratcon_datastore_find(conn_q *cq, ds_job_detail *d) { 
     540stratcon_datastore_find(conn_q *cq, ds_rt_detail *d) { 
    340541  char *val; 
    341542  int row_count; 
    342  
    343   if(!d->nparams) DECLARE_PARAM_INT(d->rt->sid); 
     543  struct realtime_tracker *node; 
     544 
    344545  GET_QUERY(check_find); 
    345   PG_EXEC(check_find); 
    346   row_count = PQntuples(d->res); 
    347   if(row_count != 1) goto bad_row; 
    348  
    349   /* Get the check uuid */ 
    350   PG_GET_STR_COL(val, 0, "id"); 
    351   if(!val) goto bad_row; 
    352   if(uuid_parse(val, d->rt->checkid)) goto bad_row; 
    353  
    354   /* Get the remote_address (which noit owns this) */ 
    355   PG_GET_STR_COL(val, 0, "remote_address"); 
    356   if(!val) goto bad_row; 
    357   d->rt->noit = strdup(val); 
    358  
    359   PQclear(d->res); 
     546  for(node = d->rt; node; node = node->next) { 
     547    DECLARE_PARAM_INT(node->sid); 
     548    PG_EXEC(check_find); 
     549    row_count = PQntuples(d->res); 
     550    if(row_count != 1) { 
     551      PQclear(d->res); 
     552      goto bad_row; 
     553    } 
     554 
     555    /* Get the check uuid */ 
     556    PG_GET_STR_COL(val, 0, "id"); 
     557    if(!val) { 
     558      PQclear(d->res); 
     559      goto bad_row; 
     560    } 
     561    if(uuid_parse(val, node->checkid)) { 
     562      PQclear(d->res); 
     563      goto bad_row; 
     564    } 
     565   
     566    /* Get the remote_address (which noit owns this) */ 
     567    PG_GET_STR_COL(val, 0, "remote_address"); 
     568    if(!val) { 
     569      PQclear(d->res); 
     570      goto bad_row; 
     571    } 
     572    node->noit = strdup(val); 
     573  
     574   bad_row:  
     575    free_params((ds_single_detail *)d); 
     576    d->nparams = 0; 
     577  } 
    360578  return DS_EXEC_SUCCESS; 
    361  bad_row: 
    362   return DS_EXEC_ROW_FAILED; 
    363579} 
    364580execute_outcome_t 
    365 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) { 
    366   int type, len; 
     581stratcon_datastore_execute(conn_q *cq, const char *r, const char *remote_cn, 
     582                           ds_line_detail *d) { 
     583  int type, len, sid; 
    367584  char *final_buff; 
    368   uLong final_len, actual_final_len;; 
     585  uLong final_len, actual_final_len; 
    369586  char *token; 
     587  char raddr_blank[1] = ""; 
     588  const char *raddr; 
    370589 
    371590  type = d->data[0]; 
     591  raddr = r ? r : raddr_blank; 
    372592 
    373593  /* Parse the log line, but only if we haven't already */ 
    374594  if(!d->nparams) { 
    375     char raddr[128]; 
    376595    char *scp, *ecp; 
    377596 
    378     /* setup our remote address */ 
    379     raddr[0] = '\0'; 
    380     switch(r->sa_family) { 
    381       case AF_INET: 
    382         inet_ntop(AF_INET, &(((struct sockaddr_in *)r)->sin_addr), 
    383                   raddr, sizeof(raddr)); 
    384         break; 
    385       case AF_INET6: 
    386         inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)r)->sin6_addr), 
    387                   raddr, sizeof(raddr)); 
    388         break; 
    389       default: 
    390         noitL(noit_error, "remote address of family %d\n", r->sa_family); 
    391     } 
    392   
    393597    scp = d->data; 
    394598#define PROCESS_NEXT_FIELD(t,l) do { \ 
     
    459663        d->whence = (time_t)strtoul(token, NULL, 10); 
    460664        PROCESS_NEXT_FIELD(token, len); 
     665        sid = uuid_to_sid(token, remote_cn); 
     666        if(sid == 0) goto bad_row; 
     667        DECLARE_PARAM_INT(sid); /* sid */ 
    461668        DECLARE_PARAM_STR(token,len); /* uuid */ 
    462669        PROCESS_NEXT_FIELD(token, len); 
     
    472679        d->whence = (time_t)strtoul(token, NULL, 10); 
    473680        PROCESS_NEXT_FIELD(token, len); 
    474         DECLARE_PARAM_STR(token,len); /* uuid */ 
     681        sid = uuid_to_sid(token, remote_cn); 
     682        if(sid == 0) goto bad_row; 
     683        DECLARE_PARAM_INT(sid); /* sid */ 
    475684        PROCESS_NEXT_FIELD(token, len); 
    476685        DECLARE_PARAM_STR(token,len); /* name */ 
     
    485694        d->whence = (time_t)strtoul(token, NULL, 10); 
    486695        PROCESS_NEXT_FIELD(token, len); 
    487         DECLARE_PARAM_STR(token,len); /* uuid */ 
     696        sid = uuid_to_sid(token, remote_cn); 
     697        if(sid == 0) goto bad_row; 
     698        DECLARE_PARAM_INT(sid); /* sid */ 
    488699        PROCESS_NEXT_FIELD(token, len); 
    489700        DECLARE_PARAM_STR(token,len); /* state */ 
     
    547758} 
    548759static int 
     760stratcon_database_post_connect(conn_q *cq) { 
     761  int rv = 0; 
     762  ds_single_detail _d = { 0 }, *d = &_d; 
     763  if(cq->remote_str) { 
     764    char *remote_str, *remote_cn; 
     765    /* This is the silly way we get null's in through our declare_param_str */ 
     766    remote_str = cq->remote_str ? cq->remote_str : "[[null]]"; 
     767    remote_cn = cq->remote_cn ? cq->remote_cn : "[[null]]"; 
     768    /* This is a storage node, it gets the storage node post_connect */ 
     769    GET_QUERY(storage_post_connect); 
     770    rv = -1; /* now we're serious */ 
     771    DECLARE_PARAM_STR(remote_str, strlen(remote_str)); 
     772    DECLARE_PARAM_STR(remote_cn, strlen(remote_cn)); 
     773    PG_EXEC(storage_post_connect); 
     774    PQclear(d->res); 
     775    rv = 0; 
     776  } 
     777  else { 
     778    /* Metanode post_connect */ 
     779    GET_QUERY(metanode_post_connect); 
     780    rv = -1; /* now we're serious */ 
     781    PG_EXEC(metanode_post_connect); 
     782    PQclear(d->res); 
     783    rv = 0; 
     784  } 
     785 bad_row: 
     786  free_params(d); 
     787  if(rv == -1) { 
     788    /* Post-connect intentions are serious and fatal */ 
     789    PQfinish(cq->dbh); 
     790    cq->dbh = NULL; 
     791  } 
     792  return rv; 
     793} 
     794static int 
    549795stratcon_database_connect(conn_q *cq) { 
    550   char dsn[512]; 
     796  char *dsn, dsn_meta[512]; 
    551797  noit_hash_iter iter = NOIT_HASH_ITER_ZERO; 
    552798  const char *k, *v; 
     
    554800  noit_hash_table *t; 
    555801 
    556   dsn[0] = '\0'; 
    557   t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig"); 
    558   while(noit_hash_next_str(t, &iter, &k, &klen, &v)) { 
    559     if(dsn[0]) strlcat(dsn, " ", sizeof(dsn)); 
    560     strlcat(dsn, k, sizeof(dsn)); 
    561     strlcat(dsn, "=", sizeof(dsn)); 
    562     strlcat(dsn, v, sizeof(dsn)); 
    563   } 
    564   noit_hash_destroy(t, free, free); 
    565   free(t); 
     802  dsn_meta[0] = '\0'; 
     803  if(!cq->dsn) { 
     804    t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig"); 
     805    while(noit_hash_next_str(t, &iter, &k, &klen, &v)) { 
     806      if(dsn_meta[0]) strlcat(dsn_meta, " ", sizeof(dsn_meta)); 
     807      strlcat(dsn_meta, k, sizeof(dsn_meta)); 
     808      strlcat(dsn_meta, "=", sizeof(dsn_meta)); 
     809      strlcat(dsn_meta, v, sizeof(dsn_meta)); 
     810    } 
     811    noit_hash_destroy(t, free, free); 
     812    free(t); 
     813    dsn = dsn_meta; 
     814  } 
     815  else dsn = cq->dsn; 
    566816 
    567817  if(cq->dbh) { 
    568818    if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; 
    569819    PQreset(cq->dbh); 
     820    if(stratcon_database_post_connect(cq)) return -1; 
    570821    if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; 
    571822    noitL(noit_error, "Error reconnecting to database: '%s'\nError: %s\n", 
     
    576827  cq->dbh = PQconnectdb(dsn); 
    577828  if(!cq->dbh) return -1; 
     829  if(stratcon_database_post_connect(cq)) return -1; 
    578830  if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; 
    579831  noitL(noit_error, "Error connection to database: '%s'\nError: %s\n", 
     
    625877stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure, 
    626878                                 struct timeval *now) { 
    627   conn_q *cq = closure; 
    628   ds_job_detail *current, *next
     879  ds_rt_detail *dsjd = closure; 
     880  conn_q *cq
    629881  if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 
    630882 
    631   if(!cq->head) return 0;  
    632  
     883  cq = get_conn_q_for_metanode(); 
    633884  stratcon_database_connect(cq); 
    634  
    635   current = cq->head;  
    636   while(current) { 
    637     if(current->rt) { 
    638       next = current->next; 
    639       stratcon_datastore_find(cq, current); 
    640       current = next; 
    641     } 
    642     else if(current->completion_event) { 
    643       next = current->next; 
    644       eventer_add(current->completion_event); 
    645       current = next; 
    646       __remove_until(cq, current); 
    647     } 
    648     else current = current->next; 
    649   } 
     885  assert(dsjd->rt); 
     886  stratcon_datastore_find(cq, dsjd); 
     887  if(dsjd->completion_event) 
     888    eventer_add(dsjd->completion_event); 
     889 
     890  free_params((ds_single_detail *)dsjd); 
     891  free(dsjd); 
     892  release_conn_q(cq); 
    650893  return 0; 
     894} 
     895static const char * 
     896get_dsn_from_storagenode_id(int id, int can_use_db, char **fqdn_out) { 
     897  void *vinfo; 
     898  const char *dsn = NULL, *fqdn = NULL; 
     899  int found = 0; 
     900  storagenode_info *info = NULL; 
     901  pthread_mutex_lock(&storagenode_to_info_cache_lock); 
     902  if(noit_hash_retrieve(&storagenode_to_info_cache, (void *)&id, sizeof(id), 
     903                        &vinfo)) { 
     904    found = 1; 
     905    info = vinfo; 
     906  } 
     907  pthread_mutex_unlock(&storagenode_to_info_cache_lock); 
     908  if(found) { 
     909    if(fqdn_out) *fqdn_out = info->fqdn; 
     910    return info->dsn; 
     911  } 
     912 
     913  if(!found && can_use_db) { 
     914    ds_single_detail *d; 
     915    conn_q *cq; 
     916    int row_count; 
     917    /* Look it up and store it */ 
     918    d = calloc(1, sizeof(*d)); 
     919    cq = get_conn_q_for_metanode(); 
     920    GET_QUERY(find_storage); 
     921    DECLARE_PARAM_INT(id); 
     922    PG_EXEC(find_storage); 
     923    row_count = PQntuples(d->res); 
     924    if(row_count) { 
     925      PG_GET_STR_COL(dsn, 0, "dsn"); 
     926      PG_GET_STR_COL(fqdn, 0, "fqdn"); 
     927    } 
     928    PQclear(d->res); 
     929   bad_row: 
     930    free_params(d); 
     931    free(d); 
     932    release_conn_q(cq); 
     933  } 
     934  if(fqdn) { 
     935    info = calloc(1, sizeof(*info)); 
     936    info->fqdn = strdup(fqdn); 
     937    if(fqdn_out) *fqdn_out = info->fqdn; 
     938    info->dsn = dsn ? strdup(dsn) : NULL; 
     939    info->storagenode_id = id; 
     940    pthread_mutex_lock(&storagenode_to_info_cache_lock); 
     941    noit_hash_store(&storagenode_to_info_cache, 
     942                    (void *)&info->storagenode_id, sizeof(int), info); 
     943    pthread_mutex_unlock(&storagenode_to_info_cache_lock); 
     944  } 
     945  return info ? info->dsn : NULL; 
     946} 
     947static ds_line_detail * 
     948build_insert_batch(interim_journal_t *ij) { 
     949  int rv; 
     950  off_t len; 
     951  const char *buff, *cp, *lcp; 
     952  struct stat st; 
     953  ds_line_detail *head = NULL, *last = NULL, *next = NULL; 
     954 
     955  if(ij->fd < 0) { 
     956    ij->fd = open(ij->filename, O_RDONLY); 
     957    if(ij->fd < 0) { 
     958      noitL(noit_error, "Cannot open interim journal '%s': %s\n", 
     959            ij->filename, strerror(errno)); 
     960      assert(ij->fd >= 0); 
     961    } 
     962  } 
     963  while((rv = fstat(ij->fd, &st)) == -1 && errno == EINTR); 
     964  assert(rv != -1); 
     965  len = st.st_size; 
     966  buff = mmap(NULL, len, PROT_READ, MAP_PRIVATE, ij->fd, 0); 
     967  if(buff == (void *)-1) { 
     968    noitL(noit_error, "mmap(%s) => %s\n", ij->filename, strerror(errno)); 
     969    assert(buff != (void *)-1); 
     970  } 
     971  lcp = buff; 
     972  while(lcp < (buff + len) && 
     973        NULL != (cp = strnstrn("\n", 1, lcp, len - (lcp-buff)))) { 
     974    next = calloc(1, sizeof(*next)); 
     975    next->data = malloc(cp - lcp + 1); 
     976    memcpy(next->data, lcp, cp - lcp); 
     977    next->data[cp - lcp] = '\0'; 
     978    if(!head) head = next; 
     979    if(last) last->next = next; 
     980    last = next; 
     981    lcp = cp + 1; 
     982  } 
     983  munmap((void *)buff, len); 
     984  close(ij->fd); 
     985  return head; 
     986} 
     987static void 
     988interim_journal_remove(interim_journal_t *ij) { 
     989  unlink(ij->filename); 
     990  if(ij->filename) free(ij->filename); 
     991  if(ij->remote_str) free(ij->remote_str); 
     992  if(ij->remote_cn) free(ij->remote_cn); 
     993  if(ij->fqdn) free(ij->fqdn); 
    651994} 
    652995int 
     
    654997                                  struct timeval *now) { 
    655998  int i; 
    656   conn_q *cq = closure; 
    657   ds_job_detail *current, *last_sp; 
     999  interim_journal_t *ij; 
     1000  ds_line_detail *head, *current, *last_sp; 
     1001  const char *dsn; 
     1002  conn_q *cq; 
    6581003  if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 
    6591004  if(mask & EVENTER_ASYNCH_CLEANUP) return 0; 
    660   if(!cq->head) return 0;  
    661  
     1005 
     1006  ij = closure; 
     1007  dsn = get_dsn_from_storagenode_id(ij->storagenode_id, 1, &ij->fqdn); 
     1008  cq = get_conn_q_for_remote(ij->remote_str, ij->remote_cn, 
     1009                             ij->fqdn, dsn); 
     1010  noitL(noit_debug, "stratcon_datastore_asynch_execute[%s,%s,%s]\n", 
     1011        ij->remote_str, ij->remote_cn, ij->fqdn); 
    6621012 full_monty: 
    6631013  /* Make sure we have a connection */ 
     
    6701020  } 
    6711021 
    672   current = cq->head;  
     1022  head = build_insert_batch(ij); 
     1023  current = head;  
    6731024  last_sp = NULL; 
    6741025  if(stratcon_datastore_do(cq, "BEGIN")) BUSTED(cq); 
     
    6791030  
    6801031      if(current->problematic) { 
    681         noitL(noit_error, "[%s] Failed noit line: %s", cq->jobq->queue_name, current->data); 
     1032        noitL(ingest_err, "%d\t%s\n", ij->storagenode_id, current->data); 
    6821033        RELEASE_SAVEPOINT("batch"); 
    6831034        current = current->next; 
    6841035        continue; 
    6851036      }  
    686       rv = stratcon_datastore_execute(cq, cq->remote, current); 
     1037      rv = stratcon_datastore_execute(cq, cq->remote_str, cq->remote_cn, 
     1038                                      current); 
    6871039      switch(rv) { 
    6881040        case DS_EXEC_SUCCESS: 
     
    6991051      } 
    7001052    } 
    701     if(current->completion_event) { 
    702       if(last_sp) RELEASE_SAVEPOINT("batch"); 
    703       if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq); 
    704       eventer_add(current->completion_event); 
    705       current = current->next; 
    706       __remove_until(cq, current); 
    707     } 
    708   } 
     1053  } 
     1054  if(last_sp) RELEASE_SAVEPOINT("batch"); 
     1055  if(stratcon_datastore_do(cq, "COMMIT")) BUSTED(cq); 
     1056  /* Cleanup the mess */ 
     1057  while(head) { 
     1058    ds_line_detail *tofree; 
     1059    tofree = head; 
     1060    head = head->next; 
     1061    if(tofree->data) free(tofree->data); 
     1062    free_params((ds_single_detail *)tofree); 
     1063    free(tofree); 
     1064  } 
     1065  interim_journal_remove(ij); 
     1066  release_conn_q(cq); 
    7091067  return 0; 
     1068} 
     1069static int 
     1070stratcon_datastore_journal_sync(eventer_t e, int mask, void *closure, 
     1071                                struct timeval *now) { 
     1072  noit_hash_iter iter = NOIT_HASH_ITER_ZERO; 
     1073  const char *k; 
     1074  int klen; 
     1075  void *vij; 
     1076  interim_journal_t *ij; 
     1077  syncset_t *syncset = closure; 
     1078 
     1079  if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 
     1080  if(mask & EVENTER_ASYNCH_CLEANUP) return 0; 
     1081 
     1082  noitL(noit_debug, "Syncing journal sets...\n"); 
     1083  while(noit_hash_next(syncset->ws, &iter, &k, &klen, &vij)) { 
     1084    eventer_t ingest; 
     1085    ij = vij; 
     1086    noitL(noit_debug, "Syncing journal set [%s,%s,%s]\n", 
     1087          ij->remote_str, ij->remote_cn, ij->fqdn); 
     1088    fsync(ij->fd); 
     1089    close(ij->fd); 
     1090    ij->fd = -1; 
     1091    ingest = eventer_alloc(); 
     1092    ingest->mask = EVENTER_ASYNCH; 
     1093    ingest->callback = stratcon_datastore_asynch_execute; 
     1094    ingest->closure = ij; 
     1095    eventer_add_asynch(ij->cpool->jobq, ingest); 
     1096  } 
     1097  noit_hash_destroy(syncset->ws, free, NULL); 
     1098  free(syncset->ws); 
     1099  eventer_add(syncset->completion); 
     1100  free(syncset); 
     1101  return 0; 
     1102} 
     1103static interim_journal_t * 
     1104interim_journal_get(struct sockaddr *remote, const char *remote_cn_in, 
     1105                    int storagenode_id, const char *fqdn_in) { 
     1106  void *vhash, *vij; 
     1107  noit_hash_table *working_set; 
     1108  interim_journal_t *ij; 
     1109  struct timeval now; 
     1110  char jpath[PATH_MAX]; 
     1111  char remote_str[128]; 
     1112  const char *remote_cn = remote_cn_in ? remote_cn_in : "default"; 
     1113  const char *fqdn = fqdn_in ? fqdn_in : "default"; 
     1114 
     1115  convert_sockaddr_to_buff(remote_str, sizeof(remote_str), remote); 
     1116  if(!*remote_str) strlcpy(remote_str, "default", sizeof(remote_str)); 
     1117 
     1118  /* Lookup the working set */ 
     1119  if(!noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) { 
     1120    working_set = calloc(1, sizeof(*working_set)); 
     1121    noit_hash_store(&working_sets, strdup(remote_cn), strlen(remote_cn), 
     1122                    working_set); 
     1123  } 
     1124  else 
     1125    working_set = vhash; 
     1126 
     1127  /* Lookup the interim journal within the working set */ 
     1128  if(!noit_hash_retrieve(working_set, fqdn, strlen(fqdn), &vij)) { 
     1129    ij = calloc(1, sizeof(*ij)); 
     1130    gettimeofday(&now, NULL); 
     1131    snprintf(jpath, sizeof(jpath), "%s/%s/%s/%d/%08x%08x", 
     1132             basejpath, remote_str, remote_cn, storagenode_id, 
     1133             (unsigned int)now.tv_sec, (unsigned int)now.tv_usec); 
     1134    ij->remote_str = strdup(remote_str); 
     1135    ij->remote_cn = strdup(remote_cn); 
     1136    ij->fqdn = strdup(fqdn); 
     1137    ij->storagenode_id = storagenode_id; 
     1138    ij->filename = strdup(jpath); 
     1139    ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn, 
     1140                                         ij->fqdn); 
     1141    ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640); 
     1142    if(ij->fd < 0 && errno == ENOENT) { 
     1143      if(mkdir_for_file(ij->filename, 0750)) { 
     1144        noitL(noit_error, "Failed to create dir for '%s': %s\n", 
     1145              ij->filename, strerror(errno)); 
     1146        exit(-1); 
     1147      } 
     1148      ij->fd = open(ij->filename, O_RDWR | O_CREAT | O_EXCL, 0640); 
     1149    } 
     1150    if(ij->fd < 0) { 
     1151      noitL(noit_error, "Failed to open interim journal '%s': %s\n", 
     1152            ij->filename, strerror(errno)); 
     1153      exit(-1); 
     1154    } 
     1155    noit_hash_store(working_set, strdup(fqdn), strlen(fqdn), ij); 
     1156  } 
     1157  else 
     1158    ij = vij; 
     1159 
     1160  return ij; 
     1161} 
     1162static void 
     1163storage_node_quick_lookup(const char *uuid_str, const char *remote_cn, 
     1164                          int *sid_out, int *storagenode_id_out, 
     1165                          char **fqdn_out, char **dsn_out) { 
     1166  /* only called from the main thread -- no safety issues */ 
     1167  void *vuuidinfo, *vinfo; 
     1168  uuid_info *uuidinfo; 
     1169  storagenode_info *info = NULL; 
     1170  char *fqdn = NULL; 
     1171  char *dsn = NULL; 
     1172  int storagenode_id = 0, sid = 0; 
     1173  if(!noit_hash_retrieve(&uuid_to_info_cache, uuid_str, strlen(uuid_str), 
     1174                         &vuuidinfo)) { 
     1175    int row_count; 
     1176    char *tmpint; 
     1177    ds_single_detail *d; 
     1178    conn_q *cq; 
     1179    d = calloc(1, sizeof(*d)); 
     1180    cq = get_conn_q_for_metanode(); 
     1181    if(stratcon_database_connect(cq) == 0) { 
     1182      /* Blocking call to service the cache miss */ 
     1183      GET_QUERY(check_map); 
     1184      DECLARE_PARAM_STR(uuid_str, strlen(uuid_str)); 
     1185      DECLARE_PARAM_STR(remote_cn, strlen(remote_cn)); 
     1186      PG_EXEC(check_map); 
     1187      row_count = PQntuples(d->res); 
     1188      if(row_count != 1) { 
     1189        PQclear(d->res); 
     1190        goto bad_row; 
     1191      } 
     1192      PG_GET_STR_COL(tmpint, 0, "sid"); 
     1193      sid = atoi(tmpint); 
     1194      PG_GET_STR_COL(tmpint, 0, "storage_node_id"); 
     1195      if(tmpint) storagenode_id = atoi(tmpint); 
     1196      PG_GET_STR_COL(fqdn, 0, "fqdn"); 
     1197      PG_GET_STR_COL(dsn, 0, "dsn"); 
     1198      PQclear(d->res); 
     1199    } 
     1200   bad_row: 
     1201    free_params((ds_single_detail *)d); 
     1202    free(d); 
     1203    release_conn_q(cq); 
     1204    /* Place in cache */ 
     1205    if(fqdn) fqdn = strdup(fqdn); 
     1206    uuidinfo = calloc(1, sizeof(*uuidinfo)); 
     1207    uuidinfo->sid = sid; 
     1208    uuidinfo->uuid_str = strdup(uuid_str); 
     1209    noit_hash_store(&uuid_to_info_cache, 
     1210                    uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo); 
     1211    /* Also, we may have just witnessed a new storage node, store it */ 
     1212    if(storagenode_id) { 
     1213      int needs_free = 0; 
     1214      info = calloc(1, sizeof(*info)); 
     1215      info->storagenode_id = storagenode_id; 
     1216      info->dsn = dsn ? strdup(dsn) : NULL; 
     1217      info->fqdn = fqdn ? strdup(fqdn) : NULL; 
     1218      pthread_mutex_lock(&storagenode_to_info_cache_lock); 
     1219      if(!noit_hash_retrieve(&storagenode_to_info_cache, 
     1220                             (void *)&storagenode_id, sizeof(int), &vinfo)) { 
     1221        /* hack to save memory -- we *never* remove from these caches, 
     1222           so we can use the same fqdn value in the above cache for the key 
     1223           in the cache below -- (no strdup) */ 
     1224        noit_hash_store(&storagenode_to_info_cache, 
     1225                        (void *)&info->storagenode_id, sizeof(int), info); 
     1226      } 
     1227      else needs_free = 1; 
     1228      pthread_mutex_unlock(&storagenode_to_info_cache_lock); 
     1229      if(needs_free) { 
     1230        if(info->dsn) free(info->dsn); 
     1231        if(info->fqdn) free(info->fqdn); 
     1232        free(info); 
     1233      } 
     1234    } 
     1235  } 
     1236  else 
     1237    uuidinfo = vuuidinfo; 
     1238 
     1239  if(storagenode_id) { 
     1240    if(uuidinfo && 
     1241       ((!dsn && dsn_out) || (!fqdn && fqdn_out))) { 
     1242      /* we don't have dsn and we actually want it */ 
     1243      pthread_mutex_lock(&storagenode_to_info_cache_lock); 
     1244      if(noit_hash_retrieve(&storagenode_to_info_cache, 
     1245                            (void *)&storagenode_id, sizeof(int), &vinfo)) 
     1246        info = vinfo; 
     1247      pthread_mutex_unlock(&storagenode_to_info_cache_lock); 
     1248    } 
     1249  } 
     1250 
     1251  if(fqdn_out) *fqdn_out = fqdn ? fqdn : (info ? info->fqdn : NULL); 
     1252  if(dsn_out) *dsn_out = dsn ? dsn : (info ? info->dsn : NULL); 
     1253  if(storagenode_id_out) *storagenode_id_out = uuidinfo->storagenode_id; 
     1254  if(sid_out) *sid_out = uuidinfo->sid; 
     1255} 
     1256static int 
     1257uuid_to_sid(const char *uuid_str_in, const char *remote_cn) { 
     1258  char uuid_str[UUID_STR_LEN+1]; 
     1259  int sid = 0; 
     1260  strlcpy(uuid_str, uuid_str_in, sizeof(uuid_str)); 
     1261  storage_node_quick_lookup(uuid_str, remote_cn, &sid, NULL, NULL, NULL); 
     1262  return sid; 
     1263} 
     1264static void 
     1265stratcon_datastore_journal(struct sockaddr *remote, 
     1266                           const char *remote_cn, const char *line) { 
     1267  interim_journal_t *ij = NULL; 
     1268  char uuid_str[UUID_STR_LEN+1], *cp, *fqdn, *dsn; 
     1269  int storagenode_id = 0; 
     1270  uuid_t checkid; 
     1271  if(!line) return; 
     1272  /* if it is a UUID based thing, find the storage node */ 
     1273  switch(*line) { 
     1274    case 'C': 
     1275    case 'S': 
     1276    case 'M': 
     1277      if(line[1] == '\t' && (cp = strchr(line+2, '\t')) != NULL) { 
     1278        strlcpy(uuid_str, cp + 1, sizeof(uuid_str)); 
     1279        if(!uuid_parse(uuid_str, checkid)) { 
     1280          storage_node_quick_lookup(uuid_str, remote_cn, NULL, 
     1281                                    &storagenode_id, &fqdn, &dsn); 
     1282          ij = interim_journal_get(remote, remote_cn, storagenode_id, fqdn); 
     1283        } 
     1284      } 
     1285      break; 
     1286    case 'n': 
     1287      ij = interim_journal_get(remote,remote_cn,0,NULL); 
     1288      break; 
     1289    default: 
     1290      break; 
     1291  } 
     1292  if(!ij && fqdn) { 
     1293    noitL(ingest_err, "%d\t%s\n", storagenode_id, line); 
     1294  } 
     1295  else { 
     1296    int len; 
     1297    len = write(ij->fd, line, strlen(line)); 
     1298    if(len < 0) { 
     1299      noitL(noit_error, "write to %s failed: %s\n", 
     1300            ij->filename, strerror(errno)); 
     1301    } 
     1302  } 
     1303  return; 
     1304} 
     1305static noit_hash_table * 
     1306stratcon_datastore_journal_remove(struct sockaddr *remote, 
     1307                                  const char *remote_cn) { 
     1308  void *vhash = NULL; 
     1309  if(noit_hash_retrieve(&working_sets, remote_cn, strlen(remote_cn), &vhash)) { 
     1310    /* pluck it out */ 
     1311    noit_hash_delete(&working_sets, remote_cn, strlen(remote_cn), free, NULL); 
     1312  } 
     1313  else { 
     1314    noitL(noit_error, "attempted checkpoint on non-existing workingset: '%s'\n", 
     1315          remote_cn); 
     1316    abort(); 
     1317  } 
     1318  return vhash; 
    7101319} 
    7111320void 
    7121321stratcon_datastore_push(stratcon_datastore_op_t op, 
    713                         struct sockaddr *remote, void *operand) { 
    714   conn_q *cq; 
     1322                        struct sockaddr *remote, 
     1323                        const char *remote_cn, void *operand, 
     1324                        eventer_t completion) { 
     1325  conn_pool *cpool; 
     1326  syncset_t *syncset; 
    7151327  eventer_t e; 
    716   ds_job_detail *dsjd
     1328  ds_rt_detail *rtdetail
    7171329  struct datastore_onlooker_list *nnode; 
    7181330 
    7191331  for(nnode = onlookers; nnode; nnode = nnode->next) 
    720     nnode->dispatch(op,remote,operand); 
    721  
    722   cq = __get_conn_q_for_remote(remote); 
    723   dsjd = calloc(1, sizeof(*dsjd)); 
     1332    nnode->dispatch(op,remote,remote_cn,operand); 
     1333 
    7241334  switch(op) { 
    725     case DS_OP_FIND: 
    726       dsjd->rt = operand; 
    727       __append(cq, dsjd); 
     1335    case DS_OP_INSERT: 
     1336      stratcon_datastore_journal(remote, remote_cn, (const char *)operand); 
    7281337      break; 
    729     case DS_OP_INSERT: 
    730       dsjd->data = operand; 
    731       __append(cq, dsjd); 
     1338    case DS_OP_CHKPT: 
     1339      e = eventer_alloc(); 
     1340      syncset = calloc(1, sizeof(*syncset)); 
     1341      e->mask = EVENTER_ASYNCH; 
     1342      e->callback = stratcon_datastore_journal_sync; 
     1343      syncset->ws = stratcon_datastore_journal_remove(remote, remote_cn); 
     1344      syncset->completion = completion; 
     1345      e->closure = syncset; 
     1346      eventer_add(e); 
    7321347      break; 
    7331348    case DS_OP_FIND_COMPLETE: 
    734     case DS_OP_CHKPT: 
    735       dsjd->completion_event = operand; 
    736       __append(cq,dsjd); 
     1349      cpool = get_conn_pool_for_remote(NULL,NULL,NULL); 
     1350      rtdetail = calloc(1, sizeof(*rtdetail)); 
     1351      rtdetail->rt = operand; 
     1352      rtdetail->completion_event = completion; 
    7371353      e = eventer_alloc(); 
    7381354      e->mask = EVENTER_ASYNCH; 
    739       if(op == DS_OP_FIND_COMPLETE) 
    740         e->callback = stratcon_datastore_asynch_lookup; 
    741       else if(op == DS_OP_CHKPT) 
    742         e->callback = stratcon_datastore_asynch_execute; 
    743       e->closure = cq; 
    744       eventer_add_asynch(cq->jobq, e); 
     1355      e->callback = stratcon_datastore_asynch_lookup; 
     1356      e->closure = rtdetail; 
     1357      eventer_add_asynch(cpool->jobq, e); 
    7451358      break; 
    7461359  } 
     
    7501363stratcon_datastore_saveconfig(void *unused) { 
    7511364  int rv = -1; 
    752   conn_q _cq = { 0 }, *cq = &_cq; 
    7531365  char *buff; 
    7541366  ds_single_detail _d = { 0 }, *d = &_d; 
     1367  conn_q *cq; 
     1368  cq = get_conn_q_for_metanode(); 
    7551369 
    7561370  if(stratcon_database_connect(cq) == 0) { 
     
    7751389      free_params(d); 
    7761390  } 
    777   if(cq->dbh) PQfinish(cq->dbh); 
     1391  release_conn_q(cq); 
    7781392  return rv; 
    7791393} 
     
    7811395void 
    7821396stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t, 
    783                                                struct sockaddr *, void *)) { 
     1397                                               struct sockaddr *, 
     1398                                               const char *, void *)) { 
    7841399  struct datastore_onlooker_list *nnode; 
    7851400  nnode = calloc(1, sizeof(*nnode)); 
     
    7891404    nnode->next = onlookers; 
    7901405} 
     1406static void 
     1407stratcon_datastore_launch_file_ingestion(char *remote_str, char *remote_cn, 
     1408                                         char *id_str, char *file) { 
     1409  char path[PATH_MAX]; 
     1410  interim_journal_t *ij; 
     1411  eventer_t ingest; 
     1412 
     1413  snprintf(path, sizeof(path), "%s/%s/%s/%s/%s", 
     1414           basejpath, remote_str, remote_cn, id_str, file); 
     1415  ij = calloc(1, sizeof(*ij)); 
     1416  ij->fd = open(path, O_RDONLY); 
     1417  if(ij->fd < 0) { 
     1418    noitL(noit_error, "cannot open journal '%s': %s\n", 
     1419          path, strerror(errno)); 
     1420    free(ij); 
     1421    return; 
     1422  } 
     1423  close(ij->fd); 
     1424  ij->fd = -1; 
     1425  ij->filename = strdup(path); 
     1426  ij->remote_str = strdup(remote_str); 
     1427  ij->remote_cn = strdup(remote_cn); 
     1428  ij->storagenode_id = atoi(id_str); 
     1429  ij->cpool = get_conn_pool_for_remote(ij->remote_str, ij->remote_cn, 
     1430                                       ij->fqdn); 
     1431  noitL(noit_error, "ingesting old payload: %s\n", ij->filename); 
     1432  ingest = eventer_alloc(); 
     1433  ingest->mask = EVENTER_ASYNCH; 
     1434  ingest->callback = stratcon_datastore_asynch_execute; 
     1435  ingest->closure = ij; 
     1436  eventer_add_asynch(ij->cpool->jobq, ingest); 
     1437} 
     1438static void 
     1439stratcon_datastore_sweep_journals_int(char *first, char *second, char *third) { 
     1440  char path[PATH_MAX]; 
     1441  DIR *root; 
     1442  struct dirent de, *entry; 
     1443  int i = 0, cnt = 0; 
     1444  char **entries; 
     1445 
     1446  snprintf(path, sizeof(path), "%s%s%s%s%s%s%s", basejpath, 
     1447           first ? "/" : "", first ? first : "", 
     1448           second ? "/" : "", second ? second : "", 
     1449           third ? "/" : "", third ? third : ""); 
     1450  root = opendir(path); 
     1451  if(!root) return; 
     1452  while(readdir_r(root, &de, &entry) == 0 && entry != NULL) cnt++; 
     1453  rewinddir(root); 
     1454  entries = malloc(sizeof(*entries) * cnt); 
     1455  while(readdir_r(root, &de, &entry) == 0 && entry != NULL) { 
     1456    if(i < cnt) { 
     1457      entries[i++] = strdup(entry->d_name); 
     1458    } 
     1459  } 
     1460  closedir(root); 
     1461  cnt = i; /* could have changed, directories are fickle */ 
     1462  qsort(entries, i, sizeof(*entries), 
     1463        (int (*)(const void *, const void *))strcasecmp); 
     1464  for(i=0; i<cnt; i++) { 
     1465    if(!strcmp(entries[i], ".") || !strcmp(entries[i], "..")) continue; 
     1466    if(!first) 
     1467      stratcon_datastore_sweep_journals_int(entries[i], NULL, NULL); 
     1468    else if(!second) 
     1469      stratcon_datastore_sweep_journals_int(first, entries[i], NULL); 
     1470    else if(!third) 
     1471      stratcon_datastore_sweep_journals_int(first, second, entries[i]); 
     1472    else if(strlen(entries[i]) == 16) 
     1473      stratcon_datastore_launch_file_ingestion(first,second,third,entries[i]); 
     1474  } 
     1475} 
     1476static void 
     1477stratcon_datastore_sweep_journals() { 
     1478  stratcon_datastore_sweep_journals_int(NULL,NULL,NULL); 
     1479} 
     1480 
     1481int 
     1482stratcon_datastore_ingest_all_storagenode_info() { 
     1483  int i, cnt; 
     1484  ds_single_detail _d = { 0 }, *d = &_d; 
     1485  conn_q *cq; 
     1486  cq = get_conn_q_for_metanode(); 
     1487 
     1488  while(stratcon_database_connect(cq)) { 
     1489    noitL(noit_error, "Error connecting to database\n"); 
     1490    sleep(1); 
     1491  } 
     1492 
     1493  GET_QUERY(all_storage); 
     1494  PG_EXEC(all_storage); 
     1495  cnt = PQntuples(d->res); 
     1496  for(i=0; i<cnt; i++) { 
     1497    void *vinfo; 
     1498    char *tmpint, *fqdn, *dsn; 
     1499    int storagenode_id; 
     1500    PG_GET_STR_COL(tmpint, i, "storage_node_id"); 
     1501    storagenode_id = atoi(tmpint); 
     1502    PG_GET_STR_COL(fqdn, i, "fqdn"); 
     1503    PG_GET_STR_COL(dsn, i, "dsn"); 
     1504    PG_GET_STR_COL(tmpint, i, "storage_node_id"); 
     1505    storagenode_id = tmpint ? atoi(tmpint) : 0; 
     1506 
     1507    if(!noit_hash_retrieve(&storagenode_to_info_cache, 
     1508                           (void *)&storagenode_id, sizeof(int), &vinfo)) { 
     1509      storagenode_info *info; 
     1510      info = calloc(1, sizeof(*info)); 
     1511      info->storagenode_id = storagenode_id; 
     1512      info->fqdn = fqdn ? strdup(fqdn) : NULL; 
     1513      info->dsn = dsn ? strdup(dsn) : NULL; 
     1514      noit_hash_store(&storagenode_to_info_cache, 
     1515                      (void *)&info->storagenode_id, sizeof(int), info); 
     1516    } 
     1517  } 
     1518  PQclear(d->res); 
     1519 bad_row: 
     1520  free_params(d); 
     1521 
     1522  release_conn_q(cq); 
     1523  noitL(noit_error, "Loaded %d storage nodes\n", cnt); 
     1524  return cnt; 
     1525} 
     1526int 
     1527stratcon_datastore_ingest_all_check_info() { 
     1528  int i, cnt, loaded = 0; 
     1529  ds_single_detail _d = { 0 }, *d = &_d; 
     1530  conn_q *cq; 
     1531  cq = get_conn_q_for_metanode(); 
     1532 
     1533  while(stratcon_database_connect(cq)) { 
     1534    noitL(noit_error, "Error connecting to database\n"); 
     1535    sleep(1); 
     1536  } 
     1537 
     1538  GET_QUERY(check_mapall); 
     1539  PG_EXEC(check_mapall); 
     1540  cnt = PQntuples(d->res); 
     1541  for(i=0; i<cnt; i++) { 
     1542    void *vinfo; 
     1543    char *tmpint, *fqdn, *dsn, *uuid_str; 
     1544    int sid, storagenode_id; 
     1545    uuid_info *uuidinfo; 
     1546    PG_GET_STR_COL(uuid_str, i, "id"); 
     1547    if(!uuid_str) continue; 
     1548    PG_GET_STR_COL(tmpint, i, "sid"); 
     1549    if(!tmpint) continue; 
     1550    sid = atoi(tmpint); 
     1551    PG_GET_STR_COL(fqdn, i, "fqdn"); 
     1552    PG_GET_STR_COL(dsn, i, "dsn"); 
     1553    PG_GET_STR_COL(tmpint, i, "storage_node_id"); 
     1554    storagenode_id = tmpint ? atoi(tmpint) : 0; 
     1555 
     1556    uuidinfo = calloc(1, sizeof(*uuidinfo)); 
     1557    uuidinfo->uuid_str = strdup(uuid_str); 
     1558    uuidinfo->sid = sid; 
     1559    noit_hash_store(&uuid_to_info_cache, 
     1560                    uuidinfo->uuid_str, strlen(uuidinfo->uuid_str), uuidinfo); 
     1561    loaded++; 
     1562    if(!noit_hash_retrieve(&storagenode_to_info_cache, 
     1563                           (void *)&storagenode_id, sizeof(int), &vinfo)) { 
     1564      storagenode_info *info; 
     1565      info = calloc(1, sizeof(*info)); 
     1566      info->storagenode_id = storagenode_id; 
     1567      info->fqdn = fqdn ? strdup(fqdn) : NULL; 
     1568      info->dsn = dsn ? strdup(dsn) : NULL; 
     1569      noit_hash_store(&storagenode_to_info_cache, 
     1570                      (void *)&info->storagenode_id, sizeof(int), info); 
     1571    } 
     1572  } 
     1573  PQclear(d->res); 
     1574 bad_row: 
     1575  free_params(d); 
     1576 
     1577  release_conn_q(cq); 
     1578  noitL(noit_error, "Loaded %d uuid -> (sid,storage_node_id) mappings\n", loaded); 
     1579  return loaded; 
     1580} 
     1581void 
     1582stratcon_datastore_init() { 
     1583  pthread_mutex_init(&ds_conns_lock, NULL); 
     1584  pthread_mutex_init(&storagenode_to_info_cache_lock, NULL); 
     1585  ds_err = noit_log_stream_find("error/datastore"); 
     1586  ingest_err = noit_log_stream_find("error/ingest"); 
     1587  if(!ds_err) ds_err = noit_error; 
     1588  if(!ingest_err) ingest_err = noit_error; 
     1589  if(!noit_conf_get_string(NULL, "/stratcon/database/journal/path", 
     1590                           &basejpath)) { 
     1591    noitL(noit_error, "/stratcon/database/journal/path is unspecified\n"); 
     1592    exit(-1); 
     1593  } 
     1594  stratcon_datastore_ingest_all_check_info(); 
     1595  stratcon_datastore_ingest_all_storagenode_info(); 
     1596  stratcon_datastore_sweep_journals(); 
     1597} 
  • src/stratcon_datastore.h

    r88a7178 r5360a1e  
    4444 DS_OP_INSERT = 1, 
    4545 DS_OP_CHKPT = 2, 
    46  DS_OP_FIND = 3, 
    47  DS_OP_FIND_COMPLETE = 4 
     46 DS_OP_FIND_COMPLETE = 3 
    4847} stratcon_datastore_op_t; 
    4948 
    5049API_EXPORT(void) 
    5150  stratcon_datastore_push(stratcon_datastore_op_t, 
    52                           struct sockaddr *, void *); 
     51                          struct sockaddr *, const char *, void *, eventer_t); 
    5352 
    5453API_EXPORT(void) 
    5554  stratcon_datastore_register_onlooker(void (*f)(stratcon_datastore_op_t, 
    56                                                  struct sockaddr *, void *)); 
     55                                                 struct sockaddr *, 
     56                                                 const char *, void *)); 
     57 
     58API_EXPORT(void) 
     59  stratcon_datastore_init(); 
    5760 
    5861API_EXPORT(int) 
  • src/stratcon_iep.c

    r0335d9d r5360a1e  
    142142  } 
    143143  noitL(noit_error, "submitting statement: %s\n", line); 
    144   stratcon_iep_line_processor(DS_OP_INSERT, NULL, line); 
     144  stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, line, NULL); 
    145145  stmt->marked = 1; 
    146146} 
     
    163163  /* Phase 1: sweep in all the statements */ 
    164164  for(i=0; i<cnt; i++) { 
    165     char id[UUID_STR_LEN]; 
     165    char id[UUID_STR_LEN+1]; 
    166166    char provides[256]; 
    167167    char *statement; 
     
    202202  /* Phase 2: load the requires graph */ 
    203203  for(i=0; i<cnt; i++) { 
    204     char id[UUID_STR_LEN]; 
     204    char id[UUID_STR_LEN+1]; 
    205205    int rcnt, j; 
    206206    char *requires; 
     
    280280  noitL(noit_debug, "Found %d %s stanzas\n", cnt, path); 
    281281  for(i=0; i<cnt; i++) { 
    282     char id[UUID_STR_LEN]; 
     282    char id[UUID_STR_LEN+1]; 
    283283    char topic[256]; 
    284284    char *query; 
     
    314314      query++; 
    315315    } 
    316     stratcon_iep_line_processor(DS_OP_INSERT, NULL, line); 
     316    stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, line, NULL); 
    317317  } 
    318318  free(query_configs); 
     
    419419setup_iep_connection_callback(eventer_t e, int mask, void *closure, 
    420420                              struct timeval *now) { 
    421   stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL); 
     421  stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, NULL, NULL); 
    422422  return 0; 
    423423} 
     
    526526void 
    527527stratcon_iep_line_processor(stratcon_datastore_op_t op, 
    528                             struct sockaddr *remote, void *operand) { 
     528                            struct sockaddr *remote, const char *remote_cn, 
     529                            void *operand, eventer_t completion) { 
    529530  int len; 
    530531  char remote_str[128]; 
  • src/stratcon_iep.h

    r8ad126b r5360a1e  
    4848API_EXPORT(void) 
    4949  stratcon_iep_line_processor(stratcon_datastore_op_t op, 
    50                               struct sockaddr *remote, void *operand); 
     50                              struct sockaddr *remote, const char *remote_cn, 
     51                              void *operand, eventer_t completion); 
    5152 
    5253API_EXPORT(jlog_streamer_ctx_t *) 
  • src/stratcon_jlog_streamer.c

    recb7471 r06c1b70  
    371371        FULLREAD(e, ctx, (unsigned long)ctx->header.message_len); 
    372372        if(ctx->header.message_len > 0) 
    373           ctx->push(DS_OP_INSERT, &nctx->r.remote, ctx->buffer); 
     373          ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn, 
     374                    ctx->buffer, NULL); 
    374375        else if(ctx->buffer) 
    375376          free(ctx->buffer); 
     
    385386          completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION; 
    386387          ctx->state = JLOG_STREAMER_IS_ASYNC; 
    387           ctx->push(DS_OP_CHKPT, &nctx->r.remote, completion_e); 
     388          ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn, 
     389                    NULL, completion_e); 
    388390          noitL(noit_debug, "Pushing batch asynch...\n"); 
    389391          return 0; 
  • src/stratcon_jlog_streamer.h

    rbe29b6f r5360a1e  
    9191  u_int64_t total_bytes_read; 
    9292 
    93   void (*push)(stratcon_datastore_op_t, struct sockaddr *, void *); 
     93  void (*push)(stratcon_datastore_op_t, struct sockaddr *, const char *, void *, eventer_t); 
    9494} jlog_streamer_ctx_t; 
    9595 
  • src/stratcon_realtime_http.c

    r3c56016 r5360a1e  
    427427    for(node = rc->checklist; node; node = node->next) { 
    428428      noit_atomic_inc32(&ctx->ref_cnt); 
    429       stratcon_datastore_push(DS_OP_FIND, NULL, node); 
    430429      noitL(noit_error, "Resolving sid: %d\n", node->sid); 
    431430    } 
     
    435434    completion->closure = ctx; 
    436435    gettimeofday(&completion->whence, NULL); 
    437     stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, completion); 
     436    stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, NULL, 
     437                            rc->checklist, completion); 
    438438  } 
    439439  return EVENTER_EXCEPTION; 
  • src/stratcond.c

    r8ad126b r5360a1e  
    171171  noit_watchdog_child_eventer_heartbeat(); 
    172172 
     173  stratcon_datastore_init(); 
    173174  noit_console_init(APPNAME); 
    174175  noit_console_conf_init(); 
  • ui/web/htdocs/js/jquery.flot.js

    r5b62a23 r5efd99e  
    12881288 
    12891289                for (var i = 0; i < data.length; ++i) { 
     1290                    if(data[i] == null || data[i][0] == null || data[i][1] == null) 
     1291                        continue; 
    12901292                    prev = cur; 
    12911293                    cur = [data[i][0], data[i][1]]; 
  • ui/web/lib/Reconnoiter_DB.php

    r33b4929 r2fa2fac  
    2222  } 
    2323  function connect() { 
    24     $this->db = new PDO("pgsql:host=noit.office.omniti.com;dbname=reconnoiter", 
     24    $this->db = new PDO("pgsql:host=localhost;dbname=reconnoiter", 
    2525                        "prism", "prism"); 
    2626    $this->db->setAttribute( PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION ); 
     
    8484    $sth = $this->db->prepare(" 
    8585      select distinct(remote_address) as remote_address 
    86         from stratcon.loading_dock_check_s 
    87         join (   select id, max(whence) as whence 
    88                    from stratcon.loading_dock_check_s 
    89                group by id) latestrecord 
    90        USING (id,whence)"); 
     86        from check_currently"); 
    9187    $rv = array(); 
    9288    while($row = $sth->fetch()) { 
     
    150146                            join prism.saved_graphs_dep gd 
    151147                           using (graphid) 
    152                             join stratcon.metric_name_summary s 
     148                            join metric_name_summary s 
    153149                           using (sid,metric_name,metric_type) 
    154150                           where g.ts_search_all @@ to_tsquery(query) 
    155                               or s.ts_search_all @@ to_tsquery(query)))", 
     151                              or s.fts_data @@ to_tsquery(query)))", 
    156152      "select sheetid, title, 
    157153              to_char(last_update, 'YYYY/mm/dd') as last_update 
     
    166162                            join prism.saved_graphs_dep gd 
    167163                           using (graphid) 
    168                             join stratcon.metric_name_summary s 
     164                            join metric_name_summary s 
    169165                           using (sid,metric_name,metric_type) 
    170166                           where g.ts_search_all @@ to_tsquery(query) 
    171                               or s.ts_search_all @@ to_tsquery(query))) 
     167                              or s.fts_data @@ to_tsquery(query))) 
    172168     order by last_update desc", 
    173169      $offset, $limit); 
     
    182178            or graphid in (select graphid 
    183179                            from prism.saved_graphs_dep gd 
    184                             join stratcon.metric_name_summary s 
     180                            join metric_name_summary s 
    185181                           using (sid,metric_name,metric_type) 
    186                            where ts_search_all @@ to_tsquery(query)))", 
     182                           where fts_data @@ to_tsquery(query)))", 
    187183      "select graphid, title, json, 
    188184              to_char(last_update, 'YYYY/mm/dd') as last_update 
     
    193189            or graphid in (select graphid 
    194190                            from prism.saved_graphs_dep gd 
    195                             join stratcon.metric_name_summary s 
     191                            join metric_name_summary s 
    196192                           using (sid,metric_name,metric_type) 
    197                            where ts_search_all @@ to_tsquery(query))) 
     193                           where fts_data @@ to_tsquery(query))) 
    198194     order by last_update desc", 
    199195      $offset, $limit); 
     
    202198    return $this->run_tsearch($searchstring, 
    203199      "select count(*) as count 
    204          from stratcon.mv_loading_dock_check_s
    205          join stratcon.metric_name_summary m using (sid), 
     200         from check_currently
     201         join metric_name_summary m using (sid), 
    206202              (select ? ::text as query) q 
    207         where active = true and (query = '' or ts_search_all @@ to_tsquery(query))", 
     203        where active = true and (query = '' or fts_data @@ to_tsquery(query))", 
    208204      "select c.id, c.sid, c.remote_address, 
    209205              c.target, c.whence, c.module, c.name, 
    210206              m.metric_name, m.metric_type 
    211          from stratcon.mv_loading_dock_check_s
    212          join stratcon.metric_name_summary m using (sid), 
     207         from check_currently
     208         join metric_name_summary m using (sid), 
    213209              (select ? ::text as query) q 
    214         where active = true and (query = '' or ts_search_all @@ to_tsquery(query)) 
     210        where active = true and (query = '' or fts_data @@ to_tsquery(query)) 
    215211     order by target, module, name, remote_address", 
    216212      $offset, $limit); 
     
    238234      $ptr_groupby = ', ciamt.value'; 
    239235      $ptr_join = " 
    240         left join stratcon.mv_loading_dock_check_s cia 
     236        left join check_currently cia 
    241237               on (    $tblsrc.$want ::inet = cia.target ::inet 
    242238                   and cia.module='dns' and cia.name='in-addr.arpa') 
    243         left join stratcon.current_metric_text ciamt 
     239        left join metric_text_currently ciamt 
    244240               on (cia.sid = ciamt.sid and ciamt.name='answer')"; 
    245241    } 
     
    248244      $ptr_groupby = ', caliasmt.value'; 
    249245      $ptr_join = " 
    250         left join stratcon.current_metric_text caliasmt 
     246        left join metric_text_currently caliasmt 
    251247               on (c.sid = caliasmt.sid and caliasmt.name='alias')"; 
    252248    } 
     
    258254             min(c.sid) as sid, min(metric_type) as metric_type, 
    259255             count(1) as cnt 
    260         from stratcon.mv_loading_dock_check_s
    261         join stratcon.metric_name_summary m using (sid) 
     256        from check_currently
     257        join metric_name_summary m using (sid) 
    262258             $ptr_join 
    263259       where active = " . ($active ? "true" : "false") . $where_sql . " 
     
    293289  function get_targets($remote_address = null) { 
    294290    if($remote_address) { 
    295       $sth = $this->db->prepare("select distinct(target) as target from stratcon.loading_dock_check_s where remote_address = ?"); 
     291      $sth = $this->db->prepare("select distinct(target) as target from check_currently where remote_address = ?"); 
    296292      $sth->execute(array($remote_address)); 
    297293    } 
    298294    else { 
    299       $sth = $this->db->prepare("select distinct(target) as target from stratcon.loading_dock_check_s"); 
     295      $sth = $this->db->prepare("select distinct(target) as target from check_currently"); 
    300296      $sth->execute(); 
    301297    } 
     
    309305    $sth = $this->db->prepare(" 
    310306      select sid, id, check_name, metric_name, metric_type 
    311         from ( 
    312          select distinct on (sid, id) sid, id, name as check_name 
    313            from stratcon.loading_dock_check_s 
    314           where target = ? 
    315        order by sid, id, whence desc 
    316              ) c 
    317         join stratcon.metric_name_summary using(sid) 
     307        from check_currently 
     308        join metric_name_summary using(sid) 
    318309       where active = ? 
    319310    "); 
     
    522513    $binds = array(); 
    523514 
    524     $sql = "select m.* from stratcon.mv_loading_dock_check_s m where m.sid in ("; 
     515    $sql = "select m.* from check_currently m where m.sid in ("; 
    525516    for ($i =0 ; $i<count($sid_list);$i++){ 
    526517      $binds[] = $sid_list[$i];