Changeset 986ca9cdcea688fff7615506f7a522fed667967b

Show
Ignore:
Timestamp:
05/13/11 03:05:48 (3 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1305255948 -0400
git-parent:

[d2b2cf6a9fe0bc981e17e3ca09ad4c053f699dff]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1305255948 -0400
Message:

add support for asynchronous jlog'in

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/utils/noit_log.c

    r2648da9 r986ca9c  
    226226}; 
    227227 
     228typedef struct jlog_line { 
     229  char *buf; 
     230  char buf_static[512]; 
     231  char *buf_dynamic; 
     232  int len; 
     233  void *next; 
     234} jlog_line; 
     235 
     236typedef struct { 
     237  jlog_ctx *log; 
     238  pthread_t writer; 
     239  void *head; 
     240} jlog_asynch_ctx; 
     241 
    228242static int 
    229243jlog_lspath_to_fspath(noit_log_stream_t ls, char *buff, int len, 
     
    267281static int 
    268282jlog_logio_cleanse(noit_log_stream_t ls) { 
     283  jlog_asynch_ctx *actx; 
    269284  jlog_ctx *log; 
    270285  DIR *d; 
     
    274289  int size = 0; 
    275290 
    276   log = (jlog_ctx *)ls->op_ctx; 
     291  actx = (jlog_asynch_ctx *)ls->op_ctx; 
     292  if(!actx) return -1; 
     293  log = actx->log; 
    277294  if(!log) return -1; 
    278295  if(jlog_lspath_to_fspath(ls, path, sizeof(path), NULL) <= 0) return -1; 
     
    315332jlog_logio_reopen(noit_log_stream_t ls) { 
    316333  char **subs; 
     334  jlog_asynch_ctx *actx = ls->op_ctx; 
    317335  int i; 
    318336  /* reopening only has the effect of removing temporary subscriptions */ 
     
    320338 
    321339  if(ls->lock) pthread_rwlock_wrlock(ls->lock); 
    322   if(jlog_ctx_list_subscribers(ls->op_ctx, &subs) == -1) 
     340  if(jlog_ctx_list_subscribers(actx->log, &subs) == -1) 
    323341    goto bail; 
    324342 
    325343  for(i=0;subs[i];i++) 
    326344    if(subs[i][0] == '~') 
    327       jlog_ctx_remove_subscriber(ls->op_ctx, subs[i]); 
    328  
    329   jlog_ctx_list_subscribers_dispose(ls->op_ctx, subs); 
     345      jlog_ctx_remove_subscriber(actx->log, subs[i]); 
     346 
     347  jlog_ctx_list_subscribers_dispose(actx->log, subs); 
    330348  jlog_logio_cleanse(ls); 
    331349 bail: 
     
    333351  return 0; 
    334352} 
     353static jlog_line * 
     354jlog_asynch_pop(jlog_asynch_ctx *actx, jlog_line **iter) { 
     355  jlog_line *h = NULL, *rev = NULL; 
     356 
     357  if(*iter) { /* we have more on the previous list */ 
     358    h = *iter; 
     359    *iter = h->next; 
     360    return h; 
     361  } 
     362 
     363  while(1) { 
     364    h = actx->head; 
     365    if(noit_atomic_casptr((volatile void **)&actx->head, NULL, h) == h) break; 
     366  } 
     367  while(h) { 
     368    /* which unshifted things into the queue -- it's backwards, reverse it */ 
     369    jlog_line *tmp = h; 
     370    h = h->next; 
     371    tmp->next = rev; 
     372    rev = tmp; 
     373  } 
     374  if(rev) *iter = rev->next; 
     375  else *iter = NULL; 
     376  return rev; 
     377} 
     378void 
     379jlog_asynch_push(jlog_asynch_ctx *actx, jlog_line *n) { 
     380  while(1) { 
     381    n->next = actx->head; 
     382    if(noit_atomic_casptr((volatile void **)&actx->head, n, n->next) == n->next) return; 
     383  } 
     384} 
     385static void * 
     386jlog_logio_asynch_writer(void *vls) { 
     387  noit_log_stream_t ls = vls; 
     388  jlog_asynch_ctx *actx = ls->op_ctx; 
     389  jlog_line *iter = NULL; 
     390  while(1) { 
     391    int fast = 0, max = 1000; 
     392    jlog_line *line; 
     393    pthread_rwlock_rdlock(ls->lock); 
     394    while(max > 0 && NULL != (line = jlog_asynch_pop(actx, &iter))) { 
     395      jlog_ctx_write(actx->log, line->buf_dynamic ? 
     396                                  line->buf_dynamic : 
     397                                  line->buf_static, 
     398                     line->len); 
     399      if(line->buf_dynamic != NULL) free(line->buf_dynamic); 
     400      free(line); 
     401      fast = 1; 
     402      max--; 
     403    } 
     404    pthread_rwlock_unlock(ls->lock); 
     405    if(max > 0) { 
     406      /* we didn't hit our limit... so we ran the queue dry */ 
     407      /* 200ms if there was nothing, 10ms otherwise */ 
     408      usleep(fast ? 10000 : 200000); 
     409    } 
     410  } 
     411} 
    335412static int 
    336413jlog_logio_open(noit_log_stream_t ls) { 
    337414  char path[PATH_MAX], *sub, **subs, *p; 
     415  jlog_asynch_ctx *actx; 
    338416  jlog_ctx *log = NULL; 
     417  pthread_attr_t tattr; 
    339418  int i, listed, found; 
    340419 
     
    417496  } 
    418497 
    419   ls->op_ctx = log; 
     498  actx = calloc(1, sizeof(*actx)); 
     499  actx->log = log; 
     500  ls->op_ctx = actx; 
    420501  /* We do this to clean things up */ 
    421502  jlog_logio_reopen(ls); 
     503 
     504  pthread_attr_init(&tattr); 
     505  pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED); 
     506  if(pthread_create(&actx->writer, &tattr, jlog_logio_asynch_writer, ls) != 0) { 
     507    return -1; 
     508  } 
    422509  return 0; 
    423510} 
     
    425512jlog_logio_write(noit_log_stream_t ls, const void *buf, size_t len) { 
    426513  int rv = -1; 
     514  jlog_asynch_ctx *actx; 
     515  jlog_line *line; 
    427516  if(!ls->op_ctx) return -1; 
    428   pthread_rwlock_rdlock(ls->lock); 
    429   if(jlog_ctx_write((jlog_ctx *)ls->op_ctx, buf, len) == 0) 
    430     rv = len; 
    431   pthread_rwlock_unlock(ls->lock); 
     517  actx = ls->op_ctx; 
     518  line = calloc(1, sizeof(*line)); 
     519  if(len > sizeof(line->buf_static)) { 
     520    line->buf_dynamic = malloc(len); 
     521    memcpy(line->buf_dynamic, buf, len); 
     522  } 
     523  else { 
     524    memcpy(line->buf_static, buf, len); 
     525  } 
     526  line->len = len; 
     527  jlog_asynch_push(actx, line); 
    432528  return rv; 
    433529} 
     
    435531jlog_logio_close(noit_log_stream_t ls) { 
    436532  if(ls->op_ctx) { 
    437     jlog_ctx_close((jlog_ctx *)ls->op_ctx); 
     533    jlog_asynch_ctx *actx = ls->op_ctx; 
     534    jlog_ctx_close(actx->log); 
    438535    ls->op_ctx = NULL; 
    439536  } 
     
    443540jlog_logio_size(noit_log_stream_t ls) { 
    444541  size_t size; 
     542  jlog_asynch_ctx *actx; 
    445543  if(!ls->op_ctx) return -1; 
     544  actx = ls->op_ctx; 
    446545  pthread_rwlock_rdlock(ls->lock); 
    447   size = jlog_raw_size((jlog_ctx *)ls->op_ctx); 
     546  size = jlog_raw_size(actx->log); 
    448547  pthread_rwlock_unlock(ls->lock); 
    449548  return size;