Changeset a9077178423e39a94a9b624e44cd4b37899d6fd3

Show
Ignore:
Timestamp:
05/12/09 12:48:35 (6 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1242132515 +0000
git-parent:

[17cbef4037fd5c482c89bf8d7e2e0bdb43e3f4ad]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1242132515 +0000
Message:

refs #122, this stuff works well enough to pull dev back into trunk.
Closing the #122 branch.

flag-day(noitd,stratcond)

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • configure.in

    r0b1d2f2 ra907717  
    2424AC_PATH_PROGS(XSLTPROC, xsltproc) 
    2525AC_PATH_PROGS(XMLLINT, xmllint) 
     26AC_PATH_PROGS(JAVA, java) 
     27AC_PATH_PROGS(JAVAC, javac) 
     28AC_PATH_PROGS(JAR, jar) 
    2629AC_PATH_PROG(NROFF, nroff) 
    2730AC_SUBST(PERL) 
     
    98101AC_SUBST(ATOMIC_OBJS) 
    99102AC_DEFINE_UNQUOTED(MODULEEXT, "$MODULEEXT", [module extension]) 
     103AC_MSG_CHECKING([enable build/install of the Java IEP bits]) 
     104 
     105if test -z "$ac_cv_path_JAVA" ; then 
     106        AC_MSG_RESULT([no]) 
     107else 
     108        JAVAPARTS=java 
     109        AC_MSG_RESULT([yes]) 
     110fi 
     111AC_SUBST(JAVAPARTS) 
    100112 
    101113NOIT_SVNVERSION=`svnversion` 
     
    571583src/noitedit/Makefile 
    572584src/lua/Makefile 
     585src/stomp/Makefile 
     586src/java/Makefile 
     587src/java/run-iep.sh 
    573588sql/Makefile 
    574589test/Makefile 
  • src/Makefile.in

    ra18c447 ra907717  
    3232NOWHOLE_ARCHIVE=@NOWHOLE_ARCHIVE@ 
    3333 
    34 SUBS=lua utils eventer udns modules noitedit man 
     34SUBS=lua utils eventer udns modules noitedit man stomp @JAVAPARTS@ 
    3535 
    3636NOIT_OBJS=noitd.o noit_listener.o \ 
     
    5454make-subdirs:   serf/.libs/libserf-0.o jlog/libjlog.a 
    5555        for dir in $(SUBS) ; do \ 
    56                 (cd $$dir && make) ; \ 
     56                echo "- building $$dir bits" ; \ 
     57                (cd $$dir && make -s) ; \ 
    5758        done 
    5859 
     
    9394                -Ljlog -ljlog \ 
    9495                -Lnoitedit -lnoitedit \ 
     96                -Lstomp -lstomp \ 
    9597                $(NOWHOLE_ARCHIVE) \ 
    96                 $(LIBS) $(PGLIBS) 
     98                $(LIBS) $(PGLIBS) @SERFLIBS@ 
    9799        @echo "- linking $@" 
    98100 
    99101stratcon_datastore.o:   stratcon_datastore.c 
    100102        @$(CC) $(CPPFLAGS) $(PGCFLAGS) -c $< 
     103        @echo "- compiling $<" 
     104 
     105stratcon_iep.o: stratcon_iep.c 
     106        @$(CC) $(CPPFLAGS) $(CFLAGS) @SERFCFLAGS@ -c $< 
    101107        @echo "- compiling $<" 
    102108 
     
    115121        @echo "- re2c noit_tokenizer.re" 
    116122 
    117 test-noit.conf: noit.conf.in 
     123test-noit.conf: noit.conf.in Makefile 
    118124        sed -e "s^%sysconfdir%^`pwd`^g;" \ 
    119125                -e "s^%modulesdir%^`pwd`/modules^g;" \ 
     
    123129                test-noit.conf 
    124130 
    125 noit.conf:      noit.conf.in 
     131noit.conf:      noit.conf.in Makefile 
    126132        sed -e "s^%sysconfdir%^$(sysconfdir)^g;" \ 
    127133                -e "s^%modulesdir%^$(MODULES_DIR)^g;" \ 
     
    131137                noit.conf 
    132138 
    133 test-stratcon.conf:     stratcon.conf.in 
     139test-stratcon.conf:     stratcon.conf.in Makefile 
    134140        sed -e "s^%sysconfdir%^`pwd`^g;" \ 
    135141                -e "s^%modulesdir%^`pwd`/modules^g;" \ 
    136142                -e "s^%modulesluadir%^`pwd`/modules-lua^g;" \ 
     143                -e "s^%iepbindir%^`pwd`/java^g;" \ 
     144                -e "s^%iepdbdir%^`pwd`/java^g;" \ 
    137145                -e "s^%PKIPREFIX%^../test/test-^g;" < \ 
    138146                stratcon.conf.in > \ 
    139147                test-stratcon.conf 
    140148 
    141 stratcon.conf:  stratcon.conf.in 
     149stratcon.conf:  stratcon.conf.in Makefile 
    142150        sed -e "s^%sysconfdir%^$(sysconfdir)^g;" \ 
    143151                -e "s^%modulesdir%^$(MODULES_DIR)^g;" \ 
    144152                -e "s^%modulesluadir%^$(MODULES_DIR)^g;" \ 
     153                -e "s^%iepbindir%^$(bindir)^g;" \ 
     154                -e "s^%iepdbdir%^$(prefix)/var/db^g;" \ 
    145155                -e "s^%PKIPREFIX%^$${PKIPREFIX}^g;" < \ 
    146156                stratcon.conf.in > \ 
     
    158168        (cd man && make install DESTDIR=$(DESTDIR)) 
    159169        (cd modules && make install DESTDIR=$(DESTDIR)) 
     170        test -n "@JAVAPARTS@" && (cd @JAVAPARTS@ && make install DESTDIR=$(DESTDIR)) 
    160171        (cd modules-lua && make install DESTDIR=$(DESTDIR)) 
    161172 
  • src/eventer/eventer_SSL_fd_opset.c

    r6210da7 ra907717  
    3939  unsigned long err; 
    4040  char buf[120]; /* ERR_error_string(3): buf must be at least 120 bytes */ 
    41   noitL(noit_error, "%s:%d: errno: [%d] %s\n", f, l, errno, strerror(errno)); 
     41  noitL(eventer_err, "%s:%d: errno: [%d] %s\n", f, l, errno, strerror(errno)); 
    4242  while((err = ERR_get_error()) != 0) { 
    4343    ERR_error_string(err, buf); 
    44     noitL(noit_error, "%s:%d: write error[%08lx] %s\n", f, l, err, buf); 
     44    noitL(eventer_err, "%s:%d: write error[%08lx] %s\n", f, l, err, buf); 
    4545  } 
    4646} 
     
    6868    if(!out || 
    6969       !PEM_write_bio_RSAPrivateKey(out, tmpkey, NULL, NULL, 0, 0, NULL)) { 
    70       noitL(noit_error, "Could not save temporary RSA key to %s\n", tmpkeyfile); 
     70      noitL(eventer_err, "Could not save temporary RSA key to %s\n", tmpkeyfile); 
    7171    } else { 
    7272      tmpkeyfile_time = time(NULL); 
     
    118118    if(!strcmp(opt_no_ca, "true")) ok = 1; 
    119119    else { 
    120       noitL(noit_error, "SSL client cert invalid: %s\n", 
     120      noitL(eventer_err, "SSL client cert invalid: %s\n", 
    121121            X509_verify_cert_error_string(v_res)); 
    122122      ok = 0; 
     
    128128    if(!strcmp(ignore_dates, "true")) ok = 1; 
    129129    else { 
    130       noitL(noit_error, "SSL client cert is %s valid.\n", 
     130      noitL(eventer_err, "SSL client cert is %s valid.\n", 
    131131            (v_res < 0) ? "not yet" : "no longer"); 
    132132      ok = 0; 
     
    356356      break; 
    357357    default: 
    358       noitL(noit_error, "SSL rw error: %d\n", sslerror); 
     358      noitL(eventer_err, "SSL rw error: %d\n", sslerror); 
    359359      eventer_ssl_error(); 
    360360      errno = EIO; 
     
    374374eventer_SSL_read(int fd, void *buffer, size_t len, int *mask, void *closure) { 
    375375  int rv; 
    376   noitL(noit_debug, "SSL_read(%d) wants %ld bytes\n", fd, (long int)len); 
    377376  rv = eventer_SSL_rw(SSL_OP_READ, fd, buffer, len, mask, closure); 
    378   noitL(noit_debug, "SSL_read(%d) wanted %ld bytes, got return value %d\n", fd, (long int)len, rv); 
    379377  return rv; 
    380378} 
     
    383381                  void *closure) { 
    384382  int rv; 
    385   noitL(noit_debug, "SSL_write(%d) wants %ld bytes\n", fd, (long int)len); 
    386383  rv = eventer_SSL_rw(SSL_OP_WRITE, fd, (void *)buffer, len, mask, closure); 
    387   noitL(noit_debug, "SSL_write(%d) wanted %ld bytes, got return value %d\n", fd, (long int)len, rv); 
    388384  return rv; 
    389385} 
  • src/eventer/eventer_impl.c

    r84d6f13 ra907717  
    1111 
    1212static struct timeval *eventer_impl_epoch = NULL; 
     13static int EVENTER_DEBUGGING = 0; 
    1314 
    1415#ifdef HAVE_KQUEUE 
     
    5758    return 0; 
    5859  } 
     60  else if(!strcasecmp(key, "debugging")) { 
     61    if(strcmp(value, "0")) { 
     62      EVENTER_DEBUGGING = 1; 
     63      noitL(noit_error, "Enabling debugging from property\n"); 
     64    } 
     65    return 0; 
     66  } 
    5967  return -1; 
    6068} 
     
    7381  int i; 
    7482  eventer_t e; 
     83  char *evdeb; 
    7584 
     85  evdeb = getenv("EVENTER_DEBUGGING"); 
     86  if(evdeb) { 
     87    if(strcmp(evdeb, "0")) { 
     88      /* Set to anything but "0" turns debugging on */ 
     89      EVENTER_DEBUGGING = 1; 
     90      noitL(noit_error, "Disabling eventer debugging from environment\n"); 
     91    } 
     92    else { 
     93      EVENTER_DEBUGGING = 1; 
     94      noitL(noit_error, "Enabling eventer debugging from environment\n"); 
     95    } 
     96  } 
    7697  eventer_impl_epoch = malloc(sizeof(struct timeval)); 
    7798  gettimeofday(eventer_impl_epoch, NULL); 
     
    104125  job->fd_event = e; 
    105126  gettimeofday(&job->create_time, NULL); 
    106   if(e->whence.tv_sec) { 
     127  /* If we're debugging the eventer, these cross thread timeouts will 
     128   * make it impossible for us to slowly trace an asynch job. */ 
     129  if(!EVENTER_DEBUGGING && e->whence.tv_sec) { 
    107130    job->timeout_event = eventer_alloc(); 
    108131    memcpy(&job->timeout_event->whence, &e->whence, sizeof(e->whence)); 
  • src/eventer/eventer_jobq.c

    rec98850 ra907717  
    226226    pthread_mutex_lock(&job->lock); 
    227227    if(job->timeout_triggered) { 
     228      struct timeval diff, diff2; 
    228229      /* This happens if the timeout occurred before we even had the change 
    229230       * to pull the job off the queue.  We must be in bad shape here. 
     
    231232      noitL(eventer_deb, "%p jobq[%s] -> timeout before start [%p]\n", pthread_self(), jobq->queue_name, job); 
    232233      gettimeofday(&job->finish_time, NULL); /* We're done */ 
     234      sub_timeval(job->finish_time, job->fd_event->whence, &diff); 
     235      sub_timeval(job->finish_time, job->create_time, &diff2); 
     236      noitL(eventer_deb, "%p jobq[%s] -> timeout before start [%p] -%0.6f (%0.6f)\n", 
     237            pthread_self(), jobq->queue_name, job, 
     238            (float)diff.tv_sec + (float)diff.tv_usec/1000000.0, 
     239            (float)diff2.tv_sec + (float)diff2.tv_usec/1000000.0); 
    233240      pthread_mutex_unlock(&job->lock); 
    234241      job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_CLEANUP, 
  • src/noit_jlog_listener.c

    rdb9d1c2 ra907717  
    1515#include <sys/ioctl.h> 
    1616#define MAX_ROWS_AT_ONCE 1000 
    17 #define DEFAULT_SECONDS_BETWEEN_BATCHES 1 
     17#define DEFAULT_SECONDS_BETWEEN_BATCHES 10 
     18 
     19static noit_atomic32_t tmpfeedcounter = 0; 
    1820 
    1921void 
     
    2325                                 NOIT_JLOG_DATA_FEED, 
    2426                                 noit_jlog_handler); 
     27  noit_control_dispatch_delegate(noit_control_dispatch, 
     28                                 NOIT_JLOG_DATA_TEMP_FEED, 
     29                                 noit_jlog_handler); 
    2530} 
    2631 
    2732typedef struct { 
    2833  jlog_ctx *jlog; 
     34  char *subscriber; 
    2935  jlog_id chkpt; 
    3036  jlog_id start; 
     
    4349void 
    4450noit_jlog_closure_free(noit_jlog_closure_t *jcl) { 
    45   if(jcl->jlog) jlog_ctx_close(jcl->jlog); 
     51  if(jcl->jlog) { 
     52    if(jcl->subscriber) { 
     53      if(jcl->subscriber[0] == '~') 
     54        jlog_ctx_remove_subscriber(jcl->jlog, jcl->subscriber); 
     55      free(jcl->subscriber); 
     56    } 
     57    jlog_ctx_close(jcl->jlog); 
     58  } 
    4659  free(jcl); 
    4760} 
     
    112125  while(1) { 
    113126    jlog_id client_chkpt; 
    114     int sleeptime = DEFAULT_SECONDS_BETWEEN_BATCHES; 
     127    int sleeptime = (ac->cmd == NOIT_JLOG_DATA_TEMP_FEED) ? 
     128                      1 : DEFAULT_SECONDS_BETWEEN_BATCHES; 
    115129    jlog_get_checkpoint(jcl->jlog, ac->remote_cn, &jcl->chkpt); 
    116130    jcl->count = jlog_ctx_read_interval(jcl->jlog, &jcl->start, &jcl->finish); 
     
    184198    noit_log_stream_t ls; 
    185199    const char *logname; 
    186     char path[PATH_MAX], *sub; 
     200    char path[PATH_MAX], subscriber[32], *sub; 
    187201    jcl = ac->service_ctx = noit_jlog_closure_alloc(); 
    188202    if(!noit_hash_retr_str(ac->config, 
     
    204218      goto socket_error; 
    205219    } 
    206     if(!ac->remote_cn) { 
    207       noitL(noit_error, "jlog transit started to unidentified party.\n"); 
    208       goto socket_error; 
    209     } 
     220    if(ac->cmd == NOIT_JLOG_DATA_FEED) { 
     221      if(!ac->remote_cn) { 
     222        noitL(noit_error, "jlog transit started to unidentified party.\n"); 
     223        goto socket_error; 
     224      } 
     225      strlcpy(subscriber, ac->remote_cn, sizeof(subscriber)); 
     226    } 
     227    else { 
     228      snprintf(subscriber, sizeof(subscriber), 
     229               "~%07d", noit_atomic_inc32(&tmpfeedcounter)); 
     230    } 
     231    jcl->subscriber = strdup(subscriber); 
    210232 
    211233    strlcpy(path, ls->path, sizeof(path)); 
     
    221243 
    222244    jcl->jlog = jlog_new(path); 
    223     if(jlog_ctx_open_reader(jcl->jlog, ac->remote_cn) == -1) { 
    224       noitL(noit_error, "jlog reader[%s] error: %s\n", ac->remote_cn, 
     245    if(ac->cmd == NOIT_JLOG_DATA_TEMP_FEED) 
     246      if(jlog_ctx_add_subscriber(jcl->jlog, jcl->subscriber, JLOG_END) == -1) 
     247        noitL(noit_error, "jlog reader[%s] error: %s\n", jcl->subscriber, 
     248              jlog_ctx_err_string(jcl->jlog)); 
     249    if(jlog_ctx_open_reader(jcl->jlog, jcl->subscriber) == -1) { 
     250      noitL(noit_error, "jlog reader[%s] error: %s\n", jcl->subscriber, 
    225251            jlog_ctx_err_string(jcl->jlog)); 
    226252      goto socket_error; 
  • src/noit_jlog_listener.h

    r6bb9ef8 ra907717  
    1111 
    1212#define NOIT_JLOG_DATA_FEED 0xda7afeed 
     13#define NOIT_JLOG_DATA_TEMP_FEED 0x7e66feed 
    1314 
    1415API_EXPORT(void) 
  • src/noit_listener.c

    rdb9d1c2 ra907717  
    368368  } 
    369369 
    370   cmd = ntohl(cmd); 
     370  ac->cmd = ntohl(cmd); 
    371371  /* Lookup cmd and dispatch */ 
    372372  if(noit_hash_retrieve(&listener_commands, 
     
    376376    delegation_table = (noit_hash_table *)vdelegation_table; 
    377377    if(noit_hash_retrieve(delegation_table, 
    378                           (char *)&cmd, sizeof(cmd), &vfunc)) { 
     378                          (char *)&ac->cmd, sizeof(ac->cmd), &vfunc)) { 
    379379      e->callback = *((eventer_func_t *)vfunc); 
    380380      return e->callback(e, mask, closure, now); 
  • src/noit_listener.h

    rdb9d1c2 ra907717  
    2929  void *service_ctx; 
    3030  eventer_func_t dispatch; 
     31  u_int32_t cmd; 
    3132} acceptor_closure_t; 
    3233 
  • src/stratcon.conf.in

    re2900c8 ra907717  
    2929    <noit address="127.0.0.1" port="34332" /> 
    3030  </noits> 
     31 
     32  <iep disable="false"> <!-- false the default --> 
     33    <start directory="%iepdbdir%" 
     34           command="%iepbindir%/run-iep.sh" /> 
     35    <queries> 
     36      <query id="ce6bf8d2-3dd7-11de-a45c-a7df160cba9e" topic="status"> 
     37        select * from NoitStatus 
     38      </query> 
     39    </queries> 
     40  </iep> 
    3141 
    3242  <database> 
  • src/stratcon_datastore.c

    rb3a7d8c ra907717  
    430430  noit_hash_table *t; 
    431431 
     432  dsn[0] = '\0'; 
     433  t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig"); 
     434  while(noit_hash_next_str(t, &iter, &k, &klen, &v)) { 
     435    if(dsn[0]) strlcat(dsn, " ", sizeof(dsn)); 
     436    strlcat(dsn, k, sizeof(dsn)); 
     437    strlcat(dsn, "=", sizeof(dsn)); 
     438    strlcat(dsn, v, sizeof(dsn)); 
     439  } 
     440 
    432441  if(cq->dbh) { 
    433442    if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; 
     
    439448  } 
    440449 
    441   dsn[0] = '\0'; 
    442   t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig"); 
    443   while(noit_hash_next_str(t, &iter, &k, &klen, &v)) { 
    444     if(dsn[0]) strlcat(dsn, " ", sizeof(dsn)); 
    445     strlcat(dsn, k, sizeof(dsn)); 
    446     strlcat(dsn, "=", sizeof(dsn)); 
    447     strlcat(dsn, v, sizeof(dsn)); 
    448   } 
    449450  cq->dbh = PQconnectdb(dsn); 
    450451  if(!cq->dbh) return -1; 
     
    524525stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure, 
    525526                                  struct timeval *now) { 
     527  int i; 
    526528  conn_q *cq = closure; 
    527529  ds_job_detail *current, *last_sp; 
     
    532534 full_monty: 
    533535  /* Make sure we have a connection */ 
     536  i = 1; 
    534537  while(stratcon_database_connect(cq)) { 
    535538    noitL(noit_error, "Error connecting to database\n"); 
    536     sleep(1); 
     539    sleep(i); 
     540    i *= 2; 
     541    i = MIN(i, 16); 
    537542  } 
    538543 
  • src/stratcon_iep.c

    rb3a7d8c ra907717  
    88#include "utils/noit_log.h" 
    99#include "utils/noit_b64.h" 
     10#include "noit_jlog_listener.h" 
     11#include "stratcon_jlog_streamer.h" 
    1012#include "stratcon_datastore.h" 
    1113#include "noit_conf.h" 
    1214#include "noit_check.h" 
    1315 
    14 #define SWEEP_DELAY { 0L, 100000L } /* 100ms */ 
     16#include <unistd.h> 
     17#include <sys/fcntl.h> 
     18#include <assert.h> 
     19#include <libxml/parser.h> 
     20#include <libxml/tree.h> 
     21#include <libxml/xmlsave.h> 
     22#ifdef OPENWIRE 
     23#include "amqcs.h" 
     24#else 
     25#include "stomp/stomp.h" 
     26#endif 
     27 
    1528eventer_jobq_t iep_jobq; 
    1629 
    17 struct noit_line_list { 
    18   char *line; 
    19   struct noit_line_list *next; 
     30struct iep_thread_driver { 
     31#ifdef OPENWIRE 
     32  amqcs_connect_options connect_options; 
     33  amqcs_connection *connection; 
     34#else 
     35  stomp_connection *connection; 
     36#endif 
     37  apr_pool_t *pool; 
    2038}; 
    21 struct iep_batch { 
    22   /* This lock only needs to be used for inserting... (in the pivot) 
    23    * Once the batch is "done" it is submitted to a thread and has 
    24    * no more concurrent access. 
    25    */ 
    26   pthread_mutex_t lock; 
    27   int batch_size; 
    28   struct noit_line_list *head; 
    29   struct noit_line_list *tail; 
     39pthread_key_t iep_connection; 
     40 
     41struct iep_job_closure { 
     42  char *line;       /* This is a copy and gets trashed during processing */ 
     43  xmlDocPtr doc; 
     44  char *doc_str; 
     45  apr_pool_t *pool; 
    3046}; 
    31 /* We safely insert into the pivot... then lock and flip the batch 
    32  * into a self-contained iep_batch which is the closure to the asynch 
    33  * function that inserts it into OpenESB. 
    34  */ 
    35 static struct iep_batch pivot_batch; 
     47 
     48static void 
     49stratcon_iep_datastore_onlooker(stratcon_datastore_op_t op, 
     50                                struct sockaddr *remote, void *operand); 
     51static void 
     52start_iep_daemon(); 
    3653 
    3754static int 
    38 stratcon_iep_batch_add_line(const char *data) { 
    39   int previous_size; 
    40   struct noit_line_list *nnode; 
    41   nnode = malloc(sizeof(*nnode)); 
    42   nnode->line = strdup(data); 
    43   nnode->next = NULL; 
    44   pthread_mutex_lock(&pivot_batch.lock); 
    45   if(!pivot_batch.tail) pivot_batch.head = pivot_batch.tail = nnode; 
    46   else { 
    47     pivot_batch.tail->next = nnode; 
    48     pivot_batch.tail = nnode; 
    49   } 
    50   previous_size = pivot_batch.batch_size; 
    51   pivot_batch.batch_size++; 
    52   pthread_mutex_unlock(&pivot_batch.lock); 
    53   return previous_size; 
    54 
    55  
    56 static struct iep_batch * 
    57 stratcon_iep_batch_copytrunc() { 
    58   struct iep_batch *nbatch; 
    59   nbatch = calloc(1, sizeof(*nbatch)); 
    60   /* Lock */ 
    61   pthread_mutex_lock(&pivot_batch.lock); 
    62   /* Copy */ 
    63   nbatch->batch_size = pivot_batch.batch_size; 
    64   nbatch->head = pivot_batch.head; 
    65   nbatch->tail = pivot_batch.tail; 
    66   /* Trunc */ 
    67   pivot_batch.batch_size = 0; 
    68   pivot_batch.head = pivot_batch.tail = NULL; 
    69   /* Lock */ 
    70   pthread_mutex_unlock(&pivot_batch.lock); 
    71   return nbatch; 
     55bust_to_parts(char *in, char **p, int len) { 
     56  int cnt = 0; 
     57  char *s = in; 
     58  while(cnt < len) { 
     59    p[cnt++] = s; 
     60    while(*s && *s != '\t') s++; 
     61    if(!*s) break; 
     62    *s++ = '\0'; 
     63  } 
     64  while(*s) s++; /* Move to end */ 
     65  if(s > in && *(s-1) == '\n') *(s-1) = '\0'; /* chomp */ 
     66  return cnt; 
     67
     68 
     69#define ADDCHILD(a,b) \ 
     70  xmlNewTextChild(root, NULL, (xmlChar *)(a), (xmlChar *)(b)) 
     71#define NEWDOC(xmldoc,n,stanza) do { \ 
     72  xmlNodePtr root; \ 
     73  xmldoc = xmlNewDoc((xmlChar *)"1.0"); \ 
     74  root = xmlNewDocNode(xmldoc, NULL, (xmlChar *)(n), NULL); \ 
     75  xmlDocSetRootElement(xmldoc, root); \ 
     76  stanza \ 
     77} while(0) 
     78 
     79 
     80static xmlDocPtr 
     81stratcon_iep_doc_from_status(char *data) { 
     82  xmlDocPtr doc; 
     83  char *parts[7]; 
     84  if(bust_to_parts(data, parts, 7) != 7) return NULL; 
     85  /* 'S' TIMESTAMP UUID STATE AVAILABILITY DURATION STATUS_MESSAGE */ 
     86  NEWDOC(doc, "NoitStatus", 
     87         { 
     88           ADDCHILD("id", parts[2]); 
     89           ADDCHILD("state", parts[3]); 
     90           ADDCHILD("availability", parts[4]); 
     91           ADDCHILD("duration", parts[5]); 
     92           ADDCHILD("status", parts[6]); 
     93         }); 
     94  return doc; 
     95
     96 
     97static xmlDocPtr 
     98stratcon_iep_doc_from_metric(char *data) { 
     99  xmlDocPtr doc; 
     100  char *parts[6]; 
     101  const char *rootname = "NoitMetricNumeric"; 
     102  const char *valuename = "value"; 
     103  if(bust_to_parts(data, parts, 6) != 6) return NULL; 
     104  /*  'M' TIMESTAMP UUID NAME TYPE VALUE */ 
     105 
     106  if(*parts[4] == METRIC_STRING) { 
     107    rootname = "NoitMetricText"; 
     108    valuename = "message"; 
     109  } 
     110  NEWDOC(doc, rootname, 
     111         { 
     112           ADDCHILD("id", parts[2]); 
     113           ADDCHILD("name", parts[3]); 
     114           ADDCHILD(valuename, parts[5]); 
     115         }); 
     116  return doc; 
     117
     118 
     119static xmlDocPtr 
     120stratcon_iep_doc_from_query(char *data) { 
     121  xmlDocPtr doc; 
     122  char *parts[4]; 
     123  if(bust_to_parts(data, parts, 4) != 4) return NULL; 
     124  /*  'Q' ID NAME QUERY  */ 
     125 
     126  NEWDOC(doc, "StratconQuery", 
     127         { 
     128           ADDCHILD("id", parts[1]); 
     129           ADDCHILD("name", parts[2]); 
     130           ADDCHILD("expression", parts[3]); 
     131         }); 
     132  return doc; 
     133
     134 
     135static xmlDocPtr 
     136stratcon_iep_doc_from_querystop(char *data) { 
     137  xmlDocPtr doc; 
     138  char *parts[2]; 
     139  if(bust_to_parts(data, parts, 2) != 2) return NULL; 
     140  /*  'Q' ID */ 
     141 
     142  NEWDOC(doc, "StratconQueryStop", 
     143         { 
     144           xmlNodeSetContent(root, (xmlChar *)parts[1]); 
     145         }); 
     146  return doc; 
     147
     148 
     149static xmlDocPtr 
     150stratcon_iep_doc_from_line(char *data) { 
     151  if(data) { 
     152    switch(*data) { 
     153      case 'S': return stratcon_iep_doc_from_status(data); 
     154      case 'M': return stratcon_iep_doc_from_metric(data); 
     155      case 'Q': return stratcon_iep_doc_from_query(data); 
     156      case 'q': return stratcon_iep_doc_from_querystop(data); 
     157    } 
     158  } 
     159  return NULL; 
     160
     161 
     162static float 
     163stratcon_iep_age_from_line(char *data, struct timeval now) { 
     164  float n, t; 
     165  if(data && (*data == 'S' || *data == 'M')) { 
     166    if(data[1] != '\t') return 0; 
     167    t = strtof(data + 2, NULL); 
     168    n = (float)now.tv_sec + (float)now.tv_usec / 1000000.0; 
     169    return n - t; 
     170  } 
     171  return 0; 
     172
     173 
     174void stratcon_iep_submit_queries() { 
     175  int i, cnt = 0; 
     176  noit_conf_section_t *query_configs; 
     177  char path[256]; 
     178 
     179  snprintf(path, sizeof(path), "/stratcon/iep/queries//query"); 
     180  query_configs = noit_conf_get_sections(NULL, path, &cnt); 
     181  noitL(noit_debug, "Found %d %s stanzas\n", cnt, path); 
     182  for(i=0; i<cnt; i++) { 
     183    char id[UUID_STR_LEN]; 
     184    char topic[256]; 
     185    char *query; 
     186    char *line; 
     187    int line_len; 
     188 
     189    if(!noit_conf_get_stringbuf(query_configs[i], 
     190                                "self::node()/@id", 
     191                                id, sizeof(id))) { 
     192      noitL(noit_error, "No uuid specified in query\n"); 
     193      continue; 
     194    } 
     195    if(!noit_conf_get_stringbuf(query_configs[i], 
     196                                "ancestor-or-self::node()/@topic", 
     197                                topic, sizeof(topic))) { 
     198      noitL(noit_error, "No topic specified in query\n"); 
     199      continue; 
     200    } 
     201    if(!noit_conf_get_string(query_configs[i], "self::node()", 
     202                             &query)) { 
     203      noitL(noit_error, "No contents specified in query\n"); 
     204      continue; 
     205    } 
     206    line_len = 4 /* 3 tabs + \0 */ + 
     207               1 /* 'Q' */ + 1 /* '\n' */ + 
     208               strlen(id) + strlen(topic) + strlen(query); 
     209    line = malloc(line_len); 
     210    snprintf(line, line_len, "Q\t%s\t%s\t%s\n", id, topic, query); 
     211    free(query); 
     212    query = line; 
     213    while(query[0] && query[1]) { 
     214      if(*query == '\n') *query = ' '; 
     215      query++; 
     216    } 
     217    stratcon_iep_datastore_onlooker(DS_OP_INSERT, NULL, line); 
     218    free(line); 
     219  } 
     220
     221 
     222static char * 
     223stratcon__xml_doc_to_str(xmlDocPtr doc) { 
     224  char *rv; 
     225  xmlSaveCtxtPtr savectx; 
     226  xmlBufferPtr xmlbuffer; 
     227  xmlbuffer = xmlBufferCreate(); 
     228  savectx = xmlSaveToBuffer(xmlbuffer, "utf8", 1); 
     229  xmlSaveDoc(savectx, doc); 
     230  xmlSaveClose(savectx); 
     231  rv = strdup((const char *)xmlBufferContent(xmlbuffer)); 
     232  xmlBufferFree(xmlbuffer); 
     233  return rv; 
     234
     235 
     236static 
     237struct iep_thread_driver *stratcon_iep_get_connection() { 
     238  apr_status_t rc; 
     239  struct iep_thread_driver *driver; 
     240  driver = pthread_getspecific(iep_connection); 
     241  if(!driver) { 
     242    driver = calloc(1, sizeof(*driver)); 
     243    pthread_setspecific(iep_connection, driver); 
     244  } 
     245 
     246  if(!driver->pool) { 
     247    if(apr_pool_create(&driver->pool, NULL) != APR_SUCCESS) return NULL; 
     248  } 
     249 
     250  if(!driver->connection) { 
     251    int port; 
     252    char hostname[128]; 
     253    if(!noit_conf_get_int(NULL, "/stratcon/iep/port", &port)) 
     254      port = 61613; 
     255    if(!noit_conf_get_stringbuf(NULL, "/stratcon/iep/hostname", 
     256                                hostname, sizeof(hostname))) 
     257      strlcpy(hostname, "127.0.0.1", sizeof(hostname)); 
     258#ifdef OPENWIRE 
     259    memset(&driver->connect_options, 0, sizeof(driver->connect_options)); 
     260    strlcpy(driver->connect_options.hostname, hostname, 
     261            sizeof(driver->connect_options.hostname)); 
     262    driver->connect_options.port = port; 
     263    if(amqcs_connect(&driver->connection, &driver->connect_options, 
     264                     driver->pool) != APR_SUCCESS) { 
     265      noitL(noit_error, "MQ connection failed\n"); 
     266      return NULL; 
     267    } 
     268#else 
     269    if(stomp_connect(&driver->connection, hostname, port, 
     270                     driver->pool)!= APR_SUCCESS) { 
     271      noitL(noit_error, "MQ connection failed\n"); 
     272      return NULL; 
     273    } 
     274 
     275    { 
     276      stomp_frame frame; 
     277      frame.command = "CONNECT"; 
     278      frame.headers = apr_hash_make(driver->pool); 
     279/* 
     280      We don't use login/pass 
     281      apr_hash_set(frame.headers, "login", APR_HASH_KEY_STRING, ""); 
     282      apr_hash_set(frame.headers, "passcode", APR_HASH_KEY_STRING, ""); 
     283*/ 
     284      frame.body = NULL; 
     285      frame.body_length = -1; 
     286      rc = stomp_write(driver->connection, &frame, driver->pool); 
     287      if(rc != APR_SUCCESS) { 
     288        noitL(noit_error, "MQ STOMP CONNECT failed, %d\n", rc); 
     289        stomp_disconnect(&driver->connection); 
     290        return NULL; 
     291      } 
     292    }   
     293    { 
     294      stomp_frame *frame; 
     295      rc = stomp_read(driver->connection, &frame, driver->pool); 
     296      if (rc != APR_SUCCESS) { 
     297        noitL(noit_error, "MQ STOMP CONNECT bad response, %d\n", rc); 
     298        stomp_disconnect(&driver->connection); 
     299        return NULL; 
     300      } 
     301      noitL(noit_error, "Response: %s, %s\n", frame->command, frame->body); 
     302     } 
     303#endif 
     304     stratcon_iep_submit_queries(); 
     305  } 
     306 
     307  return driver; 
    72308} 
    73309 
     
    75311stratcon_iep_submitter(eventer_t e, int mask, void *closure, 
    76312                       struct timeval *now) { 
    77   struct iep_batch *batch = closure; 
     313  float age; 
     314  struct iep_job_closure *job = closure; 
    78315  /* We only play when it is an asynch event */ 
    79316  if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 
     
    81318  if(mask & EVENTER_ASYNCH_CLEANUP) { 
    82319    /* free all the memory associated with the batch */ 
    83     while(batch->head) { 
    84       struct noit_line_list *l; 
    85       l = batch->head; 
    86       batch->head = l->next; 
    87       free(l->line); 
    88       free(l); 
    89     } 
    90     free(batch); 
     320    if(job) { 
     321      if(job->line) free(job->line); 
     322      if(job->doc_str) free(job->doc_str); 
     323      if(job->doc) xmlFreeDoc(job->doc); 
     324      if(job->pool) apr_pool_destroy(job->pool); 
     325      free(job); 
     326    } 
    91327    return 0; 
    92328  } 
    93329 
    94   /* pull from batch and submit */ 
    95   noitL(noit_error, "Firing stratcon_iep_submitter on a batch of %d events\n", 
    96         batch->batch_size); 
    97  
    98   return 0; 
    99 
    100  
    101 static int 
    102 stratcon_iep_batch_sweep(eventer_t e, int mask, void *closure, 
    103                          struct timeval *now) { 
    104   struct iep_batch *nbatch; 
    105   struct timeval iep_timeout = { 5L, 0L }; 
    106   eventer_t newe; 
    107  
    108   nbatch = stratcon_iep_batch_copytrunc(); 
    109   if(nbatch->batch_size == 0) { 
    110     /* misfire */ 
    111     free(nbatch); 
     330  if((age = stratcon_iep_age_from_line(job->line, *now)) > 60) { 
     331    noitL(noit_debug, "Skipping old event %f second old.\n", age); 
    112332    return 0; 
    113333  } 
    114  
    115   newe = eventer_alloc(); 
    116   newe->mask = EVENTER_ASYNCH; 
    117   add_timeval(*now, iep_timeout, &newe->whence); 
    118   newe->callback = stratcon_iep_submitter; 
    119   newe->closure = nbatch; 
    120  
    121   eventer_add_asynch(&iep_jobq, newe); 
     334  job->doc = stratcon_iep_doc_from_line(job->line); 
     335  if(job->doc) { 
     336    job->doc_str = stratcon__xml_doc_to_str(job->doc); 
     337    if(job->doc_str) { 
     338      /* Submit */ 
     339      struct iep_thread_driver *driver; 
     340      driver = stratcon_iep_get_connection(); 
     341      if(driver && driver->pool && driver->connection) { 
     342        apr_status_t rc; 
     343#ifdef OPENWIRE 
     344        ow_ActiveMQQueue *dest; 
     345        ow_ActiveMQTextMessage *message; 
     346 
     347        apr_pool_create(&job->pool, driver->pool); 
     348        message = ow_ActiveMQTextMessage_create(job->pool); 
     349        message->content = 
     350          ow_byte_array_create_with_data(job->pool,strlen(job->doc_str), 
     351                                         job->doc_str); 
     352        dest = ow_ActiveMQQueue_create(job->pool); 
     353        dest->physicalName = ow_string_create_from_cstring(job->pool,"TEST.QUEUE");          
     354        rc = amqcs_send(driver->connection, 
     355                        (ow_ActiveMQDestination*)dest, 
     356                        (ow_ActiveMQMessage*)message, 
     357                        1,4,0,job->pool); 
     358        if(rc != APR_SUCCESS) { 
     359          noitL(noit_error, "MQ send failed, disconnecting\n"); 
     360          if(driver->connection) amqcs_disconnect(&driver->connection); 
     361          driver->connection = NULL; 
     362        } 
     363#else 
     364        stomp_frame out; 
     365 
     366        apr_pool_create(&job->pool, driver->pool); 
     367 
     368        out.command = "SEND"; 
     369        out.headers = apr_hash_make(job->pool); 
     370        apr_hash_set(out.headers, "destination", APR_HASH_KEY_STRING, "/queue/noit.firehose"); 
     371        apr_hash_set(out.headers, "ack", APR_HASH_KEY_STRING, "auto"); 
     372       
     373        out.body_length = -1; 
     374        out.body = job->doc_str; 
     375        rc = stomp_write(driver->connection, &out, job->pool); 
     376        if(rc != APR_SUCCESS) { 
     377          noitL(noit_error, "STOMP send failed, disconnecting\n"); 
     378          if(driver->connection) stomp_disconnect(&driver->connection); 
     379          driver->connection = NULL; 
     380        } 
     381#endif 
     382      } 
     383      else { 
     384        noitL(noit_error, "Not submitting event, no MQ\n"); 
     385      } 
     386    } 
     387  } 
     388  else { 
     389    noitL(noit_error, "no iep handler for: '%s'\n", job->line); 
     390  } 
    122391  return 0; 
    123392} 
     
    126395stratcon_iep_datastore_onlooker(stratcon_datastore_op_t op, 
    127396                                struct sockaddr *remote, void *operand) { 
     397  struct iep_job_closure *jc; 
     398  eventer_t newe; 
     399  struct timeval __now, iep_timeout = { 20L, 0L }; 
    128400  /* We only care about inserts */ 
     401 
     402  if(op == DS_OP_CHKPT) { 
     403    eventer_add((eventer_t) operand); 
     404    return; 
     405  } 
    129406  if(op != DS_OP_INSERT) return; 
    130407 
    131408  /* process operand and push onto queue */ 
    132   if(stratcon_iep_batch_add_line((char *)operand) == 0) { 
    133     /* If this is the first in the queue, then we need to schedule a 
    134      * sweeper to submit the queue. 
    135      */ 
    136     eventer_t e; 
    137     struct timeval __now, sweep_delay = SWEEP_DELAY; 
    138  
    139     gettimeofday(&__now, NULL); 
    140  
    141     e = eventer_alloc(); 
    142     e->callback = stratcon_iep_batch_sweep; 
    143     add_timeval(__now, sweep_delay, &e->whence); 
    144     e->closure = NULL; /* we can only do one thing */ 
    145     e->mask = EVENTER_TIMER; 
    146     eventer_add(e); 
    147   } 
     409  gettimeofday(&__now, NULL); 
     410  newe = eventer_alloc(); 
     411  newe->mask = EVENTER_ASYNCH; 
     412  add_timeval(__now, iep_timeout, &newe->whence); 
     413  newe->callback = stratcon_iep_submitter; 
     414  jc = calloc(1, sizeof(*jc)); 
     415  jc->line = strdup(operand); 
     416  newe->closure = jc; 
     417 
     418  eventer_add_asynch(&iep_jobq, newe); 
     419
     420 
     421static void connection_destroy(void *vd) { 
     422  struct iep_thread_driver *driver = vd; 
     423#ifdef OPENWIRE 
     424  if(driver->connection) amqcs_disconnect(&driver->connection); 
     425#else 
     426  if(driver->connection) stomp_disconnect(&driver->connection); 
     427#endif 
     428  if(driver->pool) apr_pool_destroy(driver->pool); 
     429  free(driver); 
     430
     431 
     432jlog_streamer_ctx_t * 
     433stratcon_jlog_streamer_iep_ctx_alloc(void) { 
     434  jlog_streamer_ctx_t *ctx; 
     435  ctx = stratcon_jlog_streamer_ctx_alloc(); 
     436  ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_TEMP_FEED); 
     437  ctx->push = stratcon_iep_datastore_onlooker; 
     438  return ctx; 
     439
     440 
     441struct iep_daemon_info { 
     442  pid_t child; 
     443  int stdin_pipe[2]; 
     444  int stderr_pipe[2]; 
     445  char *directory; 
     446  char *command; 
     447}; 
     448 
     449static void 
     450iep_daemon_info_free(struct iep_daemon_info *info) { 
     451  if(!info) return; 
     452  if(info->directory) free(info->directory); 
     453  if(info->command) free(info->command); 
     454  if(info->stdin_pipe[0] >= 0) close(info->stdin_pipe[0]); 
     455  if(info->stdin_pipe[1] >= 0) close(info->stdin_pipe[1]); 
     456  if(info->stderr_pipe[0] >= 0) close(info->stderr_pipe[0]); 
     457  if(info->stderr_pipe[1] >= 0) close(info->stderr_pipe[1]); 
     458  free(info); 
     459
     460 
     461static int 
     462stratcon_iep_err_handler(eventer_t e, int mask, void *closure, 
     463                         struct timeval *now) { 
     464  int len, newmask; 
     465  char buff[4096]; 
     466  struct iep_daemon_info *info = (struct iep_daemon_info *)closure; 
     467 
     468  if(mask & EVENTER_EXCEPTION) { 
     469    int rv; 
     470   read_error: 
     471    kill(SIGKILL, info->child); 
     472    if(waitpid(info->child, &rv, 0) != info->child) { 
     473      noitL(noit_error, "Failed to reap IEP daemon\n"); 
     474      exit(-1); 
     475    } 
     476    noitL(noit_error, "IEP daemon is done, starting a new one\n"); 
     477    start_iep_daemon(); 
     478    eventer_remove_fd(e->fd); 
     479    e->opset->close(e->fd, &newmask, e); 
     480    return 0; 
     481  } 
     482  while(1) { 
     483    len = e->opset->read(e->fd, buff, sizeof(buff)-1, &newmask, e); 
     484    if(len == -1 && (errno == EAGAIN || errno == EINTR)) 
     485      return newmask | EVENTER_EXCEPTION; 
     486    if(len <= 0) goto read_error; 
     487    assert(len < sizeof(buff)); 
     488    buff[len] = '\0'; 
     489    noitL(noit_error, "IEP: %s", buff); 
     490  } 
     491
     492 
     493static void 
     494start_iep_daemon() { 
     495  eventer_t newe; 
     496  struct iep_daemon_info *info; 
     497 
     498  info = calloc(1, sizeof(*info)); 
     499  info->stdin_pipe[0] = info->stdin_pipe[1] = -1; 
     500  info->stderr_pipe[0] = info->stderr_pipe[1] = -1; 
     501 
     502  if(!noit_conf_get_string(NULL, "/stratcon/iep/start/@directory", 
     503                           &info->directory)) 
     504    info->directory = strdup("."); 
     505  if(!noit_conf_get_string(NULL, "/stratcon/iep/start/@command", 
     506                           &info->command)) { 
     507    noitL(noit_error, "No IEP start command provided.  You're on your own.\n"); 
     508    goto bail; 
     509  } 
     510  if(pipe(info->stdin_pipe) != 0 || 
     511     pipe(info->stderr_pipe) != 0) { 
     512    noitL(noit_error, "pipe: %s\n", strerror(errno)); 
     513    goto bail; 
     514  } 
     515  info->child = fork(); 
     516  if(info->child == -1) { 
     517    noitL(noit_error, "fork: %s\n", strerror(errno)); 
     518    goto bail; 
     519  } 
     520  if(info->child == 0) { 
     521    char *argv[2] = { "run-iep", NULL }; 
     522    int stdout_fileno; 
     523 
     524    if(chdir(info->directory) != 0) { 
     525      noitL(noit_error, "Starting IEP daemon, chdir failed: %s\n", 
     526            strerror(errno)); 
     527      exit(-1); 
     528    } 
     529 
     530    close(info->stdin_pipe[1]); 
     531    close(info->stderr_pipe[0]); 
     532    dup2(info->stdin_pipe[0], 0); 
     533    dup2(info->stderr_pipe[1], 2); 
     534    stdout_fileno = open("/dev/null", O_WRONLY); 
     535    dup2(stdout_fileno, 1); 
     536 
     537    exit(execv(info->command, argv)); 
     538  } 
     539  /* in the parent */ 
     540  socklen_t on = 1; 
     541 
     542  close(info->stdin_pipe[0]); 
     543  info->stdin_pipe[0] = -1; 
     544  close(info->stderr_pipe[1]); 
     545  info->stderr_pipe[1] = -1; 
     546  if(ioctl(info->stderr_pipe[0], FIONBIO, &on)) { 
     547    goto bail; 
     548  } 
     549 
     550  newe = eventer_alloc(); 
     551  newe->fd = info->stderr_pipe[0]; 
     552  newe->mask = EVENTER_READ | EVENTER_EXCEPTION; 
     553  newe->callback = stratcon_iep_err_handler; 
     554  newe->closure = info; 
     555  eventer_add(newe); 
     556  info = NULL; 
     557 
     558  return; 
     559 
     560 bail: 
     561  if(info) { 
     562    iep_daemon_info_free(info); 
     563  } 
     564  noitL(noit_error, "Failed to start IEP daemon\n"); 
     565  exit(-1); 
     566  return; 
    148567} 
    149568 
    150569void 
    151570stratcon_iep_init() { 
    152   eventer_name_callback("stratcon_iep_batch_sweep", stratcon_iep_batch_sweep); 
     571  noit_boolean disabled = noit_false; 
     572  apr_initialize(); 
     573  atexit(apr_terminate);    
     574 
     575  if(noit_conf_get_boolean(NULL, "/stratcon/iep/@disabled", &disabled) && 
     576     disabled == noit_true) { 
     577    noitL(noit_error, "IEP system is disabled!\n"); 
     578    return; 
     579  } 
     580 
     581  start_iep_daemon(); 
     582 
    153583  eventer_name_callback("stratcon_iep_submitter", stratcon_iep_submitter); 
     584  pthread_key_create(&iep_connection, connection_destroy); 
    154585 
    155586  /* start up a thread pool of one */ 
     
    159590  eventer_jobq_increase_concurrency(&iep_jobq); 
    160591 
    161   /* Setup our pivot batch */ 
    162   memset(&pivot_batch, 0, sizeof(pivot_batch)); 
    163   pthread_mutex_init(&pivot_batch.lock, NULL); 
    164  
    165   /* Add our onlooker */ 
    166   stratcon_datastore_register_onlooker(stratcon_iep_datastore_onlooker); 
    167 } 
    168  
     592  /* setup our live jlog stream */ 
     593  stratcon_streamer_connection(NULL, NULL, 
     594                               stratcon_jlog_recv_handler, 
     595                               (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc, 
     596                               NULL, 
     597                               jlog_streamer_ctx_free); 
     598} 
     599 
  • src/stratcon_jlog_streamer.c

    re7ae97b ra907717  
    99#include "utils/noit_hash.h" 
    1010#include "utils/noit_log.h" 
    11 #include "jlog/jlog.h" 
    1211#include "noit_jlog_listener.h" 
    1312#include "stratcon_datastore.h" 
     
    2827noit_hash_table noits = NOIT_HASH_EMPTY; 
    2928 
    30 typedef struct jlog_streamer_ctx_t { 
    31   int bytes_expected; 
    32   int bytes_read; 
    33   char *buffer;         /* These guys are for doing partial reads */ 
    34  
    35   enum { 
    36     WANT_INITIATE = 0, 
    37     WANT_COUNT = 1, 
    38     WANT_HEADER = 2, 
    39     WANT_BODY = 3, 
    40     WANT_CHKPT = 4, 
    41   } state; 
    42   int count;            /* Number of jlog messages we need to read */ 
    43   struct { 
    44     jlog_id   chkpt; 
    45     u_int32_t tv_sec; 
    46     u_int32_t tv_usec; 
    47     u_int32_t message_len; 
    48   } header; 
    49 } jlog_streamer_ctx_t; 
    50  
    5129static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx); 
    5230 
    5331jlog_streamer_ctx_t * 
    54 jlog_streamer_ctx_alloc(void) { 
     32stratcon_jlog_streamer_datastore_ctx_alloc(void) { 
     33  jlog_streamer_ctx_t *ctx; 
     34  ctx = stratcon_jlog_streamer_ctx_alloc(); 
     35  ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED); 
     36  ctx->push = stratcon_datastore_push; 
     37  return ctx; 
     38
     39jlog_streamer_ctx_t * 
     40stratcon_jlog_streamer_ctx_alloc(void) { 
    5541  jlog_streamer_ctx_t *ctx; 
    5642  ctx = calloc(1, sizeof(*ctx)); 
     
    180166stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure, 
    181167                           struct timeval *now) { 
    182   static u_int32_t jlog_feed_cmd = 0; 
    183168  noit_connection_ctx_t *nctx = closure; 
    184169  jlog_streamer_ctx_t *ctx = nctx->consumer_ctx; 
    185170  int len; 
    186171  jlog_id n_chkpt; 
    187  
    188   if(!jlog_feed_cmd) jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED); 
    189172 
    190173  if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) { 
     
    207190    switch(ctx->state) { 
    208191      case WANT_INITIATE: 
    209         len = e->opset->write(e->fd, &jlog_feed_cmd, sizeof(&jlog_feed_cmd), 
     192        len = e->opset->write(e->fd, &ctx->jlog_feed_cmd, 
     193                              sizeof(&ctx->jlog_feed_cmd), 
    210194                              &mask, e); 
    211195        if(len < 0) { 
     
    213197          goto socket_error; 
    214198        } 
    215         if(len != sizeof(jlog_feed_cmd)) { 
     199        if(len != sizeof(ctx->jlog_feed_cmd)) { 
    216200          noitL(noit_error, "short write on initiating stream.\n"); 
    217201          goto socket_error; 
     
    246230      case WANT_BODY: 
    247231        FULLREAD(e, ctx, (unsigned long)ctx->header.message_len); 
    248         stratcon_datastore_push(DS_OP_INSERT, &nctx->r.remote, ctx->buffer); 
     232        ctx->push(DS_OP_INSERT, &nctx->r.remote, ctx->buffer); 
    249233        /* Don't free the buffer, it's used by the datastore process. */ 
    250234        ctx->buffer = NULL; 
     
    257241          completion_e->mask = EVENTER_WRITE | EVENTER_EXCEPTION; 
    258242          ctx->state = WANT_CHKPT; 
    259           stratcon_datastore_push(DS_OP_CHKPT, &nctx->r.remote, completion_e); 
     243          ctx->push(DS_OP_CHKPT, &nctx->r.remote, completion_e); 
    260244          noitL(noit_debug, "Pushing batch asynch...\n"); 
    261245          return 0; 
     
    513497  stratcon_streamer_connection(toplevel, NULL, 
    514498                               stratcon_jlog_recv_handler, 
    515                                (void *(*)())jlog_streamer_ctx_alloc, NULL, 
     499                               (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc, 
     500                               NULL, 
    516501                               jlog_streamer_ctx_free); 
    517502} 
  • src/stratcon_jlog_streamer.h

    r21b0c6c ra907717  
    88 
    99#include "noit_conf.h" 
     10#include "jlog/jlog.h" 
    1011#include <netinet/in.h> 
    1112#include <sys/un.h> 
    1213#include <arpa/inet.h> 
     14#include "stratcon_datastore.h" 
    1315 
    1416typedef struct noit_connection_ctx_t { 
     
    3234} noit_connection_ctx_t; 
    3335 
     36typedef struct jlog_streamer_ctx_t { 
     37  u_int32_t jlog_feed_cmd; 
     38  int bytes_expected; 
     39  int bytes_read; 
     40  char *buffer;         /* These guys are for doing partial reads */ 
     41 
     42  enum { 
     43    WANT_INITIATE = 0, 
     44    WANT_COUNT = 1, 
     45    WANT_HEADER = 2, 
     46    WANT_BODY = 3, 
     47    WANT_CHKPT = 4, 
     48  } state; 
     49  int count;            /* Number of jlog messages we need to read */ 
     50  struct { 
     51    jlog_id   chkpt; 
     52    u_int32_t tv_sec; 
     53    u_int32_t tv_usec; 
     54    u_int32_t message_len; 
     55  } header; 
     56 
     57  void (*push)(stratcon_datastore_op_t, struct sockaddr *, void *); 
     58} jlog_streamer_ctx_t; 
     59 
    3460API_EXPORT(void) 
    3561  stratcon_jlog_streamer_init(const char *toplevel); 
    3662API_EXPORT(void) 
    3763  stratcon_jlog_streamer_reload(const char *toplevel); 
     64API_EXPORT(int) 
     65  stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure, 
     66                             struct timeval *now); 
     67API_EXPORT(jlog_streamer_ctx_t *) 
     68  stratcon_jlog_streamer_ctx_alloc(void); 
     69API_EXPORT(void) 
     70  jlog_streamer_ctx_free(void *cl); 
    3871API_EXPORT(void) 
    3972  stratcon_streamer_connection(const char *toplevel, const char *destination, 
  • src/utils/noit_log.c

    rae34340 ra907717  
    8080}; 
    8181 
     82static int 
     83jlog_logio_reopen(noit_log_stream_t ls) { 
     84  char **subs; 
     85  int i; 
     86  /* reopening only has the effect of removing temporary subscriptions */ 
     87  /* (they start with ~ in our hair-brained model */ 
     88 
     89  if(jlog_ctx_list_subscribers(ls->op_ctx, &subs) == -1) { 
     90    noitL(noit_error, "Cannot list subscribers: %s\n", 
     91          jlog_ctx_err_string(ls->op_ctx)); 
     92    return 0; 
     93  } 
     94 
     95  for(i=0;subs[i];i++) 
     96    if(subs[i][0] == '~') 
     97      if(jlog_ctx_remove_subscriber(ls->op_ctx, subs[i]) == -1) 
     98        noitL(noit_error, "Cannot remove subscriber '%s': %s\n", 
     99              subs[i], jlog_ctx_err_string(ls->op_ctx)); 
     100 
     101  jlog_ctx_list_subscribers_dispose(ls->op_ctx, subs); 
     102  return 0; 
     103} 
    82104static int 
    83105jlog_logio_open(noit_log_stream_t ls) { 
     
    126148  } 
    127149  ls->op_ctx = log; 
    128   return 0; 
    129 
    130 static int 
    131 jlog_logio_reopen(noit_log_stream_t ls) { 
     150  /* We do this to clean things up */ 
     151  jlog_logio_reopen(ls); 
    132152  return 0; 
    133153}