Show
Ignore:
Timestamp:
05/12/09 12:48:35 (10 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1242132515 +0000
git-parent:

[17cbef4037fd5c482c89bf8d7e2e0bdb43e3f4ad]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1242132515 +0000
Message:

refs #122, this stuff works well enough to pull dev back into trunk.
Closing the #122 branch.

flag-day(noitd,stratcond)

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/stratcon_iep.c

    rb3a7d8c ra907717  
    88#include "utils/noit_log.h" 
    99#include "utils/noit_b64.h" 
     10#include "noit_jlog_listener.h" 
     11#include "stratcon_jlog_streamer.h" 
    1012#include "stratcon_datastore.h" 
    1113#include "noit_conf.h" 
    1214#include "noit_check.h" 
    1315 
    14 #define SWEEP_DELAY { 0L, 100000L } /* 100ms */ 
     16#include <unistd.h> 
     17#include <sys/fcntl.h> 
     18#include <assert.h> 
     19#include <libxml/parser.h> 
     20#include <libxml/tree.h> 
     21#include <libxml/xmlsave.h> 
     22#ifdef OPENWIRE 
     23#include "amqcs.h" 
     24#else 
     25#include "stomp/stomp.h" 
     26#endif 
     27 
    1528eventer_jobq_t iep_jobq; 
    1629 
    17 struct noit_line_list { 
    18   char *line; 
    19   struct noit_line_list *next; 
     30struct iep_thread_driver { 
     31#ifdef OPENWIRE 
     32  amqcs_connect_options connect_options; 
     33  amqcs_connection *connection; 
     34#else 
     35  stomp_connection *connection; 
     36#endif 
     37  apr_pool_t *pool; 
    2038}; 
    21 struct iep_batch { 
    22   /* This lock only needs to be used for inserting... (in the pivot) 
    23    * Once the batch is "done" it is submitted to a thread and has 
    24    * no more concurrent access. 
    25    */ 
    26   pthread_mutex_t lock; 
    27   int batch_size; 
    28   struct noit_line_list *head; 
    29   struct noit_line_list *tail; 
     39pthread_key_t iep_connection; 
     40 
     41struct iep_job_closure { 
     42  char *line;       /* This is a copy and gets trashed during processing */ 
     43  xmlDocPtr doc; 
     44  char *doc_str; 
     45  apr_pool_t *pool; 
    3046}; 
    31 /* We safely insert into the pivot... then lock and flip the batch 
    32  * into a self-contained iep_batch which is the closure to the asynch 
    33  * function that inserts it into OpenESB. 
    34  */ 
    35 static struct iep_batch pivot_batch; 
     47 
     48static void 
     49stratcon_iep_datastore_onlooker(stratcon_datastore_op_t op, 
     50                                struct sockaddr *remote, void *operand); 
     51static void 
     52start_iep_daemon(); 
    3653 
    3754static int 
    38 stratcon_iep_batch_add_line(const char *data) { 
    39   int previous_size; 
    40   struct noit_line_list *nnode; 
    41   nnode = malloc(sizeof(*nnode)); 
    42   nnode->line = strdup(data); 
    43   nnode->next = NULL; 
    44   pthread_mutex_lock(&pivot_batch.lock); 
    45   if(!pivot_batch.tail) pivot_batch.head = pivot_batch.tail = nnode; 
    46   else { 
    47     pivot_batch.tail->next = nnode; 
    48     pivot_batch.tail = nnode; 
    49   } 
    50   previous_size = pivot_batch.batch_size; 
    51   pivot_batch.batch_size++; 
    52   pthread_mutex_unlock(&pivot_batch.lock); 
    53   return previous_size; 
    54 
    55  
    56 static struct iep_batch * 
    57 stratcon_iep_batch_copytrunc() { 
    58   struct iep_batch *nbatch; 
    59   nbatch = calloc(1, sizeof(*nbatch)); 
    60   /* Lock */ 
    61   pthread_mutex_lock(&pivot_batch.lock); 
    62   /* Copy */ 
    63   nbatch->batch_size = pivot_batch.batch_size; 
    64   nbatch->head = pivot_batch.head; 
    65   nbatch->tail = pivot_batch.tail; 
    66   /* Trunc */ 
    67   pivot_batch.batch_size = 0; 
    68   pivot_batch.head = pivot_batch.tail = NULL; 
    69   /* Lock */ 
    70   pthread_mutex_unlock(&pivot_batch.lock); 
    71   return nbatch; 
     55bust_to_parts(char *in, char **p, int len) { 
     56  int cnt = 0; 
     57  char *s = in; 
     58  while(cnt < len) { 
     59    p[cnt++] = s; 
     60    while(*s && *s != '\t') s++; 
     61    if(!*s) break; 
     62    *s++ = '\0'; 
     63  } 
     64  while(*s) s++; /* Move to end */ 
     65  if(s > in && *(s-1) == '\n') *(s-1) = '\0'; /* chomp */ 
     66  return cnt; 
     67
     68 
     69#define ADDCHILD(a,b) \ 
     70  xmlNewTextChild(root, NULL, (xmlChar *)(a), (xmlChar *)(b)) 
     71#define NEWDOC(xmldoc,n,stanza) do { \ 
     72  xmlNodePtr root; \ 
     73  xmldoc = xmlNewDoc((xmlChar *)"1.0"); \ 
     74  root = xmlNewDocNode(xmldoc, NULL, (xmlChar *)(n), NULL); \ 
     75  xmlDocSetRootElement(xmldoc, root); \ 
     76  stanza \ 
     77} while(0) 
     78 
     79 
     80static xmlDocPtr 
     81stratcon_iep_doc_from_status(char *data) { 
     82  xmlDocPtr doc; 
     83  char *parts[7]; 
     84  if(bust_to_parts(data, parts, 7) != 7) return NULL; 
     85  /* 'S' TIMESTAMP UUID STATE AVAILABILITY DURATION STATUS_MESSAGE */ 
     86  NEWDOC(doc, "NoitStatus", 
     87         { 
     88           ADDCHILD("id", parts[2]); 
     89           ADDCHILD("state", parts[3]); 
     90           ADDCHILD("availability", parts[4]); 
     91           ADDCHILD("duration", parts[5]); 
     92           ADDCHILD("status", parts[6]); 
     93         }); 
     94  return doc; 
     95
     96 
     97static xmlDocPtr 
     98stratcon_iep_doc_from_metric(char *data) { 
     99  xmlDocPtr doc; 
     100  char *parts[6]; 
     101  const char *rootname = "NoitMetricNumeric"; 
     102  const char *valuename = "value"; 
     103  if(bust_to_parts(data, parts, 6) != 6) return NULL; 
     104  /*  'M' TIMESTAMP UUID NAME TYPE VALUE */ 
     105 
     106  if(*parts[4] == METRIC_STRING) { 
     107    rootname = "NoitMetricText"; 
     108    valuename = "message"; 
     109  } 
     110  NEWDOC(doc, rootname, 
     111         { 
     112           ADDCHILD("id", parts[2]); 
     113           ADDCHILD("name", parts[3]); 
     114           ADDCHILD(valuename, parts[5]); 
     115         }); 
     116  return doc; 
     117
     118 
     119static xmlDocPtr 
     120stratcon_iep_doc_from_query(char *data) { 
     121  xmlDocPtr doc; 
     122  char *parts[4]; 
     123  if(bust_to_parts(data, parts, 4) != 4) return NULL; 
     124  /*  'Q' ID NAME QUERY  */ 
     125 
     126  NEWDOC(doc, "StratconQuery", 
     127         { 
     128           ADDCHILD("id", parts[1]); 
     129           ADDCHILD("name", parts[2]); 
     130           ADDCHILD("expression", parts[3]); 
     131         }); 
     132  return doc; 
     133
     134 
     135static xmlDocPtr 
     136stratcon_iep_doc_from_querystop(char *data) { 
     137  xmlDocPtr doc; 
     138  char *parts[2]; 
     139  if(bust_to_parts(data, parts, 2) != 2) return NULL; 
     140  /*  'Q' ID */ 
     141 
     142  NEWDOC(doc, "StratconQueryStop", 
     143         { 
     144           xmlNodeSetContent(root, (xmlChar *)parts[1]); 
     145         }); 
     146  return doc; 
     147
     148 
     149static xmlDocPtr 
     150stratcon_iep_doc_from_line(char *data) { 
     151  if(data) { 
     152    switch(*data) { 
     153      case 'S': return stratcon_iep_doc_from_status(data); 
     154      case 'M': return stratcon_iep_doc_from_metric(data); 
     155      case 'Q': return stratcon_iep_doc_from_query(data); 
     156      case 'q': return stratcon_iep_doc_from_querystop(data); 
     157    } 
     158  } 
     159  return NULL; 
     160
     161 
     162static float 
     163stratcon_iep_age_from_line(char *data, struct timeval now) { 
     164  float n, t; 
     165  if(data && (*data == 'S' || *data == 'M')) { 
     166    if(data[1] != '\t') return 0; 
     167    t = strtof(data + 2, NULL); 
     168    n = (float)now.tv_sec + (float)now.tv_usec / 1000000.0; 
     169    return n - t; 
     170  } 
     171  return 0; 
     172
     173 
     174void stratcon_iep_submit_queries() { 
     175  int i, cnt = 0; 
     176  noit_conf_section_t *query_configs; 
     177  char path[256]; 
     178 
     179  snprintf(path, sizeof(path), "/stratcon/iep/queries//query"); 
     180  query_configs = noit_conf_get_sections(NULL, path, &cnt); 
     181  noitL(noit_debug, "Found %d %s stanzas\n", cnt, path); 
     182  for(i=0; i<cnt; i++) { 
     183    char id[UUID_STR_LEN]; 
     184    char topic[256]; 
     185    char *query; 
     186    char *line; 
     187    int line_len; 
     188 
     189    if(!noit_conf_get_stringbuf(query_configs[i], 
     190                                "self::node()/@id", 
     191                                id, sizeof(id))) { 
     192      noitL(noit_error, "No uuid specified in query\n"); 
     193      continue; 
     194    } 
     195    if(!noit_conf_get_stringbuf(query_configs[i], 
     196                                "ancestor-or-self::node()/@topic", 
     197                                topic, sizeof(topic))) { 
     198      noitL(noit_error, "No topic specified in query\n"); 
     199      continue; 
     200    } 
     201    if(!noit_conf_get_string(query_configs[i], "self::node()", 
     202                             &query)) { 
     203      noitL(noit_error, "No contents specified in query\n"); 
     204      continue; 
     205    } 
     206    line_len = 4 /* 3 tabs + \0 */ + 
     207               1 /* 'Q' */ + 1 /* '\n' */ + 
     208               strlen(id) + strlen(topic) + strlen(query); 
     209    line = malloc(line_len); 
     210    snprintf(line, line_len, "Q\t%s\t%s\t%s\n", id, topic, query); 
     211    free(query); 
     212    query = line; 
     213    while(query[0] && query[1]) { 
     214      if(*query == '\n') *query = ' '; 
     215      query++; 
     216    } 
     217    stratcon_iep_datastore_onlooker(DS_OP_INSERT, NULL, line); 
     218    free(line); 
     219  } 
     220
     221 
     222static char * 
     223stratcon__xml_doc_to_str(xmlDocPtr doc) { 
     224  char *rv; 
     225  xmlSaveCtxtPtr savectx; 
     226  xmlBufferPtr xmlbuffer; 
     227  xmlbuffer = xmlBufferCreate(); 
     228  savectx = xmlSaveToBuffer(xmlbuffer, "utf8", 1); 
     229  xmlSaveDoc(savectx, doc); 
     230  xmlSaveClose(savectx); 
     231  rv = strdup((const char *)xmlBufferContent(xmlbuffer)); 
     232  xmlBufferFree(xmlbuffer); 
     233  return rv; 
     234
     235 
     236static 
     237struct iep_thread_driver *stratcon_iep_get_connection() { 
     238  apr_status_t rc; 
     239  struct iep_thread_driver *driver; 
     240  driver = pthread_getspecific(iep_connection); 
     241  if(!driver) { 
     242    driver = calloc(1, sizeof(*driver)); 
     243    pthread_setspecific(iep_connection, driver); 
     244  } 
     245 
     246  if(!driver->pool) { 
     247    if(apr_pool_create(&driver->pool, NULL) != APR_SUCCESS) return NULL; 
     248  } 
     249 
     250  if(!driver->connection) { 
     251    int port; 
     252    char hostname[128]; 
     253    if(!noit_conf_get_int(NULL, "/stratcon/iep/port", &port)) 
     254      port = 61613; 
     255    if(!noit_conf_get_stringbuf(NULL, "/stratcon/iep/hostname", 
     256                                hostname, sizeof(hostname))) 
     257      strlcpy(hostname, "127.0.0.1", sizeof(hostname)); 
     258#ifdef OPENWIRE 
     259    memset(&driver->connect_options, 0, sizeof(driver->connect_options)); 
     260    strlcpy(driver->connect_options.hostname, hostname, 
     261            sizeof(driver->connect_options.hostname)); 
     262    driver->connect_options.port = port; 
     263    if(amqcs_connect(&driver->connection, &driver->connect_options, 
     264                     driver->pool) != APR_SUCCESS) { 
     265      noitL(noit_error, "MQ connection failed\n"); 
     266      return NULL; 
     267    } 
     268#else 
     269    if(stomp_connect(&driver->connection, hostname, port, 
     270                     driver->pool)!= APR_SUCCESS) { 
     271      noitL(noit_error, "MQ connection failed\n"); 
     272      return NULL; 
     273    } 
     274 
     275    { 
     276      stomp_frame frame; 
     277      frame.command = "CONNECT"; 
     278      frame.headers = apr_hash_make(driver->pool); 
     279/* 
     280      We don't use login/pass 
     281      apr_hash_set(frame.headers, "login", APR_HASH_KEY_STRING, ""); 
     282      apr_hash_set(frame.headers, "passcode", APR_HASH_KEY_STRING, ""); 
     283*/ 
     284      frame.body = NULL; 
     285      frame.body_length = -1; 
     286      rc = stomp_write(driver->connection, &frame, driver->pool); 
     287      if(rc != APR_SUCCESS) { 
     288        noitL(noit_error, "MQ STOMP CONNECT failed, %d\n", rc); 
     289        stomp_disconnect(&driver->connection); 
     290        return NULL; 
     291      } 
     292    }   
     293    { 
     294      stomp_frame *frame; 
     295      rc = stomp_read(driver->connection, &frame, driver->pool); 
     296      if (rc != APR_SUCCESS) { 
     297        noitL(noit_error, "MQ STOMP CONNECT bad response, %d\n", rc); 
     298        stomp_disconnect(&driver->connection); 
     299        return NULL; 
     300      } 
     301      noitL(noit_error, "Response: %s, %s\n", frame->command, frame->body); 
     302     } 
     303#endif 
     304     stratcon_iep_submit_queries(); 
     305  } 
     306 
     307  return driver; 
    72308} 
    73309 
     
    75311stratcon_iep_submitter(eventer_t e, int mask, void *closure, 
    76312                       struct timeval *now) { 
    77   struct iep_batch *batch = closure; 
     313  float age; 
     314  struct iep_job_closure *job = closure; 
    78315  /* We only play when it is an asynch event */ 
    79316  if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 
     
    81318  if(mask & EVENTER_ASYNCH_CLEANUP) { 
    82319    /* free all the memory associated with the batch */ 
    83     while(batch->head) { 
    84       struct noit_line_list *l; 
    85       l = batch->head; 
    86       batch->head = l->next; 
    87       free(l->line); 
    88       free(l); 
    89     } 
    90     free(batch); 
     320    if(job) { 
     321      if(job->line) free(job->line); 
     322      if(job->doc_str) free(job->doc_str); 
     323      if(job->doc) xmlFreeDoc(job->doc); 
     324      if(job->pool) apr_pool_destroy(job->pool); 
     325      free(job); 
     326    } 
    91327    return 0; 
    92328  } 
    93329 
    94   /* pull from batch and submit */ 
    95   noitL(noit_error, "Firing stratcon_iep_submitter on a batch of %d events\n", 
    96         batch->batch_size); 
    97  
    98   return 0; 
    99 
    100  
    101 static int 
    102 stratcon_iep_batch_sweep(eventer_t e, int mask, void *closure, 
    103                          struct timeval *now) { 
    104   struct iep_batch *nbatch; 
    105   struct timeval iep_timeout = { 5L, 0L }; 
    106   eventer_t newe; 
    107  
    108   nbatch = stratcon_iep_batch_copytrunc(); 
    109   if(nbatch->batch_size == 0) { 
    110     /* misfire */ 
    111     free(nbatch); 
     330  if((age = stratcon_iep_age_from_line(job->line, *now)) > 60) { 
     331    noitL(noit_debug, "Skipping old event %f second old.\n", age); 
    112332    return 0; 
    113333  } 
    114  
    115   newe = eventer_alloc(); 
    116   newe->mask = EVENTER_ASYNCH; 
    117   add_timeval(*now, iep_timeout, &newe->whence); 
    118   newe->callback = stratcon_iep_submitter; 
    119   newe->closure = nbatch; 
    120  
    121   eventer_add_asynch(&iep_jobq, newe); 
     334  job->doc = stratcon_iep_doc_from_line(job->line); 
     335  if(job->doc) { 
     336    job->doc_str = stratcon__xml_doc_to_str(job->doc); 
     337    if(job->doc_str) { 
     338      /* Submit */ 
     339      struct iep_thread_driver *driver; 
     340      driver = stratcon_iep_get_connection(); 
     341      if(driver && driver->pool && driver->connection) { 
     342        apr_status_t rc; 
     343#ifdef OPENWIRE 
     344        ow_ActiveMQQueue *dest; 
     345        ow_ActiveMQTextMessage *message; 
     346 
     347        apr_pool_create(&job->pool, driver->pool); 
     348        message = ow_ActiveMQTextMessage_create(job->pool); 
     349        message->content = 
     350          ow_byte_array_create_with_data(job->pool,strlen(job->doc_str), 
     351                                         job->doc_str); 
     352        dest = ow_ActiveMQQueue_create(job->pool); 
     353        dest->physicalName = ow_string_create_from_cstring(job->pool,"TEST.QUEUE");          
     354        rc = amqcs_send(driver->connection, 
     355                        (ow_ActiveMQDestination*)dest, 
     356                        (ow_ActiveMQMessage*)message, 
     357                        1,4,0,job->pool); 
     358        if(rc != APR_SUCCESS) { 
     359          noitL(noit_error, "MQ send failed, disconnecting\n"); 
     360          if(driver->connection) amqcs_disconnect(&driver->connection); 
     361          driver->connection = NULL; 
     362        } 
     363#else 
     364        stomp_frame out; 
     365 
     366        apr_pool_create(&job->pool, driver->pool); 
     367 
     368        out.command = "SEND"; 
     369        out.headers = apr_hash_make(job->pool); 
     370        apr_hash_set(out.headers, "destination", APR_HASH_KEY_STRING, "/queue/noit.firehose"); 
     371        apr_hash_set(out.headers, "ack", APR_HASH_KEY_STRING, "auto"); 
     372       
     373        out.body_length = -1; 
     374        out.body = job->doc_str; 
     375        rc = stomp_write(driver->connection, &out, job->pool); 
     376        if(rc != APR_SUCCESS) { 
     377          noitL(noit_error, "STOMP send failed, disconnecting\n"); 
     378          if(driver->connection) stomp_disconnect(&driver->connection); 
     379          driver->connection = NULL; 
     380        } 
     381#endif 
     382      } 
     383      else { 
     384        noitL(noit_error, "Not submitting event, no MQ\n"); 
     385      } 
     386    } 
     387  } 
     388  else { 
     389    noitL(noit_error, "no iep handler for: '%s'\n", job->line); 
     390  } 
    122391  return 0; 
    123392} 
     
    126395stratcon_iep_datastore_onlooker(stratcon_datastore_op_t op, 
    127396                                struct sockaddr *remote, void *operand) { 
     397  struct iep_job_closure *jc; 
     398  eventer_t newe; 
     399  struct timeval __now, iep_timeout = { 20L, 0L }; 
    128400  /* We only care about inserts */ 
     401 
     402  if(op == DS_OP_CHKPT) { 
     403    eventer_add((eventer_t) operand); 
     404    return; 
     405  } 
    129406  if(op != DS_OP_INSERT) return; 
    130407 
    131408  /* process operand and push onto queue */ 
    132   if(stratcon_iep_batch_add_line((char *)operand) == 0) { 
    133     /* If this is the first in the queue, then we need to schedule a 
    134      * sweeper to submit the queue. 
    135      */ 
    136     eventer_t e; 
    137     struct timeval __now, sweep_delay = SWEEP_DELAY; 
    138  
    139     gettimeofday(&__now, NULL); 
    140  
    141     e = eventer_alloc(); 
    142     e->callback = stratcon_iep_batch_sweep; 
    143     add_timeval(__now, sweep_delay, &e->whence); 
    144     e->closure = NULL; /* we can only do one thing */ 
    145     e->mask = EVENTER_TIMER; 
    146     eventer_add(e); 
    147   } 
     409  gettimeofday(&__now, NULL); 
     410  newe = eventer_alloc(); 
     411  newe->mask = EVENTER_ASYNCH; 
     412  add_timeval(__now, iep_timeout, &newe->whence); 
     413  newe->callback = stratcon_iep_submitter; 
     414  jc = calloc(1, sizeof(*jc)); 
     415  jc->line = strdup(operand); 
     416  newe->closure = jc; 
     417 
     418  eventer_add_asynch(&iep_jobq, newe); 
     419
     420 
     421static void connection_destroy(void *vd) { 
     422  struct iep_thread_driver *driver = vd; 
     423#ifdef OPENWIRE 
     424  if(driver->connection) amqcs_disconnect(&driver->connection); 
     425#else 
     426  if(driver->connection) stomp_disconnect(&driver->connection); 
     427#endif 
     428  if(driver->pool) apr_pool_destroy(driver->pool); 
     429  free(driver); 
     430
     431 
     432jlog_streamer_ctx_t * 
     433stratcon_jlog_streamer_iep_ctx_alloc(void) { 
     434  jlog_streamer_ctx_t *ctx; 
     435  ctx = stratcon_jlog_streamer_ctx_alloc(); 
     436  ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_TEMP_FEED); 
     437  ctx->push = stratcon_iep_datastore_onlooker; 
     438  return ctx; 
     439
     440 
     441struct iep_daemon_info { 
     442  pid_t child; 
     443  int stdin_pipe[2]; 
     444  int stderr_pipe[2]; 
     445  char *directory; 
     446  char *command; 
     447}; 
     448 
     449static void 
     450iep_daemon_info_free(struct iep_daemon_info *info) { 
     451  if(!info) return; 
     452  if(info->directory) free(info->directory); 
     453  if(info->command) free(info->command); 
     454  if(info->stdin_pipe[0] >= 0) close(info->stdin_pipe[0]); 
     455  if(info->stdin_pipe[1] >= 0) close(info->stdin_pipe[1]); 
     456  if(info->stderr_pipe[0] >= 0) close(info->stderr_pipe[0]); 
     457  if(info->stderr_pipe[1] >= 0) close(info->stderr_pipe[1]); 
     458  free(info); 
     459
     460 
     461static int 
     462stratcon_iep_err_handler(eventer_t e, int mask, void *closure, 
     463                         struct timeval *now) { 
     464  int len, newmask; 
     465  char buff[4096]; 
     466  struct iep_daemon_info *info = (struct iep_daemon_info *)closure; 
     467 
     468  if(mask & EVENTER_EXCEPTION) { 
     469    int rv; 
     470   read_error: 
     471    kill(SIGKILL, info->child); 
     472    if(waitpid(info->child, &rv, 0) != info->child) { 
     473      noitL(noit_error, "Failed to reap IEP daemon\n"); 
     474      exit(-1); 
     475    } 
     476    noitL(noit_error, "IEP daemon is done, starting a new one\n"); 
     477    start_iep_daemon(); 
     478    eventer_remove_fd(e->fd); 
     479    e->opset->close(e->fd, &newmask, e); 
     480    return 0; 
     481  } 
     482  while(1) { 
     483    len = e->opset->read(e->fd, buff, sizeof(buff)-1, &newmask, e); 
     484    if(len == -1 && (errno == EAGAIN || errno == EINTR)) 
     485      return newmask | EVENTER_EXCEPTION; 
     486    if(len <= 0) goto read_error; 
     487    assert(len < sizeof(buff)); 
     488    buff[len] = '\0'; 
     489    noitL(noit_error, "IEP: %s", buff); 
     490  } 
     491
     492 
     493static void 
     494start_iep_daemon() { 
     495  eventer_t newe; 
     496  struct iep_daemon_info *info; 
     497 
     498  info = calloc(1, sizeof(*info)); 
     499  info->stdin_pipe[0] = info->stdin_pipe[1] = -1; 
     500  info->stderr_pipe[0] = info->stderr_pipe[1] = -1; 
     501 
     502  if(!noit_conf_get_string(NULL, "/stratcon/iep/start/@directory", 
     503                           &info->directory)) 
     504    info->directory = strdup("."); 
     505  if(!noit_conf_get_string(NULL, "/stratcon/iep/start/@command", 
     506                           &info->command)) { 
     507    noitL(noit_error, "No IEP start command provided.  You're on your own.\n"); 
     508    goto bail; 
     509  } 
     510  if(pipe(info->stdin_pipe) != 0 || 
     511     pipe(info->stderr_pipe) != 0) { 
     512    noitL(noit_error, "pipe: %s\n", strerror(errno)); 
     513    goto bail; 
     514  } 
     515  info->child = fork(); 
     516  if(info->child == -1) { 
     517    noitL(noit_error, "fork: %s\n", strerror(errno)); 
     518    goto bail; 
     519  } 
     520  if(info->child == 0) { 
     521    char *argv[2] = { "run-iep", NULL }; 
     522    int stdout_fileno; 
     523 
     524    if(chdir(info->directory) != 0) { 
     525      noitL(noit_error, "Starting IEP daemon, chdir failed: %s\n", 
     526            strerror(errno)); 
     527      exit(-1); 
     528    } 
     529 
     530    close(info->stdin_pipe[1]); 
     531    close(info->stderr_pipe[0]); 
     532    dup2(info->stdin_pipe[0], 0); 
     533    dup2(info->stderr_pipe[1], 2); 
     534    stdout_fileno = open("/dev/null", O_WRONLY); 
     535    dup2(stdout_fileno, 1); 
     536 
     537    exit(execv(info->command, argv)); 
     538  } 
     539  /* in the parent */ 
     540  socklen_t on = 1; 
     541 
     542  close(info->stdin_pipe[0]); 
     543  info->stdin_pipe[0] = -1; 
     544  close(info->stderr_pipe[1]); 
     545  info->stderr_pipe[1] = -1; 
     546  if(ioctl(info->stderr_pipe[0], FIONBIO, &on)) { 
     547    goto bail; 
     548  } 
     549 
     550  newe = eventer_alloc(); 
     551  newe->fd = info->stderr_pipe[0]; 
     552  newe->mask = EVENTER_READ | EVENTER_EXCEPTION; 
     553  newe->callback = stratcon_iep_err_handler; 
     554  newe->closure = info; 
     555  eventer_add(newe); 
     556  info = NULL; 
     557 
     558  return; 
     559 
     560 bail: 
     561  if(info) { 
     562    iep_daemon_info_free(info); 
     563  } 
     564  noitL(noit_error, "Failed to start IEP daemon\n"); 
     565  exit(-1); 
     566  return; 
    148567} 
    149568 
    150569void 
    151570stratcon_iep_init() { 
    152   eventer_name_callback("stratcon_iep_batch_sweep", stratcon_iep_batch_sweep); 
     571  noit_boolean disabled = noit_false; 
     572  apr_initialize(); 
     573  atexit(apr_terminate);    
     574 
     575  if(noit_conf_get_boolean(NULL, "/stratcon/iep/@disabled", &disabled) && 
     576     disabled == noit_true) { 
     577    noitL(noit_error, "IEP system is disabled!\n"); 
     578    return; 
     579  } 
     580 
     581  start_iep_daemon(); 
     582 
    153583  eventer_name_callback("stratcon_iep_submitter", stratcon_iep_submitter); 
     584  pthread_key_create(&iep_connection, connection_destroy); 
    154585 
    155586  /* start up a thread pool of one */ 
     
    159590  eventer_jobq_increase_concurrency(&iep_jobq); 
    160591 
    161   /* Setup our pivot batch */ 
    162   memset(&pivot_batch, 0, sizeof(pivot_batch)); 
    163   pthread_mutex_init(&pivot_batch.lock, NULL); 
    164  
    165   /* Add our onlooker */ 
    166   stratcon_datastore_register_onlooker(stratcon_iep_datastore_onlooker); 
    167 } 
    168  
     592  /* setup our live jlog stream */ 
     593  stratcon_streamer_connection(NULL, NULL, 
     594                               stratcon_jlog_recv_handler, 
     595                               (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc, 
     596                               NULL, 
     597                               jlog_streamer_ctx_free); 
     598} 
     599