Changeset 04242928f5428460d4da69245f0f005b56051fc2

Show
Ignore:
Timestamp:
02/27/08 04:23:20 (6 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1204086200 +0000
git-parent:

[39cf694b99f710f20d0f8bccc48d4376c30c27ae]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1204086200 +0000
Message:

basic log streamer, need a client to further test it.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/noit.conf

    r06f58e6 r0424292  
    1212      <log name="debug" disabled="true"/> 
    1313    </console_output> 
    14     <log name="feed" type="jlog" path="/var/log/noitd.feed" /> 
     14    <log name="feed" type="jlog" path="/var/log/noitd.feed=>stratcon" /> 
    1515    <!--<log name="feed"><outlet name="stderr"/></log>--> 
    1616    <feeds> 
     
    3333  </modules> 
    3434  <listeners> 
     35    <sslconfig> 
     36      <optional_no_ca>false</optional_no_ca> 
     37      <certificate_file>test.crt</certificate_file> 
     38      <key_file>test.key</key_file> 
     39      <ca_chain>test-ca.crt</ca_chain> 
     40    </sslconfig> 
    3541    <consoles type="noit_console"> 
    3642      <listener address="/tmp/noit"> 
     
    4147      </listener> 
    4248      <listener address="*" port="32323"> 
    43         <sslconfig> 
    44           <optional_no_ca>false</optional_no_ca> 
    45           <certificate_file>test.crt</certificate_file> 
    46           <key_file>test.key</key_file> 
    47           <ca_chain>test-ca.crt</ca_chain> 
    48         </sslconfig> 
    4949      </listener> 
    5050    </consoles> 
  • src/noit_jlog_listener.c

    r06f58e6 r0424292  
    1212#include "noit_jlog_listener.h" 
    1313 
     14#include <unistd.h> 
     15#include <sys/ioctl.h> 
     16 
    1417void 
    1518noit_jlog_listener_init() { 
     
    1922typedef struct { 
    2023  jlog_ctx *jlog; 
     24  jlog_id chkpt; 
     25  jlog_id start; 
     26  jlog_id finish; 
     27  int count; 
    2128  int wants_shutdown; 
    2229} noit_jlog_closure_t; 
     
    3542} 
    3643 
     44#define Ewrite(a,b) e->opset->write(e->fd, a, b, &mask, e) 
     45static int 
     46noit_jlog_push(eventer_t e, noit_jlog_closure_t *jcl) { 
     47  jlog_message msg; 
     48  int mask; 
     49  u_int32_t n_count; 
     50  n_count = htonl(jcl->count); 
     51  if(Ewrite(&n_count, sizeof(n_count)) != sizeof(n_count)) 
     52    return -1; 
     53  while(jcl->count > 0) { 
     54    struct { u_int32_t n_sec, n_usec, n_len; } payload; 
     55    if(jlog_ctx_read_message(jcl->jlog, &jcl->start, &msg) == -1) 
     56      return -1; 
     57 
     58    /* Here we actually push the message */ 
     59    payload.n_sec  = htonl(msg.header->tv_sec); 
     60    payload.n_usec = htonl(msg.header->tv_usec); 
     61    payload.n_len  = htonl(msg.mess_len); 
     62    if(Ewrite(&payload, sizeof(payload)) != sizeof(payload)) 
     63      return -1; 
     64    if(Ewrite(msg.mess, msg.mess_len) != msg.mess_len) 
     65      return -1; 
     66    /* Note what the client must checkpoint */ 
     67    jcl->chkpt = jcl->start; 
     68 
     69    JLOG_ID_ADVANCE(&jcl->start); 
     70    jcl->count--; 
     71  } 
     72  return 0; 
     73} 
     74 
     75void * 
     76noit_jlog_thread_main(void *e_vptr) { 
     77  int mask, bytes_read; 
     78  eventer_t e = e_vptr; 
     79  acceptor_closure_t *ac = e->closure; 
     80  noit_jlog_closure_t *jcl = ac->service_ctx; 
     81  long off = 0; 
     82  char inbuff[sizeof(jlog_id)]; 
     83 
     84  /* Go into blocking mode */ 
     85  ioctl(e->fd, FIONBIO, &off); 
     86 
     87  while(1) { 
     88    jlog_id client_chkpt; 
     89    jlog_get_checkpoint(jcl->jlog, ac->remote_cn, &jcl->chkpt); 
     90    jcl->count = jlog_ctx_read_interval(jcl->jlog, &jcl->start, &jcl->finish); 
     91    if(jcl->count > 0) { 
     92      if(noit_jlog_push(e, jcl)) { 
     93        goto alldone; 
     94      } 
     95    } 
     96    /* Read our jlog_id accounting for possibly short reads */ 
     97    bytes_read = 0; 
     98    while(bytes_read < sizeof(jlog_id)) { 
     99      int len; 
     100      if((len = e->opset->read(e->fd, inbuff + bytes_read, 
     101                               sizeof(jlog_id) - bytes_read, 
     102                               &mask, e)) <= 0) 
     103        goto alldone; 
     104      bytes_read += len; 
     105    } 
     106    memcpy(&client_chkpt, inbuff, sizeof(jlog_id)); 
     107    /* Fix the endian */ 
     108    client_chkpt.log = ntohl(client_chkpt.log); 
     109    client_chkpt.marker = ntohl(client_chkpt.marker); 
     110 
     111    if(memcmp(&jcl->chkpt, &client_chkpt, sizeof(jlog_id))) { 
     112      noitL(noit_error, 
     113            "client %s submitted invalid checkpoint %u:%u expected %u:%u\n", 
     114            ac->remote_cn, client_chkpt.log, client_chkpt.marker, 
     115            jcl->chkpt.log, jcl->chkpt.marker); 
     116      goto alldone; 
     117    } 
     118    jlog_ctx_read_checkpoint(jcl->jlog, &jcl->chkpt); 
     119    sleep(5); 
     120  } 
     121 
     122 alldone: 
     123  e->opset->close(e->fd, &mask, e); 
     124  if(jcl) noit_jlog_closure_free(jcl); 
     125  if(ac) acceptor_closure_free(ac); 
     126  return NULL; 
     127} 
     128 
    37129int 
    38130noit_jlog_handler(eventer_t e, int mask, void *closure, 
    39131                     struct timeval *now) { 
     132  eventer_t newe; 
     133  pthread_t tid; 
    40134  int newmask = EVENTER_READ | EVENTER_EXCEPTION; 
    41135  acceptor_closure_t *ac = closure; 
     
    72166      goto socket_error; 
    73167    } 
     168    if(!ac->remote_cn) { 
     169      noitL(noit_error, "jlog transit started to unidentified party.\n"); 
     170      goto socket_error; 
     171    } 
     172 
     173    jcl->jlog = jlog_new(ls->path); 
     174    if(jlog_ctx_open_reader(jcl->jlog, ac->remote_cn) == -1) { 
     175      noitL(noit_error, "jlog reader[%s] error: %s\n", ac->remote_cn, 
     176            jlog_ctx_err_string(jcl->jlog)); 
     177      goto socket_error; 
     178    } 
    74179  } 
    75180 
     181  /* The jlog stuff is disk I/O and can block us. 
     182   * We'll create a new thread to just handle this connection. 
     183   */ 
     184  eventer_remove_fd(e->fd); 
     185  newe = eventer_alloc(); 
     186  memcpy(newe, e, sizeof(*e)); 
     187  if(pthread_create(&tid, NULL, noit_jlog_thread_main, newe) == 0) { 
     188    return 0; 
     189  } 
     190 
     191  /* Undo our dup */ 
     192  eventer_free(newe); 
     193  /* Creating the thread failed, close it down and deschedule. */ 
    76194  e->opset->close(e->fd, &newmask, e); 
    77   eventer_remove_fd(e->fd); 
    78195  return 0; 
    79196} 
  • src/utils/noit_log.c

    r06f58e6 r0424292  
    44 */ 
    55 
    6 #define DEFAULT_JLOG_SUBSCRIBER "stratcond
     6#define DEFAULT_JLOG_SUBSCRIBER "stratcon
    77 
    88#include "noit_defines.h" 
     
    6969static int 
    7070jlog_logio_open(noit_log_stream_t ls) { 
     71  char path[PATH_MAX], *sub; 
    7172  jlog_ctx *log = NULL; 
    7273  if(!ls->path) return -1; 
     74  strlcpy(path, ls->path, sizeof(path)); 
     75  sub = strstr(path, "=>"); 
     76  if(sub) { 
     77    *sub = '\0'; 
     78    sub += 2; 
     79  } 
    7380  log = jlog_new(ls->path); 
    7481  if(!log) return -1; 
     
    96103    } 
    97104    /* The first time we open after an init, we should add the subscriber. */ 
    98     jlog_ctx_add_subscriber(log, DEFAULT_JLOG_SUBSCRIBER, JLOG_BEGIN); 
     105    if(sub) 
     106      jlog_ctx_add_subscriber(log, sub, JLOG_BEGIN); 
     107    else 
     108      jlog_ctx_add_subscriber(log, DEFAULT_JLOG_SUBSCRIBER, JLOG_BEGIN); 
    99109  } 
    100110  ls->op_ctx = log;