Changeset 31d42e564259174c10ca8df5d8b206096a29c957

Timestamp:
12/03/09 05:57:10
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1259819830 +0000
git-parent:
[e2d923ccebf661d3dbbe5e85c1465e089adb7e23]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1259819830 +0000
Message:

This pulls the timer handling into the shared eventer base and consolidates a lot
of repeated code across the different scheduler implementations.

Timed events and fdevents are now exposed through the API, and the console
exposes them via:

show eventer debug timers
show eventer debug sockets

(the console code still needs to be cleaned up to support autocomplete)

refs #221
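
In practice the consolidation means each scheduler backend now delegates timed-event bookkeeping to the shared base (eventer_add_timed, eventer_remove_timed, eventer_update_timed, eventer_dispatch_timed) instead of keeping its own skiplist and lock. A minimal sketch of the resulting backend pattern, using only calls introduced in the diffs below; the example_impl_* wrappers are hypothetical:

    /* hypothetical backend fragments showing the new division of labor */
    static void example_impl_add(eventer_t e) {
      if(e->mask & EVENTER_TIMER) {
        eventer_add_timed(e);   /* the shared base owns the timed-event skiplist */
        return;
      }
      /* ...fd registration specific to this backend (epoll/kqueue/ports)... */
    }

    static int example_impl_loop() {
      while(1) {
        struct timeval __now, __sleeptime;
        __sleeptime = eventer_max_sleeptime;
        eventer_dispatch_timed(&__now, &__sleeptime);  /* fires due timers, trims the sleep */
        /* ...wait for fd events for at most __sleeptime, then dispatch them... */
      }
      return 0;
    }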

Files:

  • src/eventer/eventer.h (r57cd7b0 → r31d42e5)

     #include "noit_defines.h"
     #include "utils/noit_log.h"
    +#include "utils/noit_atomic.h"
     #include <sys/time.h>
     #include <sys/socket.h>
    …
                           eventer_callback_for_name(const char *name);
     
    +
     typedef struct _eventer_impl {
       const char         *name;
    …
       void              (*trigger)(eventer_t e, int mask);
       int               (*loop)();
    +  void              (*foreach_fdevent)(void (*f)(eventer_t, void *), void *);
    +  struct timeval    max_sleeptime;
    +  int               maxfds;
    +  struct {
    +    eventer_t e;
    +    pthread_t executor;
    +    noit_spinlock_t lock;
    +  }                 *master_fds;
     } *eventer_impl_t;
     
    …
     #define eventer_loop          __eventer->loop
     #define eventer_trigger       __eventer->trigger
    +#define eventer_max_sleeptime __eventer->max_sleeptime
    +#define eventer_foreach_fdevent  __eventer->foreach_fdevent
     
     extern eventer_impl_t registered_eventers[];
    …
     API_EXPORT(int) eventer_impl_init();
     API_EXPORT(void) eventer_add_asynch(eventer_jobq_t *q, eventer_t e);
    +API_EXPORT(void) eventer_add_timed(eventer_t e);
    +API_EXPORT(eventer_t) eventer_remove_timed(eventer_t e);
    +API_EXPORT(void) eventer_update_timed(eventer_t e, int mask);
    +API_EXPORT(void) eventer_dispatch_timed(struct timeval *now,
    +                                        struct timeval *next);
    +API_EXPORT(void)
    +  eventer_foreach_timedevent (void (*f)(eventer_t e, void *), void *closure);
     API_EXPORT(void) eventer_dispatch_recurrent(struct timeval *now);
     API_EXPORT(eventer_t) eventer_remove_recurrent(eventer_t e);
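
The trailing fields of struct _eventer_impl now carry what used to be per-backend statics. A sketch of how a hypothetical backend descriptor would fill them in, written with designated initializers for readability; the real backends below use positional initializers and also set the fields elided from this diff:

    struct _eventer_impl eventer_example_impl = {
      .name            = "example",
      .loop            = eventer_example_impl_loop,
      .foreach_fdevent = eventer_example_impl_foreach_fdevent,
      .max_sleeptime   = { 0, 200000 },  /* 200ms cap, same as epoll/kqueue/ports */
      .maxfds          = 0,              /* set from RLIMIT_NOFILE in the backend's init */
      .master_fds      = NULL            /* calloc'd to maxfds entries in init */
    };
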
  • src/eventer/eventer_epoll_impl.c (r88a7178 → r31d42e5)

     #include <assert.h>
     
    -static struct timeval __max_sleeptime = { 0, 200000 }; /* 200ms */
    -static int maxfds;
    -static struct {
    -  eventer_t e;
    -  pthread_t executor;
    -  noit_spinlock_t lock;
    -} *master_fds = NULL;
    +struct _eventer_impl eventer_epoll_impl;
    +#define LOCAL_EVENTER eventer_epoll_impl
    +#define LOCAL_EVENTER_foreach_fdevent eventer_epoll_impl_foreach_fdevent
    +#define maxfds LOCAL_EVENTER.maxfds
    +#define master_fds LOCAL_EVENTER.master_fds
    +
    +#include "eventer/eventer_impl_private.h"
    +
    +static pthread_t master_thread;
     static int *masks;
    -
    -typedef enum { EV_OWNED, EV_ALREADY_OWNED } ev_lock_state_t;
    -
    -static ev_lock_state_t
    -acquire_master_fd(int fd) {
    -  if(noit_spinlock_trylock(&master_fds[fd].lock)) {
    -    master_fds[fd].executor = pthread_self();
    -    return EV_OWNED;
    -  }
    -  if(pthread_equal(master_fds[fd].executor, pthread_self())) {
    -    return EV_ALREADY_OWNED;
    -  }
    -  noit_spinlock_lock(&master_fds[fd].lock);
    -  master_fds[fd].executor = pthread_self();
    -  return EV_OWNED;
    -}
    -static void
    -release_master_fd(int fd, ev_lock_state_t as) {
    -  if(as == EV_OWNED) {
    -    memset(&master_fds[fd].executor, 0, sizeof(master_fds[fd].executor));
    -    noit_spinlock_unlock(&master_fds[fd].lock);
    -  }
    -}
    -
    -static pthread_t master_thread;
     static int epoll_fd = -1;
    -static pthread_mutex_t te_lock;
    -static noit_skiplist *timed_events = NULL;
     
     static int eventer_epoll_impl_init() {
    …
         return -1;
       }
    -  pthread_mutex_init(&te_lock, NULL);
       getrlimit(RLIMIT_NOFILE, &rlim);
       maxfds = rlim.rlim_cur;
       master_fds = calloc(maxfds, sizeof(*master_fds));
       masks = calloc(maxfds, sizeof(*masks));
    -  timed_events = calloc(1, sizeof(*timed_events));
    -  noit_skiplist_init(timed_events);
    -  noit_skiplist_set_compare(timed_events,
    -                            eventer_timecompare, eventer_timecompare);
    -  noit_skiplist_add_index(timed_events,
    -                          noit_compare_voidptr, noit_compare_voidptr);
       return 0;
     }
    …
       /* Timed events are simple */
       if(e->mask & EVENTER_TIMER) {
    -    pthread_mutex_lock(&te_lock);
    -    noit_skiplist_insert(timed_events, e);
    -    pthread_mutex_unlock(&te_lock);
    +    eventer_add_timed(e);
         return;
       }
    …
       }
       else if(e->mask & EVENTER_TIMER) {
    -    pthread_mutex_lock(&te_lock);
    -    if(noit_skiplist_remove_compare(timed_events, e, NULL,
    -                                    noit_compare_voidptr))
    -      removed = e;
    -    pthread_mutex_unlock(&te_lock);
    +    removed = eventer_remove_timed(e);
       }
       else if(e->mask & EVENTER_RECURRENT) {
    …
       struct epoll_event _ev;
       if(e->mask & EVENTER_TIMER) {
    -    assert(e->mask & EVENTER_TIMER);
    -    pthread_mutex_lock(&te_lock);
    -    noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
    -    noit_skiplist_insert(timed_events, e);
    -    pthread_mutex_unlock(&te_lock);
    +    eventer_update_timed(e,mask);
         return;
       }
    …
         struct timeval __now, __sleeptime;
         int fd_cnt = 0;
    -    int max_timed_events_to_process;
    -    int newmask;
    -
    -    __sleeptime = __max_sleeptime;
    -
    -    /* Handle timed events...
    -     * we could be multithreaded, so if we pop forever we could starve
    -     * ourselves. */
    -    max_timed_events_to_process = timed_events->size;
    -    while(max_timed_events_to_process-- > 0) {
    -      eventer_t timed_event;
    -
    -      gettimeofday(&__now, NULL);
    -
    -      pthread_mutex_lock(&te_lock);
    -      /* Peek at our next timed event, if should fire, pop it.
    -       * otherwise we noop and NULL it out to break the loop. */
    -      timed_event = noit_skiplist_peek(timed_events);
    -      if(timed_event) {
    -        if(compare_timeval(timed_event->whence, __now) < 0) {
    -          timed_event = noit_skiplist_pop(timed_events, NULL);
    -        }
    -        else {
    -          sub_timeval(timed_event->whence, __now, &__sleeptime);
    -          timed_event = NULL;
    -        }
    -      }
    -      pthread_mutex_unlock(&te_lock);
    -      if(timed_event == NULL) break;
    -
    -      /* Make our call */
    -      newmask = timed_event->callback(timed_event, EVENTER_TIMER,
    -                                      timed_event->closure, &__now);
    -      if(newmask)
    -        eventer_add(timed_event);
    -      else
    -        eventer_free(timed_event);
    -    }
    -
    -    if(compare_timeval(__max_sleeptime, __sleeptime) < 0) {
    -      /* we exceed our configured maximum, set it down */
    -      memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime));
    -    }
    +
    +    __sleeptime = eventer_max_sleeptime;
    +
    +    eventer_dispatch_timed(&__now, &__sleeptime);
     
         /* Handle recurrent events */
    …
       eventer_epoll_impl_find_fd,
       eventer_epoll_impl_trigger,
    -  eventer_epoll_impl_loop
    +  eventer_epoll_impl_loop,
    +  eventer_epoll_impl_foreach_fdevent,
    +  { 0, 200000 },
    +  0,
    +  NULL
     };

  • src/eventer/eventer_impl.c (r28b073c → r31d42e5)

     #include "eventer/eventer.h"
     #include "utils/noit_log.h"
    +#include "utils/noit_skiplist.h"
     #include <pthread.h>
     #include <assert.h>
    …
     static struct timeval *eventer_impl_epoch = NULL;
     static int EVENTER_DEBUGGING = 0;
    +static pthread_mutex_t te_lock;
    +static noit_skiplist *timed_events = NULL;
     
     #ifdef HAVE_KQUEUE
    …
       eventer_impl_epoch = malloc(sizeof(struct timeval));
       gettimeofday(eventer_impl_epoch, NULL);
    +  pthread_mutex_init(&te_lock, NULL);
    +    timed_events = calloc(1, sizeof(*timed_events));
    +  noit_skiplist_init(timed_events);
    +  noit_skiplist_set_compare(timed_events,
    +                            eventer_timecompare, eventer_timecompare);
    +  noit_skiplist_add_index(timed_events,
    +                          noit_compare_voidptr, noit_compare_voidptr);
     
       eventer_err = noit_log_stream_find("error/eventer");
    …
     }
     
    +void eventer_add_timed(eventer_t e) {
    +  assert(e->mask & EVENTER_TIMER);
    +  if(EVENTER_DEBUGGING) {
    +    const char *cbname;
    +    cbname = eventer_name_for_callback(e->callback);
    +    noitL(eventer_deb, "debug: eventer_add timed (%s)\n",
    +          cbname ? cbname : "???");
    +  }
    +  pthread_mutex_lock(&te_lock);
    +  noit_skiplist_insert(timed_events, e);
    +  pthread_mutex_unlock(&te_lock);
    +}
    +eventer_t eventer_remove_timed(eventer_t e) {
    +  eventer_t removed = NULL;
    +  assert(e->mask & EVENTER_TIMER);
    +  pthread_mutex_lock(&te_lock);
    +  if(noit_skiplist_remove_compare(timed_events, e, NULL,
    +                                  noit_compare_voidptr))
    +    removed = e;
    +  pthread_mutex_unlock(&te_lock);
    +  return removed;
    +}
    +void eventer_update_timed(eventer_t e, int mask) {
    +  assert(mask & EVENTER_TIMER);
    +  pthread_mutex_lock(&te_lock);
    +  noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
    +  noit_skiplist_insert(timed_events, e);
    +  pthread_mutex_unlock(&te_lock);
    +}
    +void eventer_dispatch_timed(struct timeval *now, struct timeval *next) {
    +  int max_timed_events_to_process;
    +    /* Handle timed events...
    +     * we could be multithreaded, so if we pop forever we could starve
    +     * ourselves. */
    +  max_timed_events_to_process = timed_events->size;
    +  while(max_timed_events_to_process-- > 0) {
    +    int newmask;
    +    eventer_t timed_event;
    +
    +    gettimeofday(now, NULL);
    +
    +    pthread_mutex_lock(&te_lock);
    +    /* Peek at our next timed event, if should fire, pop it.
    +     * otherwise we noop and NULL it out to break the loop. */
    +    timed_event = noit_skiplist_peek(timed_events);
    +    if(timed_event) {
    +      if(compare_timeval(timed_event->whence, *now) < 0) {
    +        timed_event = noit_skiplist_pop(timed_events, NULL);
    +      }
    +      else {
    +        sub_timeval(timed_event->whence, *now, next);
    +        timed_event = NULL;
    +      }
    +    }
    +    pthread_mutex_unlock(&te_lock);
    +    if(timed_event == NULL) break;
    +    if(EVENTER_DEBUGGING) {
    +      const char *cbname;
    +      cbname = eventer_name_for_callback(timed_event->callback);
    +      noitLT(eventer_deb, now, "debug: timed dispatch(%s)\n",
    +             cbname ? cbname : "???");
    +    }
    +    /* Make our call */
    +    newmask = timed_event->callback(timed_event, EVENTER_TIMER,
    +                                    timed_event->closure, now);
    +    if(newmask)
    +      eventer_add_timed(timed_event);
    +    else
    +      eventer_free(timed_event);
    +  }
    +
    +  if(compare_timeval(eventer_max_sleeptime, *next) < 0) {
    +    /* we exceed our configured maximum, set it down */
    +    memcpy(next, &eventer_max_sleeptime, sizeof(*next));
    +  }
    +}
    +void
    +eventer_foreach_timedevent (void (*f)(eventer_t e, void *), void *closure) {
    +  noit_skiplist_node *iter = NULL;
    +  pthread_mutex_lock(&te_lock);
    +  for(iter = noit_skiplist_getlist(timed_events); iter;
    +      noit_skiplist_next(timed_events,&iter)) {
    +    if(iter->data) f(iter->data, closure);
    +  }
    +  pthread_mutex_unlock(&te_lock);
    +}
    +
     void eventer_dispatch_recurrent(struct timeval *now) {
       struct recurrent_events *node;
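
eventer_foreach_timedevent takes a visitor callback plus an opaque closure and holds te_lock while it walks the skiplist, so visitors should stay quick and should not add or remove timed events from inside the callback. A minimal sketch of a caller; the counting visitor is hypothetical, and the console code further down uses the same pattern to print each event:

    /* hypothetical visitor: count pending timed events */
    static void count_timed_event(eventer_t e, void *closure) {
      (void)e;
      (*(int *)closure)++;
    }

    static int pending_timed_event_count(void) {
      int count = 0;
      eventer_foreach_timedevent(count_timed_event, &count);
      return count;
    }
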
  • src/eventer/eventer_kqueue_impl.c (r6492d5c → r31d42e5)

     #include <assert.h>
     
    -static struct timeval __max_sleeptime = { 0, 200000 }; /* 200ms */
    -static int maxfds;
    -static struct {
    -  eventer_t e;
    -  pthread_t executor;
    -  noit_spinlock_t lock;
    -} *master_fds = NULL;
    -static int *masks;
    -
    -typedef enum { EV_OWNED, EV_ALREADY_OWNED } ev_lock_state_t;
    -
    -static ev_lock_state_t
    -acquire_master_fd(int fd) {
    -  if(noit_spinlock_trylock(&master_fds[fd].lock)) {
    -    master_fds[fd].executor = pthread_self();
    -    return EV_OWNED;
    -  }
    -  if(pthread_equal(master_fds[fd].executor, pthread_self())) {
    -    return EV_ALREADY_OWNED;
    -  }
    -  noit_spinlock_lock(&master_fds[fd].lock);
    -  master_fds[fd].executor = pthread_self();
    -  return EV_OWNED;
    -}
    -static void
    -release_master_fd(int fd, ev_lock_state_t as) {
    -  if(as == EV_OWNED) {
    -    memset(&master_fds[fd].executor, 0, sizeof(master_fds[fd].executor));
    -    noit_spinlock_unlock(&master_fds[fd].lock);
    -  }
    -}
    +struct _eventer_impl eventer_kqueue_impl;
    +#define LOCAL_EVENTER eventer_kqueue_impl
    +#define LOCAL_EVENTER_foreach_fdevent eventer_kqueue_impl_foreach_fdevent
    +#define LOCAL_EVENTER_foreach_timedevent eventer_kqueue_impl_foreach_timedevent
    +#define maxfds LOCAL_EVENTER.maxfds
    +#define master_fds LOCAL_EVENTER.master_fds
    +
    +#include "eventer/eventer_impl_private.h"
     
     static pthread_t master_thread;
    …
     
     static pthread_mutex_t kqs_lock;
    -static pthread_mutex_t te_lock;
     static kqs_t master_kqs = NULL;
     static pthread_key_t kqueue_setup_key;
    -static noit_skiplist *timed_events = NULL;
    +static int *masks;
     #define KQUEUE_DECL kqs_t kqs
     #define KQUEUE_SETUP kqs = (kqs_t) pthread_getspecific(kqueue_setup_key)
    …
       kep = &ke_vec[ke_vec_used++];
     
    -  EV_SET(kep, ident, filter, flags, 0, 0, (void *)e->fd);
    +  EV_SET(kep, ident, filter, flags, 0, 0, (void *)(vpsized_int)e->fd);
       noitL(eventer_deb, "debug: ke_change(fd:%d, filt:%x, flags:%x)\n",
             ident, filter, flags);
    …
       }
       pthread_mutex_init(&kqs_lock, NULL);
    -  pthread_mutex_init(&te_lock, NULL);
       pthread_key_create(&kqueue_setup_key, NULL);
       master_kqs = calloc(1, sizeof(*master_kqs));
    …
       master_fds = calloc(maxfds, sizeof(*master_fds));
       masks = calloc(maxfds, sizeof(*masks));
    -  timed_events = calloc(1, sizeof(*timed_events));
    -  noit_skiplist_init(timed_events);
    -  noit_skiplist_set_compare(timed_events,
    -                            eventer_timecompare, eventer_timecompare);
    -  noit_skiplist_add_index(timed_events,
    -                          noit_compare_voidptr, noit_compare_voidptr);
       return 0;
     }
    …
       /* Timed events are simple */
       if(e->mask & EVENTER_TIMER) {
    -    noitL(eventer_deb, "debug: eventer_add timed (%s)\n", cbname ? cbname : "???");
    -    pthread_mutex_lock(&te_lock);
    -    noit_skiplist_insert(timed_events, e);
    -    pthread_mutex_unlock(&te_lock);
    +    eventer_add_timed(e);
         return;
       }
    …
       }
       else if(e->mask & EVENTER_TIMER) {
    -    pthread_mutex_lock(&te_lock);
    -    if(noit_skiplist_remove_compare(timed_events, e, NULL,
    -                                    noit_compare_voidptr))
    -      removed = e;
    -    pthread_mutex_unlock(&te_lock);
    +    removed = eventer_remove_timed(e);
       }
       else if(e->mask & EVENTER_RECURRENT) {
    …
     static void eventer_kqueue_impl_update(eventer_t e, int mask) {
       if(e->mask & EVENTER_TIMER) {
    -    assert(mask & EVENTER_TIMER);
    -    pthread_mutex_lock(&te_lock);
    -    noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
    -    noit_skiplist_insert(timed_events, e);
    -    pthread_mutex_unlock(&te_lock);
    +    eventer_update_timed(e, mask);
         return;
       }
    …
       pthread_setspecific(kqueue_setup_key, kqs);
       while(1) {
    -    const char *cbname;
         struct timeval __now, __sleeptime;
         struct timespec __kqueue_sleeptime;
         int fd_cnt = 0;
    -    int max_timed_events_to_process;
    -    int newmask;
    -
    -    __sleeptime = __max_sleeptime;
    -
    -    /* Handle timed events...
    -     * we could be multithreaded, so if we pop forever we could starve
    -     * ourselves. */
    -    max_timed_events_to_process = timed_events->size;
    -    while(max_timed_events_to_process-- > 0) {
    -      eventer_t timed_event;
    -
    -      gettimeofday(&__now, NULL);
    -
    -      pthread_mutex_lock(&te_lock);
    -      /* Peek at our next timed event, if should fire, pop it.
    -       * otherwise we noop and NULL it out to break the loop. */
    -      timed_event = noit_skiplist_peek(timed_events);
    -      if(timed_event) {
    -        if(compare_timeval(timed_event->whence, __now) < 0) {
    -          timed_event = noit_skiplist_pop(timed_events, NULL);
    -        }
    -        else {
    -          sub_timeval(timed_event->whence, __now, &__sleeptime);
    -          timed_event = NULL;
    -        }
    -      }
    -      pthread_mutex_unlock(&te_lock);
    -      if(timed_event == NULL) break;
    -
    -      cbname = eventer_name_for_callback(timed_event->callback);
    -      noitLT(eventer_deb, &__now, "debug: timed dispatch(%s)\n", cbname ? cbname : "???");
    -      /* Make our call */
    -      newmask = timed_event->callback(timed_event, EVENTER_TIMER,
    -                                      timed_event->closure, &__now);
    -      if(newmask)
    -        eventer_add(timed_event);
    -      else
    -        eventer_free(timed_event);
    -    }
    -
    -    if(compare_timeval(__max_sleeptime, __sleeptime) < 0) {
    -      /* we exceed our configured maximum, set it down */
    -      memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime));
    -    }
    +
    +    __sleeptime = eventer_max_sleeptime;
    +
    +    eventer_dispatch_timed(&__now, &__sleeptime);
     
         /* Handle recurrent events */
    …
               continue;
             }
    -        assert((int)ke->udata == ke->ident);
    +        assert((vpsized_int)ke->udata == (vpsized_int)ke->ident);
             fd = ke->ident;
             e = master_fds[fd].e;
    …
       eventer_kqueue_impl_find_fd,
       eventer_kqueue_impl_trigger,
    -  eventer_kqueue_impl_loop
    +  eventer_kqueue_impl_loop,
    +  eventer_kqueue_impl_foreach_fdevent,
    +  { 0, 200000 },
    +  0,
    +  NULL
     };

  • src/eventer/eventer_ports_impl.c (r45dc60a → r31d42e5)

     #define MAX_PORT_EVENTS 1024
     
    -static struct timeval __max_sleeptime = { 0, 200000 }; /* 200ms */
    -static int maxfds;
    -static struct {
    -  eventer_t e;
    -  pthread_t executor;
    -  noit_spinlock_t lock;
    -} *master_fds = NULL;
    -
    -typedef enum { EV_OWNED, EV_ALREADY_OWNED } ev_lock_state_t;
    -
    -static ev_lock_state_t
    -acquire_master_fd(int fd) {
    -  if(noit_spinlock_trylock(&master_fds[fd].lock)) {
    -    master_fds[fd].executor = pthread_self();
    -    return EV_OWNED;
    -  }
    -  if(pthread_equal(master_fds[fd].executor, pthread_self())) {
    -    return EV_ALREADY_OWNED;
    -  }
    -  noit_spinlock_lock(&master_fds[fd].lock);
    -  master_fds[fd].executor = pthread_self();
    -  return EV_OWNED;
    -}
    -static void
    -release_master_fd(int fd, ev_lock_state_t as) {
    -  if(as == EV_OWNED) {
    -    memset(&master_fds[fd].executor, 0, sizeof(master_fds[fd].executor));
    -    noit_spinlock_unlock(&master_fds[fd].lock);
    -  }
    -}
    +struct _eventer_impl eventer_ports_impl;
    +#define LOCAL_EVENTER eventer_ports_impl
    +#define LOCAL_EVENTER_foreach_fdevent eventer_ports_impl_foreach_fdevent
    +#define LOCAL_EVENTER_foreach_timedevent eventer_ports_impl_foreach_timedevent
    +#define maxfds LOCAL_EVENTER.maxfds
    +#define master_fds LOCAL_EVENTER.master_fds
    +
    +#include "eventer/eventer_impl_private.h"
     
     static pthread_t master_thread;
     static int port_fd = -1;
    -
    -static pthread_mutex_t te_lock;
    -static noit_skiplist *timed_events = NULL;
     
     static int eventer_ports_impl_init() {
    …
         return -1;
       }
    -  pthread_mutex_init(&te_lock, NULL);
       getrlimit(RLIMIT_NOFILE, &rlim);
       maxfds = rlim.rlim_cur;
       master_fds = calloc(maxfds, sizeof(*master_fds));
    -  timed_events = calloc(1, sizeof(*timed_events));
    -  noit_skiplist_init(timed_events);
    -  noit_skiplist_set_compare(timed_events,
    -                            eventer_timecompare, eventer_timecompare);
    -  noit_skiplist_add_index(timed_events,
    -                          noit_compare_voidptr, noit_compare_voidptr);
       return 0;
     }
    …
       /* Timed events are simple */
       if(e->mask & EVENTER_TIMER) {
    -    noitL(eventer_deb, "debug: eventer_add timed (%s)\n", cbname ? cbname : "???");
    -    pthread_mutex_lock(&te_lock);
    -    noit_skiplist_insert(timed_events, e);
    -    pthread_mutex_unlock(&te_lock);
    +    eventer_add_timed(e);
         return;
       }
    …
       }
       else if(e->mask & EVENTER_TIMER) {
    -    pthread_mutex_lock(&te_lock);
    -    if(noit_skiplist_remove_compare(timed_events, e, NULL,
    -                                    noit_compare_voidptr))
    -      removed = e;
    -    pthread_mutex_unlock(&te_lock);
    +    removed = eventer_remove_timed(e);
       }
       else if(e->mask & EVENTER_RECURRENT) {
    …
     static void eventer_ports_impl_update(eventer_t e, int mask) {
       if(e->mask & EVENTER_TIMER) {
    -    assert(mask & EVENTER_TIMER);
    -    pthread_mutex_lock(&te_lock);
    -    noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
    -    noit_skiplist_insert(timed_events, e);
    -    pthread_mutex_unlock(&te_lock);
    +    eventer_update_timed(e,mask);
         return;
       }
    …
         int newmask;
     
    -    __sleeptime = __max_sleeptime;
    -
    -    /* Handle timed events...
    -     * we could be multithreaded, so if we pop forever we could starve
    -     * ourselves. */
    -    max_timed_events_to_process = timed_events->size;
    -    while(max_timed_events_to_process-- > 0) {
    -      eventer_t timed_event;
    -
    -      gettimeofday(&__now, NULL);
    -
    -      pthread_mutex_lock(&te_lock);
    -      /* Peek at our next timed event, if should fire, pop it.
    -       * otherwise we noop and NULL it out to break the loop. */
    -      timed_event = noit_skiplist_peek(timed_events);
    -      if(timed_event) {
    -        if(compare_timeval(timed_event->whence, __now) < 0) {
    -          timed_event = noit_skiplist_pop(timed_events, NULL);
    -        }
    -        else {
    -          sub_timeval(timed_event->whence, __now, &__sleeptime);
    -          timed_event = NULL;
    -        }
    -      }
    -      pthread_mutex_unlock(&te_lock);
    -      if(timed_event == NULL) break;
    -
    -      cbname = eventer_name_for_callback(timed_event->callback);
    -      noitLT(eventer_deb, &__now, "debug: timed dispatch(%s)\n", cbname ? cbname : "???");
    -      /* Make our call */
    -      newmask = timed_event->callback(timed_event, EVENTER_TIMER,
    -                                      timed_event->closure, &__now);
    -      if(newmask)
    -        eventer_add(timed_event);
    -      else
    -        eventer_free(timed_event);
    -    }
    -
    -    if(compare_timeval(__max_sleeptime, __sleeptime) < 0) {
    -      /* we exceed our configured maximum, set it down */
    -      memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime));
    -    }
    +    __sleeptime = eventer_max_sleeptime;
    +
    +    eventer_dispatch_timed(&__now, &__sleeptime);
     
         /* Handle recurrent events */
    …
       eventer_ports_impl_find_fd,
       eventer_ports_impl_trigger,
    -  eventer_ports_impl_loop
    +  eventer_ports_impl_loop,
    +  eventer_ports_impl_foreach_fdevent,
    +  { 0, 200000 },
    +  0,
    +  NULL
     };

  • src/noit_console_state.c (r88a7178 → r31d42e5)

       return strcasecmp(a->name, b->name);
     }
    +static void
    +noit_console_spit_event(eventer_t e, void *c) {
    +  struct timeval now, diff;
    +  noit_console_closure_t ncct = c;
    +  char fdstr[12];
    +  char wfn[42];
    +  char funcptr[20];
    +  const char *cname;
    +
    +  cname = eventer_name_for_callback(e->callback);
    +  snprintf(fdstr, sizeof(fdstr), " fd: %d", e->fd);
    +  gettimeofday(&now, NULL);
    +  sub_timeval(e->whence, now, &diff);
    +  snprintf(wfn, sizeof(wfn), " fires: %lld.%06ds", (long long)diff.tv_sec, diff.tv_usec);
    +  snprintf(funcptr, sizeof(funcptr), "%p", e->callback);
    +  nc_printf(ncct, "  [%p]%s%s [%c%c%c%c] -> %s(%p)\n",
    +            e,
    +            e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION) ? fdstr : "",
    +            e->mask & (EVENTER_TIMER) ?  wfn : "",
    +            e->mask & EVENTER_READ ? 'r' : '-',
    +            e->mask & EVENTER_WRITE ? 'w' : '-',
    +            e->mask & EVENTER_EXCEPTION ? 'e' : '-',
    +            e->mask & EVENTER_TIMER ? 't' : '-',
    +            cname ? cname : funcptr, e->closure);
    +}
    +static int
    +noit_console_eventer(noit_console_closure_t ncct, int argc, char **argv,
    +                     noit_console_state_t *dstate, void *unused) {
    +  if(argc < 2 || strcmp(argv[0], "debug")) {
    +    nc_printf(ncct, "eventer <debug> ...\n");
    +    return -1;
    +  }
    +  if(argc == 2) {
    +    if(!strcmp(argv[1], "sockets")) {
    +      eventer_foreach_fdevent(noit_console_spit_event, ncct);
    +      return 0;
    +    }
    +    if(!strcmp(argv[1], "timers")) {
    +      eventer_foreach_timedevent(noit_console_spit_event, ncct);
    +      return 0;
    +    }
    +  }
    +  nc_printf(ncct, "eventer command not understood\n");
    +  return -1;
    +}
     
     cmd_info_t console_command_help = {
    …
     cmd_info_t console_command_restart = {
       "restart", noit_console_restart, NULL, NULL, NULL
    +};
    +cmd_info_t console_command_eventer = {
    +  "eventer", noit_console_eventer, NULL, NULL, NULL
     };
     
    …
       static noit_console_state_t *_top_level_state = NULL;
       if(!_top_level_state) {
    -    static noit_console_state_t *no_state;
    +    static noit_console_state_t *no_state, *show_state;
         _top_level_state = noit_console_state_alloc();
         noit_console_state_add_cmd(_top_level_state, &console_command_exit);
    +    show_state = noit_console_state_alloc();
         noit_console_state_add_cmd(_top_level_state,
           NCSCMD("show", noit_console_state_delegate, noit_console_opt_delegate,
    -             noit_console_state_alloc(), NULL));
    +             show_state, NULL));
         no_state = noit_console_state_alloc();
         noit_console_state_add_cmd(_top_level_state,
    …
         noit_console_state_add_cmd(_top_level_state, &console_command_shutdown);
         noit_console_state_add_cmd(_top_level_state, &console_command_restart);
    +    noit_console_state_add_cmd(show_state, &console_command_eventer);
       }
       return _top_level_state;
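
With this wiring, "show eventer debug timers" walks the shared timed-event skiplist and "show eventer debug sockets" walks the backend's fd table, printing one line per event through noit_console_spit_event. Given the nc_printf format above, each line should look roughly like the following; the addresses, timing, fd, and callback names here are made-up placeholders, not captured output:

      [0x81f4a30] fires: 0.182645s [---t] -> some_timer_callback(0x81e9c08)
      [0x81d27f0] fd: 23 [r---] -> some_socket_callback(0x81d2900)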