Changeset 78adbbc2d15b0107283a0394c6d472a590033106

Show
Ignore:
Timestamp:
05/14/11 21:08:21 (3 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1305407301 -0400
git-parent:

[410716d9cc66847c0f708a31e3c4a72561e17e3b]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1305407301 -0400
Message:

We open the jlog in the monitor process, which does us no good.

We call reopen once we're in the child, so that needs to tear down
the asynch jlog thread if it exists and create a new one.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/utils/noit_log.c

    rf4a1380 r78adbbc  
    139139posix_logio_reopen(noit_log_stream_t ls) { 
    140140  if(ls->path) { 
     141    pthread_rwlock_t *lock = ls->lock; 
    141142    int newfd, oldfd, rv = -1; 
    142     if(ls->lock) pthread_rwlock_wrlock(ls->lock); 
     143    if(lock) pthread_rwlock_wrlock(lock); 
    143144    oldfd = (int)(vpsized_int)ls->op_ctx; 
    144145    newfd = open(ls->path, O_CREAT|O_WRONLY|O_APPEND, ls->mode); 
     
    151152      if(fstat(newfd, &sb) == 0) ls->written = (int32_t)sb.st_size; 
    152153    } 
    153     if(ls->lock) pthread_rwlock_unlock(ls->lock); 
     154    if(lock) pthread_rwlock_unlock(lock); 
    154155    return rv; 
    155156  } 
     
    159160posix_logio_write(noit_log_stream_t ls, const void *buf, size_t len) { 
    160161  int fd, rv = -1; 
    161   if(ls->lock) pthread_rwlock_rdlock(ls->lock); 
     162  pthread_rwlock_t *lock = ls->lock; 
     163  if(lock) pthread_rwlock_rdlock(lock); 
    162164  fd = (int)(vpsized_int)ls->op_ctx; 
    163165  debug_printf("writing to %d\n", fd); 
    164166  if(fd >= 0) rv = write(fd, buf, len); 
    165   if(ls->lock) pthread_rwlock_unlock(ls->lock); 
     167  if(lock) pthread_rwlock_unlock(lock); 
    166168  if(rv > 0) noit_atomic_add32(&ls->written, rv); 
    167169  return rv; 
     
    170172posix_logio_writev(noit_log_stream_t ls, const struct iovec *iov, int iovcnt) { 
    171173  int fd, rv = -1; 
    172   if(ls->lock) pthread_rwlock_rdlock(ls->lock); 
     174  pthread_rwlock_t *lock = ls->lock; 
     175  if(lock) pthread_rwlock_rdlock(lock); 
    173176  fd = (int)(vpsized_int)ls->op_ctx; 
    174177  debug_printf("writ(v)ing to %d\n", fd); 
    175178  if(fd >= 0) rv = writev(fd, iov, iovcnt); 
    176   if(ls->lock) pthread_rwlock_unlock(ls->lock); 
     179  if(lock) pthread_rwlock_unlock(lock); 
    177180  if(rv > 0) noit_atomic_add32(&ls->written, rv); 
    178181  return rv; 
     
    181184posix_logio_close(noit_log_stream_t ls) { 
    182185  int fd, rv; 
    183   if(ls->lock) pthread_rwlock_wrlock(ls->lock); 
     186  pthread_rwlock_t *lock = ls->lock; 
     187  if(lock) pthread_rwlock_wrlock(lock); 
    184188  fd = (int)(vpsized_int)ls->op_ctx; 
    185189  rv = close(fd); 
    186   if(ls->lock) pthread_rwlock_unlock(ls->lock); 
     190  if(lock) pthread_rwlock_unlock(lock); 
    187191  return rv; 
    188192} 
     
    192196  size_t s = (size_t)-1; 
    193197  struct stat sb; 
    194   if(ls->lock) pthread_rwlock_rdlock(ls->lock); 
     198  pthread_rwlock_t *lock = ls->lock; 
     199  if(lock) pthread_rwlock_rdlock(lock); 
    195200  fd = (int)(vpsized_int)ls->op_ctx; 
    196201  if(fstat(fd, &sb) == 0) { 
    197202    s = (size_t)sb.st_size; 
    198203  } 
    199   if(ls->lock) pthread_rwlock_unlock(ls->lock); 
     204  if(lock) pthread_rwlock_unlock(lock); 
    200205  return s; 
    201206} 
     
    204209  int rv = 0; 
    205210  char autoname[PATH_MAX]; 
     211  pthread_rwlock_t *lock = ls->lock; 
    206212  if(name == NOIT_LOG_RENAME_AUTOTIME) { 
    207213    time_t now = time(NULL); 
     
    211217  } 
    212218  if(!strcmp(name, ls->path)) return 0; /* noop */ 
    213   if(ls->lock) pthread_rwlock_rdlock(ls->lock); 
     219  if(lock) pthread_rwlock_rdlock(lock); 
    214220  rv = rename(ls->path, name); 
    215   if(ls->lock) pthread_rwlock_unlock(ls->lock); 
     221  if(lock) pthread_rwlock_unlock(lock); 
    216222  return -1; 
    217223} 
     
    238244  pthread_t writer; 
    239245  void *head; 
     246  int gen;  /* generation */ 
    240247} jlog_asynch_ctx; 
    241248 
     
    329336} 
    330337 
    331 static int 
    332 jlog_logio_reopen(noit_log_stream_t ls) { 
    333   char **subs; 
    334   jlog_asynch_ctx *actx = ls->op_ctx; 
    335   int i; 
    336   /* reopening only has the effect of removing temporary subscriptions */ 
    337   /* (they start with ~ in our hair-brained model */ 
    338  
    339   if(ls->lock) pthread_rwlock_wrlock(ls->lock); 
    340   if(jlog_ctx_list_subscribers(actx->log, &subs) == -1) 
    341     goto bail; 
    342  
    343   for(i=0;subs[i];i++) 
    344     if(subs[i][0] == '~') 
    345       jlog_ctx_remove_subscriber(actx->log, subs[i]); 
    346  
    347   jlog_ctx_list_subscribers_dispose(actx->log, subs); 
    348   jlog_logio_cleanse(ls); 
    349  bail: 
    350   if(ls->lock) pthread_rwlock_unlock(ls->lock); 
    351   return 0; 
    352 } 
    353338static jlog_line * 
    354339jlog_asynch_pop(jlog_asynch_ctx *actx, jlog_line **iter) { 
     
    390375  jlog_asynch_ctx *actx = ls->op_ctx; 
    391376  jlog_line *iter = NULL; 
    392   while(1) { 
     377  int gen = actx->gen; 
     378  noitL(noit_error, "Starting asynchronous jlog writer[%p]\n", (void *)pthread_self()); 
     379  while(gen == actx->gen) { 
     380    pthread_rwlock_t *lock; 
    393381    int fast = 0, max = 1000; 
    394382    jlog_line *line; 
    395     pthread_rwlock_rdlock(ls->lock); 
     383    lock = ls->lock; 
     384    if(lock) pthread_rwlock_rdlock(lock); 
    396385    while(max > 0 && NULL != (line = jlog_asynch_pop(actx, &iter))) { 
    397386      jlog_ctx_write(actx->log, line->buf_dynamic ? 
     
    404393      max--; 
    405394    } 
    406     pthread_rwlock_unlock(ls->lock); 
     395    if(lock) pthread_rwlock_unlock(lock); 
    407396    if(max > 0) { 
    408397      /* we didn't hit our limit... so we ran the queue dry */ 
     
    411400    } 
    412401  } 
     402  pthread_exit((void *)0); 
     403} 
     404static int 
     405jlog_logio_reopen(noit_log_stream_t ls) { 
     406  void *unused; 
     407  char **subs; 
     408  jlog_asynch_ctx *actx = ls->op_ctx; 
     409  pthread_rwlock_t *lock = ls->lock; 
     410  int i; 
     411  /* reopening only has the effect of removing temporary subscriptions */ 
     412  /* (they start with ~ in our hair-brained model */ 
     413 
     414  if(lock) pthread_rwlock_wrlock(lock); 
     415  if(jlog_ctx_list_subscribers(actx->log, &subs) == -1) 
     416    goto bail; 
     417 
     418  for(i=0;subs[i];i++) 
     419    if(subs[i][0] == '~') 
     420      jlog_ctx_remove_subscriber(actx->log, subs[i]); 
     421 
     422  jlog_ctx_list_subscribers_dispose(actx->log, subs); 
     423  jlog_logio_cleanse(ls); 
     424 bail: 
     425  if(lock) pthread_rwlock_unlock(lock); 
     426 
     427  actx->gen++; 
     428  pthread_join(actx->writer, &unused); 
     429  if(pthread_create(&actx->writer, NULL, jlog_logio_asynch_writer, ls) != 0) 
     430    return -1; 
     431   
     432  return 0; 
    413433} 
    414434static int 
     
    501521  actx->log = log; 
    502522  ls->op_ctx = actx; 
     523 
     524  if(pthread_create(&actx->writer, NULL, jlog_logio_asynch_writer, ls) != 0) 
     525    return -1; 
     526 
    503527  /* We do this to clean things up */ 
    504528  jlog_logio_reopen(ls); 
    505  
    506   pthread_attr_init(&tattr); 
    507   pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED); 
    508   if(pthread_create(&actx->writer, &tattr, jlog_logio_asynch_writer, ls) != 0) { 
    509     return -1; 
    510   } 
    511529  return 0; 
    512530} 
     
    543561  size_t size; 
    544562  jlog_asynch_ctx *actx; 
     563  pthread_rwlock_t *lock = ls->lock; 
    545564  if(!ls->op_ctx) return -1; 
    546565  actx = ls->op_ctx; 
    547   pthread_rwlock_rdlock(ls->lock); 
     566  if(lock) pthread_rwlock_rdlock(lock); 
    548567  size = jlog_raw_size(actx->log); 
    549   pthread_rwlock_unlock(ls->lock); 
     568  if(lock) pthread_rwlock_unlock(lock); 
    550569  return size; 
    551570}