Changeset 3b3b432b41dd3bfb80c144aa7ba28e75daa2337f

Show
Ignore:
Timestamp:
02/18/08 21:52:18 (10 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1203371538 +0000
git-parent:

[40c5e311a891e372d5370e352f37231f70701098]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1203371538 +0000
Message:

asynchronous job queues

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/eventer/Makefile.in

    r01751d3 r3b3b432  
    1212OBJS=   eventer.o eventer_impl.o \ 
    1313        eventer_kqueue_impl.o \ 
    14         eventer_POSIX_fd_opset.o 
     14        eventer_POSIX_fd_opset.o \ 
     15        eventer_jobq.o 
    1516 
    1617all:    libeventer.a 
  • src/eventer/eventer.h

    raa127f7 r3b3b432  
    1111#include <sys/socket.h> 
    1212 
    13 #define EVENTER_READ       0x01 
    14 #define EVENTER_WRITE      0x02 
    15 #define EVENTER_EXCEPTION  0x04 
    16 #define EVENTER_TIMER      0x08 
    17 #define EVENTER_ASYNCH     0x10 
     13#define EVENTER_READ             0x01 
     14#define EVENTER_WRITE            0x02 
     15#define EVENTER_EXCEPTION        0x04 
     16#define EVENTER_TIMER            0x08 
     17#define EVENTER_ASYNCH_WORK      0x10 
     18#define EVENTER_ASYNCH_CLEANUP   0x20 
     19#define EVENTER_ASYNCH           (EVENTER_ASYNCH_WORK | EVENTER_ASYNCH_CLEANUP) 
     20#define EVENTER_RECURRENT        0x80 
    1821 
    1922/* All of these functions act like their POSIX couterparts with two 
     
    6568  int               (*propset)(const char *key, const char *value); 
    6669  void              (*add)(eventer_t e); 
    67   void              (*remove)(eventer_t e); 
     70  eventer_t         (*remove)(eventer_t e); 
    6871  void              (*update)(eventer_t e); 
    6972  eventer_t         (*remove_fd)(int fd); 
  • src/eventer/eventer_impl.c

    r01751d3 r3b3b432  
    77#include "eventer/eventer.h" 
    88#include "eventer/eventer_impl.h" 
     9#include "eventer/eventer_jobq.h" 
     10#include "utils/noit_log.h" 
     11#include <pthread.h> 
     12#include <assert.h> 
    913 
    1014#ifdef HAVE_KQUEUE 
     
    2024 
    2125eventer_impl_t __eventer = NULL; 
     26 
     27 
     28static int __default_queue_threads = 5; 
     29static eventer_jobq_t __global_backq, __default_jobq; 
     30static pthread_mutex_t recurrent_lock = PTHREAD_MUTEX_INITIALIZER; 
     31struct recurrent_events { 
     32  eventer_t e; 
     33  struct recurrent_events *next; 
     34} *recurrent_events = NULL; 
     35 
     36 
     37int eventer_impl_propset(const char *key, const char *value) { 
     38  if(!strcasecmp(key, "default_queue_threads")) { 
     39    __default_queue_threads = atoi(value); 
     40    if(__default_queue_threads < 1) { 
     41      noitL(noit_error, "default_queue_threads must be >= 1\n"); 
     42      return -1; 
     43    } 
     44    return 0; 
     45  } 
     46  return -1; 
     47} 
     48 
     49int eventer_impl_init() { 
     50  int i; 
     51  eventer_t e; 
     52  eventer_jobq_init(&__global_backq); 
     53  e = eventer_alloc(); 
     54  e->mask = EVENTER_RECURRENT; 
     55  e->closure = &__global_backq; 
     56  e->callback = eventer_jobq_consume_available; 
     57 
     58  /* We call directly here as we may not be completely initialized */ 
     59  eventer_add_recurrent(e); 
     60 
     61  eventer_jobq_init(&__default_jobq); 
     62  __default_jobq.backq = &__global_backq; 
     63  for(i=0; i<__default_queue_threads; i++) 
     64    eventer_jobq_increase_concurrency(&__default_jobq); 
     65  return 0; 
     66} 
     67 
     68void eventer_add_asynch(eventer_jobq_t *q, eventer_t e) { 
     69  eventer_job_t *job; 
     70  job = calloc(1, sizeof(*job)); 
     71  job->fd_event = e; 
     72  gettimeofday(&job->create_time, NULL); 
     73  if(e->whence.tv_sec) { 
     74    job->timeout_event = eventer_alloc(); 
     75    memcpy(&job->timeout_event->whence, &e->whence, sizeof(e->whence)); 
     76    job->timeout_event->mask = EVENTER_TIMER; 
     77    job->timeout_event->closure = job; 
     78    job->timeout_event->callback = eventer_jobq_execute_timeout; 
     79    eventer_add(job->timeout_event); 
     80  } 
     81  eventer_jobq_enqueue(q ? q : &__default_jobq, job); 
     82} 
     83 
     84void eventer_dispatch_recurrent(struct timeval *now) { 
     85  struct recurrent_events *node; 
     86  struct timeval __now; 
     87  if(!now) { 
     88    gettimeofday(&__now, NULL); 
     89    now = &__now; 
     90  } 
     91  pthread_mutex_lock(&recurrent_lock); 
     92  for(node = recurrent_events; node; node = node->next) { 
     93    node->e->callback(node->e, EVENTER_RECURRENT, node->e->closure, now); 
     94  } 
     95  pthread_mutex_unlock(&recurrent_lock); 
     96} 
     97eventer_t eventer_remove_recurrent(eventer_t e) { 
     98  struct recurrent_events *node, *prev = NULL; 
     99  pthread_mutex_lock(&recurrent_lock); 
     100  for(node = recurrent_events; node; node = node->next) { 
     101    if(node->e == e) { 
     102      if(prev) prev->next = node->next; 
     103      else recurrent_events = node->next; 
     104      free(node); 
     105      pthread_mutex_unlock(&recurrent_lock); 
     106      return e; 
     107    } 
     108    prev = node; 
     109  } 
     110  pthread_mutex_unlock(&recurrent_lock); 
     111  return NULL; 
     112} 
     113void eventer_add_recurrent(eventer_t e) { 
     114  struct recurrent_events *node; 
     115  assert(e->mask & EVENTER_RECURRENT); 
     116  pthread_mutex_lock(&recurrent_lock); 
     117  for(node = recurrent_events; node; node = node->next) 
     118    if(node->e == e) { 
     119      pthread_mutex_unlock(&recurrent_lock); 
     120      return; 
     121    } 
     122  node = calloc(1, sizeof(*node)); 
     123  node->e = e; 
     124  node->next = recurrent_events; 
     125  recurrent_events = node; 
     126  pthread_mutex_unlock(&recurrent_lock); 
     127} 
     128 
  • src/eventer/eventer_impl.h

    r01751d3 r3b3b432  
    99#include "noit_defines.h" 
    1010#include "eventer/eventer.h" 
     11#include "eventer/eventer_jobq.h" 
    1112 
    1213extern eventer_impl_t registered_eventers[]; 
    1314 
     15int eventer_impl_propset(const char *key, const char *value); 
     16int eventer_impl_init(); 
     17void eventer_add_asynch(eventer_jobq_t *q, eventer_t e); 
     18void eventer_dispatch_recurrent(struct timeval *now); 
     19eventer_t eventer_remove_recurrent(eventer_t e); 
     20void eventer_add_recurrent(eventer_t e); 
     21 
    1422#endif 
  • src/eventer/eventer_kqueue_impl.c

    r31ecb27 r3b3b432  
    99#include "utils/noit_skiplist.h" 
    1010#include "utils/noit_log.h" 
     11#include "eventer/eventer_impl.h" 
    1112 
    1213#include <errno.h> 
     
    100101static int eventer_kqueue_impl_init() { 
    101102  struct rlimit rlim; 
     103  int rv; 
     104 
     105  /* super init */ 
     106  if((rv = eventer_impl_init()) != 0) return rv; 
     107 
    102108  master_thread = pthread_self(); 
    103109  signal(SIGPIPE, SIG_IGN); 
     
    124130} 
    125131static int eventer_kqueue_impl_propset(const char *key, const char *value) { 
    126   return -1; 
     132  if(eventer_impl_propset(key, value)) { 
     133    /* Do our kqueue local properties here */ 
     134    return -1; 
     135  } 
     136  return 0; 
    127137} 
    128138static void eventer_kqueue_impl_add(eventer_t e) { 
    129139  assert(e->mask); 
    130140  ev_lock_state_t lockstate; 
     141 
     142  if(e->mask & EVENTER_ASYNCH) { 
     143    eventer_add_asynch(NULL, e); 
     144    return; 
     145  } 
     146 
     147  /* Recurrent delegation */ 
     148  if(e->mask & EVENTER_RECURRENT) { 
     149    eventer_add_recurrent(e); 
     150    return; 
     151  } 
     152 
    131153  /* Timed events are simple */ 
    132   if(e->mask == EVENTER_TIMER) { 
     154  if(e->mask & EVENTER_TIMER) { 
    133155    pthread_mutex_lock(&te_lock); 
    134156    noit_skiplist_insert(timed_events, e); 
     
    146168  release_master_fd(e->fd, lockstate); 
    147169} 
    148 static void eventer_kqueue_impl_remove(eventer_t e) { 
     170static eventer_t eventer_kqueue_impl_remove(eventer_t e) { 
     171  eventer_t removed = NULL; 
     172  if(e->mask & EVENTER_ASYNCH) { 
     173    abort(); 
     174  } 
    149175  if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) { 
    150176    ev_lock_state_t lockstate; 
    151177    lockstate = acquire_master_fd(e->fd); 
    152178    if(e == master_fds[e->fd].e) { 
     179      removed = e; 
    153180      master_fds[e->fd].e = NULL; 
    154181      if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION)) 
     
    161188  else if(e->mask & EVENTER_TIMER) { 
    162189    pthread_mutex_lock(&te_lock); 
    163     noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr); 
     190    if(noit_skiplist_remove_compare(timed_events, e, NULL, 
     191                                    noit_compare_voidptr)) 
     192      removed = e; 
    164193    pthread_mutex_unlock(&te_lock); 
     194  } 
     195  else if(e->mask & EVENTER_RECURRENT) { 
     196    removed = eventer_remove_recurrent(e); 
    165197  } 
    166198  else { 
    167199    abort(); 
    168200  } 
     201  return removed; 
    169202} 
    170203static void eventer_kqueue_impl_update(eventer_t e) { 
     
    262295      memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime)); 
    263296    } 
     297 
     298    /* Handle recurrent events */ 
     299    eventer_dispatch_recurrent(&__now); 
    264300 
    265301    /* If we're the master, we need to lock the master_kqs and make mods */ 
  • src/noit_conf.c

    re781d1e r3b3b432  
    775775                         noit_console_state_t *state, void *closure) { 
    776776  int i, cnt, titled = 0, cliplen = 0; 
    777   const char *path, *basepath = NULL; 
     777  const char *path = "", *basepath = NULL; 
    778778  char xpath[1024]; 
    779779  noit_conf_t_userdata_t *info = NULL; 
  • src/noitd.c

    r1afde4e r3b3b432  
    1010#include "eventer/eventer.h" 
    1111#include "utils/noit_log.h" 
     12#include "utils/noit_hash.h" 
    1213#include "noit_listener.h" 
    1314#include "noit_console.h" 
    1415#include "noit_module.h" 
    1516#include "noit_conf.h" 
     17 
     18int test_asynch_cb(eventer_t e, int mask, void *closure, struct timeval *now) { 
     19  time_t tmp; 
     20  int seconds = (int)closure; 
     21 
     22  noitL(noit_error, "%d: test_asynch_cb fired on (%p) mask 0x%x\n", 
     23        (int)time(&tmp), e, mask); 
     24  if(mask & EVENTER_ASYNCH_WORK) { 
     25    noitL(noit_error, "%d: Starting test_asynch_cb(%p) for %d seconds\n", 
     26          (int)time(&tmp), e, seconds); 
     27    sleep(seconds); 
     28    noitL(noit_error, "%d: Finishing up test_asynch_cb(%p)\n", (int)time(&tmp), e); 
     29  } 
     30  if(mask & EVENTER_ASYNCH_CLEANUP) { 
     31    noitL(noit_error, "%d: Cleaning up test_asynch_cb(%p)\n", (int)time(&tmp), e); 
     32  } 
     33  e->mask = 0; 
     34  return 0; 
     35} 
     36void test_asynch() { 
     37  eventer_t e; 
     38 
     39  e = eventer_alloc(); 
     40  e->mask = EVENTER_ASYNCH; 
     41  gettimeofday(&e->whence, NULL); e->whence.tv_sec += 10; 
     42  e->closure = (void *)5; 
     43  e->callback = test_asynch_cb; 
     44  eventer_add(e); 
     45 
     46  e = eventer_alloc(); 
     47  e->mask = EVENTER_ASYNCH; 
     48  gettimeofday(&e->whence, NULL); e->whence.tv_sec += 2; 
     49  e->closure = (void *)10; 
     50  e->callback = test_asynch_cb; 
     51  eventer_add(e); 
     52} 
    1653 
    1754static char *config_file = ETC_DIR "/noit.conf"; 
     
    3471} 
    3572 
     73static 
     74int configure_eventer() { 
     75  int rv = 0; 
     76  noit_hash_table *table; 
     77  table = noit_conf_get_hash(NULL, "/noit/eventer/config/*"); 
     78  if(table) { 
     79    noit_hash_iter iter = NOIT_HASH_ITER_ZERO; 
     80    const char *key, *value; 
     81    int klen; 
     82    while(noit_hash_next(table, &iter, &key, &klen, (void **)&value)) { 
     83      int subrv; 
     84      if((subrv = eventer_propset(key, value)) != 0) 
     85        rv = subrv; 
     86    } 
     87    noit_hash_destroy(table, free, free); 
     88    free(table); 
     89  } 
     90  return rv; 
     91} 
     92 
    3693int main(int argc, char **argv) { 
    3794  char conf_str[1024]; 
     95 
    3896  parse_clargs(argc, argv); 
    3997 
     
    66124    exit(-1); 
    67125  } 
     126  if(configure_eventer() != 0) { 
     127    noitL(noit_stderr, "Cannot configure eventer\n"); 
     128    exit(-1); 
     129  } 
    68130  if(eventer_init() == -1) { 
    69131    noitL(noit_stderr, "Cannot init eventer %s\n", conf_str); 
     
    75137  noit_listener_init(); 
    76138 
     139  test_asynch(); 
     140 
    77141  eventer_loop(); 
    78142  return 0; 
  • src/sample.conf

    re781d1e r3b3b432  
    33  <eventer> 
    44    <implementation>kqueue</implementation> 
     5    <config> 
     6      <default_queue_threads>10</default_queue_threads> 
     7    </config> 
    58  </eventer> 
    69  <modules> 
  • src/utils/noit_atomic.h

    r6a58044 r3b3b432  
    1919#include <libkern/OSAtomic.h> 
    2020typedef OSSpinLock noit_spinlock_t; 
    21 #define noit_atomic_cas32(ref,new,old) OSAtomicCompareAndSwap32(old,new,ref
    22 #define noit_atomic_cas64(ref,new,old) OSAtomicCompareAndSwap64(old,new,ref
     21#define noit_atomic_cas32(ref,new,old) (OSAtomicCompareAndSwap32(old,new,ref) ? old : new
     22#define noit_atomic_cas64(ref,new,old) (OSAtomicCompareAndSwap64(old,new,ref) ? old : new
    2323#define noit_atomic_inc32(ref) OSAtomicIncrement32(ref) 
    2424#define noit_atomic_inc64(ref) OSAtomicIncrement64(ref)