Changeset 09ecd288337978ca01c842684ec58b4bad68ce6e

Show
Ignore:
Timestamp:
02/20/11 15:50:57 (3 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1298217057 +0000
git-parent:

[f8c63e9fdef637cbef40fa8f4535742dce17d609]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1298217057 +0000
Message:

This patch does a lot, all refs #351

  • fix up the test harness to support noitd restarts and
    expected crashes
  • Add different cancellation methodologies to the jobq implementation
    • "evil_brutal" which is the old siglongjmp way.
    • "cancel_deferred" which uses pthread_cancel w/ CANCEL_DEFERRED
    • "cancel_asynch" which uses pthread_cancel w/ CANCEL_ASYNCHRONOUS
  • Add a game over scenario if the cooperative cancellation mechanisms
    don't work and end up exhausting all the threads in a pool.
  • Reduce the minimum check period set via REST to 1s to enable better
    testing. NOTE: perhaps this should be much smaller still.
  • Change the thread pool system to spawn as new jobs are queued.
    This isn't automatic demand-driven sizing, but rather we don't
    start the (N) threads until (N) events arrive (not necessarily
    concurrently).
  • Added a test_abort module that runs different types of faux workloads
    to assist in testing the functional correctness of each method.
    Workloads include variable work time, variable method of cancellation,
    and both interruptible (nanosleep) and uninterruptible (compute) work.
  • Added fairly thorough tests for each method under each workload
    condition. Tested on darwin (finding cancel_asynch to not work well).
    Needs testing on other platforms.
Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/eventer/eventer.h

    r31d42e5 r09ecd28  
    4848#define EVENTER_ASYNCH           (EVENTER_ASYNCH_WORK | EVENTER_ASYNCH_CLEANUP) 
    4949#define EVENTER_RECURRENT        0x80 
     50#define EVENTER_EVIL_BRUTAL     0x100 
     51#define EVENTER_CANCEL_DEFERRED 0x200 
     52#define EVENTER_CANCEL_ASYNCH   0x400 
     53#define EVENTER_CANCEL          (EVENTER_CANCEL_DEFERRED|EVENTER_CANCEL_ASYNCH) 
     54 
     55#define EVENTER_DEFAULT_ASYNCH_ABORT EVENTER_CANCEL_DEFERRED 
    5056 
    5157/* All of these functions act like their POSIX couterparts with two 
  • src/eventer/eventer_impl.c

    r3d39c2b r09ecd28  
    192192  job = calloc(1, sizeof(*job)); 
    193193  job->fd_event = e; 
     194  job->jobq = q ? q : &__default_jobq; 
    194195  gettimeofday(&job->create_time, NULL); 
    195196  /* If we're debugging the eventer, these cross thread timeouts will 
  • src/eventer/eventer_jobq.c

    re3106ad r09ecd28  
    6161  env = pthread_getspecific(jobq->threadenv); 
    6262  job = pthread_getspecific(jobq->activejob); 
    63   if(env && job
     63  if(env && job && job->fd_event->mask & EVENTER_EVIL_BRUTAL
    6464    if(noit_atomic_cas32(&job->inflight, 0, 1) == 1) 
    6565       siglongjmp(*env, 1); 
     
    142142} 
    143143 
     144static void * 
     145eventer_jobq_consumer_pthreadentry(void *vp) { 
     146  return eventer_jobq_consumer((eventer_jobq_t *)vp); 
     147} 
     148static void 
     149eventer_jobq_maybe_spawn(eventer_jobq_t *jobq) { 
     150  int32_t current = jobq->concurrency; 
     151  /* if we've no desired concurrency, this doesn't apply to us */ 
     152  if(jobq->desired_concurrency == 0) return; 
     153  /* See if we need to launch one */ 
     154  if(jobq->desired_concurrency > current) { 
     155    /* we need another thread, maybe... this is a race as we do the 
     156     * increment in the new thread, but we check there and back it out 
     157     * if we did something we weren't supposed to. */ 
     158    pthread_t tid; 
     159    noitL(eventer_deb, "Starting queue[%s] thread now at %d\n", 
     160          jobq->queue_name, jobq->concurrency); 
     161    pthread_create(&tid, NULL, eventer_jobq_consumer_pthreadentry, jobq); 
     162  } 
     163  noitL(eventer_deb, "jobq_queue[%s] pending cancels [%d/%d]\n", 
     164        jobq->queue_name, jobq->pending_cancels, 
     165        jobq->desired_concurrency); 
     166  if(jobq->pending_cancels == jobq->desired_concurrency) { 
     167    /* we're absolutely screwed at this point... it's time to just die */ 
     168    noitL(noit_error, "jobq_queue[%s] induced [%d/%d] game over.\n", 
     169          jobq->queue_name, jobq->pending_cancels, 
     170          jobq->desired_concurrency); 
     171    assert(jobq->pending_cancels != jobq->desired_concurrency); 
     172  } 
     173} 
    144174void 
    145175eventer_jobq_enqueue(eventer_jobq_t *jobq, eventer_job_t *job) { 
    146176  job->next = NULL; 
    147  
     177  eventer_jobq_maybe_spawn(jobq); 
    148178  pthread_mutex_lock(&jobq->lock); 
    149179  if(jobq->tailq) { 
     
    206236  job->timeout_event = NULL; 
    207237  noitL(eventer_deb, "%p jobq -> timeout job [%p]\n", pthread_self(), job); 
    208   if(job->inflight) pthread_kill(job->executor, JOBQ_SIGNAL); 
     238  if(job->inflight) { 
     239    eventer_job_t *jobcopy; 
     240    if(job->fd_event->mask & (EVENTER_CANCEL)) { 
     241      eventer_t my_precious = job->fd_event; 
     242      /* we set this to null so we can't complete on it */ 
     243      job->fd_event = NULL; 
     244      noitL(eventer_deb, "[inline] timeout cancelling job\n"); 
     245      noit_atomic_inc32(&job->jobq->pending_cancels); 
     246      pthread_cancel(job->executor); 
     247      /* complete on it ourselves */ 
     248      if(noit_atomic_cas32(&job->has_cleanedup, 1, 0) == 0) { 
     249        /* We need to cleanup... we haven't done it yet. */ 
     250        noitL(eventer_deb, "[inline] %p jobq[%s] -> cleanup [%p]\n", 
     251              pthread_self(), job->jobq->queue_name, job); 
     252        /* This is the real question... asynch cleanup is supposed to 
     253         * be called asynch -- we're going to call it synchronously 
     254         * I think this is a bad idea, but not cleaning up seems worse. 
     255         * Because we're synchronous, if we hang, we'll be watchdogged. 
     256         * 
     257         * Uncooperative plugins/third-party libs can truly suck 
     258         * one's soul out. 
     259         */ 
     260        if(my_precious) 
     261          my_precious->callback(my_precious, EVENTER_ASYNCH_CLEANUP, 
     262                                my_precious->closure, &job->finish_time); 
     263      } 
     264      jobcopy = malloc(sizeof(*jobcopy)); 
     265      memcpy(jobcopy, job, sizeof(*jobcopy)); 
     266      free(job); 
     267      jobcopy->fd_event = my_precious; 
     268      eventer_jobq_maybe_spawn(jobcopy->jobq); 
     269      eventer_jobq_enqueue(jobcopy->jobq->backq, jobcopy); 
     270    } 
     271    else 
     272      pthread_kill(job->executor, JOBQ_SIGNAL); 
     273  } 
    209274  return 0; 
    210275} 
     
    233298  return EVENTER_RECURRENT; 
    234299} 
    235 static void * 
    236 eventer_jobq_consumer_pthreadentry(void *vp) { 
    237   return eventer_jobq_consumer((eventer_jobq_t *)vp); 
     300static void 
     301eventer_jobq_cancel_cleanup(void *vp) { 
     302  eventer_jobq_t *jobq = vp; 
     303  noit_atomic_dec32(&jobq->pending_cancels); 
     304  noit_atomic_dec32(&jobq->concurrency); 
    238305} 
    239306void * 
    240307eventer_jobq_consumer(eventer_jobq_t *jobq) { 
    241308  eventer_job_t *job; 
     309  int32_t current_count; 
    242310  sigjmp_buf env; 
    243311 
    244312  assert(jobq->backq); 
    245   noit_atomic_inc32(&jobq->concurrency); 
     313  current_count = noit_atomic_inc32(&jobq->concurrency); 
     314  noitL(eventer_deb, "jobq[%s] -> %d\n", jobq->queue_name, current_count); 
     315  if(current_count > jobq->desired_concurrency) { 
     316    noitL(eventer_deb, "jobq[%s] over provisioned, backing out.", 
     317          jobq->queue_name); 
     318    noit_atomic_dec32(&jobq->concurrency); 
     319    pthread_exit(NULL); 
     320    return NULL; 
     321  } 
    246322  /* Each thread can consume from only one queue */ 
    247323  pthread_setspecific(threads_jobq, jobq); 
    248324  pthread_setspecific(jobq->threadenv, &env); 
     325  pthread_cleanup_push(eventer_jobq_cancel_cleanup, jobq); 
    249326 
    250327  while(1) { 
     
    284361    } 
    285362    pthread_mutex_unlock(&job->lock); 
    286      
     363 
    287364    /* Run the job, if we timeout, will be killed with a JOBQ_SIGNAL from 
    288365     * the master thread.  We handle the alarm by longjmp'd out back here. 
    289366     */ 
    290367    job->executor = pthread_self(); 
    291     if(sigsetjmp(env, 1) == 0) { 
     368    if(0 == (job->fd_event->mask & EVENTER_EVIL_BRUTAL) || 
     369       sigsetjmp(env, 1) == 0) { 
    292370      /* We could get hit right here... (timeout and terminated from 
    293371       * another thread.  inflight isn't yet set (next line), so it 
     
    298376        if(!job->timeout_triggered) { 
    299377          noitL(eventer_deb, "%p jobq[%s] -> executing [%p]\n", pthread_self(), jobq->queue_name, job); 
     378          /* Choose the right cancellation policy (or none) */ 
     379          if(job->fd_event->mask & EVENTER_CANCEL_ASYNCH) { 
     380            noitL(eventer_deb, "PTHREAD_CANCEL_ASYNCHRONOUS\n"); 
     381            pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); 
     382            pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL); 
     383          } 
     384          else if(job->fd_event->mask & EVENTER_CANCEL_DEFERRED) { 
     385            noitL(eventer_deb, "PTHREAD_CANCEL_DEFERRED\n"); 
     386            pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); 
     387            pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); 
     388          } 
     389          else { 
     390            noitL(eventer_deb, "PTHREAD_CANCEL_DISABLE\n"); 
     391            pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); 
     392          } 
     393          /* run the job */ 
     394          noitL(eventer_deb, "jobq[%s] -> dispatch BEGIN\n", jobq->queue_name); 
    300395          job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_WORK, 
    301396                                  job->fd_event->closure, &job->start_time); 
     397          noitL(eventer_deb, "jobq[%s] -> dispatch END\n", jobq->queue_name); 
     398          if(job->fd_event && job->fd_event->mask & EVENTER_CANCEL) 
     399            pthread_testcancel(); 
     400          /* reset the cancellation policy */ 
     401          pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL); 
     402          pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL); 
    302403        } 
    303404      } 
     
    318419      /* We need to cleanup... we haven't done it yet. */ 
    319420      noitL(eventer_deb, "%p jobq[%s] -> cleanup [%p]\n", pthread_self(), jobq->queue_name, job); 
    320       job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_CLEANUP, 
    321                               job->fd_event->closure, &job->finish_time); 
     421      if(job->fd_event) 
     422        job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_CLEANUP, 
     423                                job->fd_event->closure, &job->finish_time); 
    322424    } 
    323425    eventer_jobq_enqueue(jobq->backq, job); 
    324426  } 
     427  pthread_cleanup_pop(0); 
    325428  noit_atomic_dec32(&jobq->concurrency); 
    326429  pthread_exit(NULL); 
     
    329432 
    330433void eventer_jobq_increase_concurrency(eventer_jobq_t *jobq) { 
    331   pthread_t tid; 
    332   pthread_create(&tid, NULL, eventer_jobq_consumer_pthreadentry, jobq); 
     434  noit_atomic_inc32(&jobq->desired_concurrency); 
    333435} 
    334436void eventer_jobq_decrease_concurrency(eventer_jobq_t *jobq) { 
    335437  eventer_job_t *job; 
     438  noit_atomic_dec32(&jobq->desired_concurrency); 
    336439  job = calloc(1, sizeof(*job)); 
    337440  eventer_jobq_enqueue(jobq, job); 
  • src/eventer/eventer_jobq.h

    re3106ad r09ecd28  
    5959  void                  (*cleanup)(struct _eventer_job_t *); 
    6060  struct _eventer_job_t  *next; 
     61  struct _eventer_jobq_t *jobq; 
    6162} eventer_job_t; 
    6263 
     
    6667  sem_t                   semaphore; 
    6768  noit_atomic32_t         concurrency; 
     69  noit_atomic32_t         desired_concurrency; 
     70  noit_atomic32_t         pending_cancels; 
    6871  eventer_job_t          *headq; 
    6972  eventer_job_t          *tailq; 
  • src/modules/Makefile.in

    r5fbae1c r09ecd28  
    3131SMODULES=stomp_driver.@MODULEEXT@ 
    3232 
    33 all:    $(MODULES) $(SMODULES) 
     33all:    $(MODULES) $(SMODULES) test_abort.@MODULEEXT@ 
    3434 
    3535.xml.xmlh: 
  • src/noit_check_rest.c

    rb553f9a r09ecd28  
    290290          pint = noit_conf_string_to_int((char *)tmp); 
    291291          xmlFree(tmp); 
    292           if(pint < 5000 || pint > 300000) { 
     292          if(pint < 1000 || pint > 300000) { 
    293293            *error = "invalid period"; 
    294294            return 0; 
  • src/noit_check_tools.c

    r78bf903 r09ecd28  
    113113 
    114114void 
    115 noit_check_run_full_asynch(noit_check_t *check, eventer_func_t callback) { 
     115noit_check_run_full_asynch_opts(noit_check_t *check, eventer_func_t callback, 
     116                                int mask) { 
    116117  struct timeval __now, p_int; 
    117118  eventer_t e; 
    118119  e = eventer_alloc(); 
    119120  e->fd = -1; 
    120   e->mask = EVENTER_ASYNCH;  
     121  e->mask = EVENTER_ASYNCH | mask; 
    121122  gettimeofday(&__now, NULL); 
    122123  memcpy(&e->whence, &__now, sizeof(__now)); 
     
    128129  eventer_add(e); 
    129130} 
     131void 
     132noit_check_run_full_asynch(noit_check_t *check, eventer_func_t callback) { 
     133  noit_check_run_full_asynch_opts(check, callback, 
     134                                  EVENTER_DEFAULT_ASYNCH_ABORT); 
     135} 
    130136 
    131137void 
  • src/noit_check_tools.h

    r4b55797 r09ecd28  
    5252 
    5353API_EXPORT(void) 
     54  noit_check_run_full_asynch_opts(noit_check_t *check, eventer_func_t callback, 
     55                                  int mask); 
     56API_EXPORT(void) 
    5457  noit_check_run_full_asynch(noit_check_t *check, eventer_func_t callback); 
    5558 
  • test/t/testconfig.pm

    re754bda r09ecd28  
    66use Exporter 'import'; 
    77use Data::Dumper; 
     8use IO::File; 
    89use strict; 
    910use vars qw/@EXPORT/; 
    1011 
    1112my $noit_pid = 0; 
     13my $noit_log = undef; 
    1214my $stratcon_pid = 0; 
     15my $stratcon_log = undef; 
    1316 
    1417 
     
    1720             $STRATCON_API_PORT $STRATCON_CLI_PORT 
    1821             $STRATCON_WEB_PORT 
    19              pg make_noit_config start_noit stop_noit 
    20              make_stratcon_config start_stratcon stop_stratcon 
     22             pg make_noit_config start_noit stop_noit get_noit_log 
     23             make_stratcon_config start_stratcon stop_stratcon get_stratcon_log 
    2124             $MODULES_DIR $LUA_DIR $all_noit_modules $all_stratcon_modules); 
    2225 
     
    3134  'mysql' => { 'image' => 'mysql' }, 
    3235  'postgres' => { 'image' => 'postgres' }, 
     36  'test_abort' => { 'image' => 'test_abort' }, 
    3337  'varnish' => { 'loader' => 'lua', 'object' => 'noit.module.varnish' }, 
    3438  'http' => { 'loader' => 'lua', 'object' => 'noit.module.http' }, 
     
    125129    <console_output> 
    126130      <outlet name="stderr"/> 
    127       <log name="error" disabled="$opts->{logs_error}->{''}"/> 
    128       <log name="debug" disabled="$opts->{logs_debug}->{''}"/> 
     131      <log name="error" disabled="$opts->{logs_error}->{''}" timestamps="true"/> 
     132      <log name="debug" disabled="$opts->{logs_debug}->{''}" timestamps="true"/> 
    129133    </console_output> 
    130134    <feeds> 
     
    480484} 
    481485 
     486$SIG{CHLD} = sub { 
     487  my $pid = wait; 
     488  $noit_pid = 0 if($pid == $noit_pid); 
     489  $stratcon_pid = 0 if($pid == $stratcon_pid); 
     490}; 
     491 
    482492sub start_noit { 
    483493  my $name = shift; 
     
    488498  $noit_pid = fork(); 
    489499  mkdir "logs"; 
     500  $noit_log = "logs/${name}_noit.log"; 
    490501  if($noit_pid == 0) { 
     502    $noit_pid = $$; 
     503    $noit_log = "logs/${name}_noit.log"; 
    491504    close(STDIN); 
    492505    open(STDIN, "</dev/null"); 
     
    494507    open(STDOUT, ">/dev/null"); 
    495508    close(STDERR); 
    496     open(STDERR, ">logs/${name}_noit.log"); 
     509    open(STDERR, ">$noit_log"); 
    497510    my @args = ( 'noitd', '-D', '-c', $conf ); 
    498511    exec { '../../src/noitd' } @args; 
     
    501514  return $noit_pid; 
    502515} 
     516sub get_noit_log { 
     517  return IO::File->new("<$noit_log"); 
     518} 
    503519sub stop_noit { 
    504   kill 9, $noit_pid if($noit_pid && kill 1, $noit_pid); 
     520  return 0 unless ($noit_pid && kill 0, $noit_pid); 
     521  kill 9, $noit_pid; 
    505522  $noit_pid = 0; 
    506523  return 1; 
     
    515532  $stratcon_pid = fork(); 
    516533  mkdir "logs"; 
     534  $stratcon_log = "logs/${name}_stratcon.log"; 
    517535  if($stratcon_pid == 0) { 
     536    $stratcon_pid = $$; 
     537    $stratcon_log = "logs/${name}_stratcon.log"; 
    518538    close(STDIN); 
    519539    open(STDIN, "</dev/null"); 
     
    521541    open(STDOUT, ">/dev/null"); 
    522542    close(STDERR); 
    523     open(STDERR, ">logs/${name}_stratcon.log"); 
     543    open(STDERR, ">$stratcon_log"); 
    524544    my @args = ( 'stratcond', '-D', '-c', $conf ); 
    525545    exec { '../../src/stratcond' } @args; 
     
    528548  return $stratcon_pid; 
    529549} 
     550sub get_stratcon_log { 
     551  return IO::File->new("<$stratcon_log"); 
     552} 
    530553sub stop_stratcon { 
    531   kill 9, $stratcon_pid if($stratcon_pid && kill 1, $stratcon_pid); 
     554  return 0 unless ($stratcon_pid && kill 0, $stratcon_pid); 
     555  kill 9, $stratcon_pid; 
    532556  $stratcon_pid = 0; 
    533557  return 1;