Changeset 0477cc94a4188def7b6882f43e6fd222237071ea

Show
Ignore:
Timestamp:
05/14/11 02:00:40 (7 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1305338440 -0400
git-parent:

[65abbae9594e84d43262cb5176eb757e960360e6], [f4a13804007c7b1213e462a1395026bdbafe3e2e]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1305338440 -0400
Message:

Merge branch 'asynch-jlog'

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/utils/noit_log.c

    r2648da9 rf4a1380  
    226226}; 
    227227 
     228typedef struct jlog_line { 
     229  char *buf; 
     230  char buf_static[512]; 
     231  char *buf_dynamic; 
     232  int len; 
     233  void *next; 
     234} jlog_line; 
     235 
     236typedef struct { 
     237  jlog_ctx *log; 
     238  pthread_t writer; 
     239  void *head; 
     240} jlog_asynch_ctx; 
     241 
    228242static int 
    229243jlog_lspath_to_fspath(noit_log_stream_t ls, char *buff, int len, 
     
    267281static int 
    268282jlog_logio_cleanse(noit_log_stream_t ls) { 
     283  jlog_asynch_ctx *actx; 
    269284  jlog_ctx *log; 
    270285  DIR *d; 
     
    274289  int size = 0; 
    275290 
    276   log = (jlog_ctx *)ls->op_ctx; 
     291  actx = (jlog_asynch_ctx *)ls->op_ctx; 
     292  if(!actx) return -1; 
     293  log = actx->log; 
    277294  if(!log) return -1; 
    278295  if(jlog_lspath_to_fspath(ls, path, sizeof(path), NULL) <= 0) return -1; 
     
    315332jlog_logio_reopen(noit_log_stream_t ls) { 
    316333  char **subs; 
     334  jlog_asynch_ctx *actx = ls->op_ctx; 
    317335  int i; 
    318336  /* reopening only has the effect of removing temporary subscriptions */ 
     
    320338 
    321339  if(ls->lock) pthread_rwlock_wrlock(ls->lock); 
    322   if(jlog_ctx_list_subscribers(ls->op_ctx, &subs) == -1) 
     340  if(jlog_ctx_list_subscribers(actx->log, &subs) == -1) 
    323341    goto bail; 
    324342 
    325343  for(i=0;subs[i];i++) 
    326344    if(subs[i][0] == '~') 
    327       jlog_ctx_remove_subscriber(ls->op_ctx, subs[i]); 
    328  
    329   jlog_ctx_list_subscribers_dispose(ls->op_ctx, subs); 
     345      jlog_ctx_remove_subscriber(actx->log, subs[i]); 
     346 
     347  jlog_ctx_list_subscribers_dispose(actx->log, subs); 
    330348  jlog_logio_cleanse(ls); 
    331349 bail: 
     
    333351  return 0; 
    334352} 
     353static jlog_line * 
     354jlog_asynch_pop(jlog_asynch_ctx *actx, jlog_line **iter) { 
     355  jlog_line *h = NULL, *rev = NULL; 
     356 
     357  if(*iter) { /* we have more on the previous list */ 
     358    h = *iter; 
     359    *iter = h->next; 
     360    return h; 
     361  } 
     362 
     363  while(1) { 
     364    h = (void *)(volatile void *)actx->head; 
     365    if(noit_atomic_casptr((volatile void **)&actx->head, NULL, h) == h) break; 
     366    /* TODO: load-load */ 
     367  } 
     368  while(h) { 
     369    /* which unshifted things into the queue -- it's backwards, reverse it */ 
     370    jlog_line *tmp = h; 
     371    h = h->next; 
     372    tmp->next = rev; 
     373    rev = tmp; 
     374  } 
     375  if(rev) *iter = rev->next; 
     376  else *iter = NULL; 
     377  return rev; 
     378} 
     379void 
     380jlog_asynch_push(jlog_asynch_ctx *actx, jlog_line *n) { 
     381  while(1) { 
     382    n->next = (void *)(volatile void *)actx->head; 
     383    if(noit_atomic_casptr((volatile void **)&actx->head, n, n->next) == n->next) return; 
     384    /* TODO: load-load */ 
     385  } 
     386} 
     387static void * 
     388jlog_logio_asynch_writer(void *vls) { 
     389  noit_log_stream_t ls = vls; 
     390  jlog_asynch_ctx *actx = ls->op_ctx; 
     391  jlog_line *iter = NULL; 
     392  while(1) { 
     393    int fast = 0, max = 1000; 
     394    jlog_line *line; 
     395    pthread_rwlock_rdlock(ls->lock); 
     396    while(max > 0 && NULL != (line = jlog_asynch_pop(actx, &iter))) { 
     397      jlog_ctx_write(actx->log, line->buf_dynamic ? 
     398                                  line->buf_dynamic : 
     399                                  line->buf_static, 
     400                     line->len); 
     401      if(line->buf_dynamic != NULL) free(line->buf_dynamic); 
     402      free(line); 
     403      fast = 1; 
     404      max--; 
     405    } 
     406    pthread_rwlock_unlock(ls->lock); 
     407    if(max > 0) { 
     408      /* we didn't hit our limit... so we ran the queue dry */ 
     409      /* 200ms if there was nothing, 10ms otherwise */ 
     410      usleep(fast ? 10000 : 200000); 
     411    } 
     412  } 
     413} 
    335414static int 
    336415jlog_logio_open(noit_log_stream_t ls) { 
    337416  char path[PATH_MAX], *sub, **subs, *p; 
     417  jlog_asynch_ctx *actx; 
    338418  jlog_ctx *log = NULL; 
     419  pthread_attr_t tattr; 
    339420  int i, listed, found; 
    340421 
     
    417498  } 
    418499 
    419   ls->op_ctx = log; 
     500  actx = calloc(1, sizeof(*actx)); 
     501  actx->log = log; 
     502  ls->op_ctx = actx; 
    420503  /* We do this to clean things up */ 
    421504  jlog_logio_reopen(ls); 
     505 
     506  pthread_attr_init(&tattr); 
     507  pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED); 
     508  if(pthread_create(&actx->writer, &tattr, jlog_logio_asynch_writer, ls) != 0) { 
     509    return -1; 
     510  } 
    422511  return 0; 
    423512} 
     
    425514jlog_logio_write(noit_log_stream_t ls, const void *buf, size_t len) { 
    426515  int rv = -1; 
     516  jlog_asynch_ctx *actx; 
     517  jlog_line *line; 
    427518  if(!ls->op_ctx) return -1; 
    428   pthread_rwlock_rdlock(ls->lock); 
    429   if(jlog_ctx_write((jlog_ctx *)ls->op_ctx, buf, len) == 0) 
    430     rv = len; 
    431   pthread_rwlock_unlock(ls->lock); 
     519  actx = ls->op_ctx; 
     520  line = calloc(1, sizeof(*line)); 
     521  if(len > sizeof(line->buf_static)) { 
     522    line->buf_dynamic = malloc(len); 
     523    memcpy(line->buf_dynamic, buf, len); 
     524  } 
     525  else { 
     526    memcpy(line->buf_static, buf, len); 
     527  } 
     528  line->len = len; 
     529  jlog_asynch_push(actx, line); 
    432530  return rv; 
    433531} 
     
    435533jlog_logio_close(noit_log_stream_t ls) { 
    436534  if(ls->op_ctx) { 
    437     jlog_ctx_close((jlog_ctx *)ls->op_ctx); 
     535    jlog_asynch_ctx *actx = ls->op_ctx; 
     536    jlog_ctx_close(actx->log); 
    438537    ls->op_ctx = NULL; 
    439538  } 
     
    443542jlog_logio_size(noit_log_stream_t ls) { 
    444543  size_t size; 
     544  jlog_asynch_ctx *actx; 
    445545  if(!ls->op_ctx) return -1; 
     546  actx = ls->op_ctx; 
    446547  pthread_rwlock_rdlock(ls->lock); 
    447   size = jlog_raw_size((jlog_ctx *)ls->op_ctx); 
     548  size = jlog_raw_size(actx->log); 
    448549  pthread_rwlock_unlock(ls->lock); 
    449550  return size;