Changeset b62cf2be087943dcb29b6e068bd4262862fcb17d

Show
Ignore:
Timestamp:
12/17/07 05:43:26 (7 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1197870206 +0000
git-parent:

[cd1ab55918856e8682710ede8ef2e44377887173]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1197870206 +0000
Message:

more work... fleshing out the eventer

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • configure.in

    rcd1ab55 rb62cf2b  
    1212AC_PATH_PROGS(PERL, perl) 
    1313AC_SUBST(PERL) 
     14 
     15if test "x$CC" = "xgcc" ; then 
     16  CFLAGS="$CFLAGS -g -Wall" 
     17fi 
    1418 
    1519# Checks for data types 
  • src/eventer/eventer.c

    r01751d3 rb62cf2b  
    88  e->opset = eventer_POSIX_fd_opset; 
    99  return e; 
     10} 
     11 
     12int eventer_timecompare(void *av, void *bv) { 
     13  /* Herein we avoid equality.  This function is only used as a comparator 
     14   * for a heap of timed events.  If they are equal, b is considered less 
     15   * just to maintain an order (despite it not being stable). 
     16   */ 
     17  eventer_t a = (eventer_t)av; 
     18  eventer_t b = (eventer_t)bv; 
     19  if(a->whence.tv_sec < b->whence.tv_sec) return -1; 
     20  if(a->whence.tv_sec == b->whence.tv_sec && 
     21     a->whence.tv_usec < b->whence.tv_usec) return -1; 
     22  return 1; 
    1023} 
    1124 
  • src/eventer/eventer.h

    r01751d3 rb62cf2b  
    4343struct _event { 
    4444  eventer_func_t      callback; 
    45   struct timeval     *whence; 
     45  struct timeval      whence; 
    4646  int                 fd; 
    4747  int                 mask; 
     
    5252API_EXPORT(eventer_t) eventer_alloc(); 
    5353API_EXPORT(void)      eventer_free(eventer_t); 
     54API_EXPORT(int)       eventer_timecompare(void *a, void *b); 
    5455 
    5556typedef struct _eventer_impl { 
  • src/eventer/eventer_POSIX_fd_opset.c

    r01751d3 rb62cf2b  
    88 
    99#include <sys/socket.h> 
     10#include <unistd.h> 
    1011 
    1112static int 
  • src/eventer/eventer_kqueue_impl.c

    rcd1ab55 rb62cf2b  
    77#include "eventer/eventer.h" 
    88#include "utils/noit_atomic.h" 
    9  
     9#include "utils/noit_skiplist.h" 
     10 
     11#include <errno.h> 
     12#include <stdio.h> 
     13#include <stdlib.h> 
    1014#include <sys/event.h> 
    1115#include <pthread.h> 
    12  
    13 int maxfds; 
    14 struct { 
     16#include <assert.h> 
     17 
     18static struct timeval __max_sleeptime = { 0, 200000 }; /* 200ms */ 
     19static int maxfds; 
     20static struct { 
    1521  eventer_t e; 
    1622  pthread_t executor; 
    1723  noit_spinlock_t lock; 
    18 } **master_fds; 
    19  
    20 int kqueue_fd; 
     24} *master_fds = NULL; 
     25 
     26typedef enum { EV_OWNED, EV_ALREADY_OWNED } ev_lock_state_t; 
     27 
     28static ev_lock_state_t 
     29acquire_master_fd(int fd) { 
     30  if(noit_spinlock_trylock(&master_fds[fd].lock)) { 
     31    master_fds[fd].executor = pthread_self(); 
     32    return EV_OWNED; 
     33  } 
     34  if(pthread_equal(master_fds[fd].executor, pthread_self())) { 
     35    return EV_ALREADY_OWNED; 
     36  } 
     37  noit_spinlock_lock(&master_fds[fd].lock); 
     38  master_fds[fd].executor = pthread_self(); 
     39  return EV_OWNED; 
     40
     41static void 
     42release_master_fd(int fd, ev_lock_state_t as) { 
     43  if(as == EV_OWNED) { 
     44    memset(&master_fds[fd].executor, 0, sizeof(master_fds[fd].executor)); 
     45    noit_spinlock_unlock(&master_fds[fd].lock); 
     46  } 
     47
     48 
     49static pthread_t master_thread; 
     50static int kqueue_fd = -1; 
    2151typedef struct kqueue_setup { 
    2252  struct kevent *__ke_vec; 
    2353  unsigned int __ke_vec_a; 
    2454  unsigned int __ke_vec_used; 
    25 } * kqs_t; 
     55} *kqs_t; 
    2656 
    2757static pthread_mutex_t kqs_lock; 
     58static pthread_mutex_t te_lock; 
    2859static kqs_t master_kqs = NULL; 
    2960static pthread_key_t kqueue_setup_key; 
     61static noit_skiplist *timed_events = NULL; 
    3062#define KQUEUE_DECL kqs_t kqs 
    3163#define KQUEUE_SETUP kqs = (kqs_t) pthread_getspecific(kqueue_setup_key) 
     
    3466#define ke_vec_used kqs->__ke_vec_used 
    3567 
     68static void kqs_init(kqs_t kqs) { 
     69  enum { initial_alloc = 64 }; 
     70  ke_vec_a = initial_alloc; 
     71  ke_vec = (struct kevent *) malloc(ke_vec_a * sizeof (struct kevent)); 
     72} 
    3673static void 
    3774ke_change (register int const ident, 
     
    3976           register int const flags, 
    4077           register void *const udata) { 
    41   enum { initial_alloc = 64 }; 
    4278  register struct kevent *kep; 
    4379  KQUEUE_DECL; 
    44  
    4580  KQUEUE_SETUP; 
     81  if(!kqs) kqs = master_kqs; 
     82 
     83  if(kqs == master_kqs) pthread_mutex_lock(&kqs_lock); 
    4684  if (!ke_vec_a) { 
    47     ke_vec_a = initial_alloc; 
    48     ke_vec = (struct kevent *) malloc(ke_vec_a * sizeof (struct kevent)); 
     85    kqs_init(kqs); 
    4986  } 
    5087  else if (ke_vec_used == ke_vec_a) { 
     
    5693 
    5794  EV_SET(kep, ident, filter, flags, 0, 0, udata); 
     95  if(kqs == master_kqs) pthread_mutex_unlock(&kqs_lock); 
    5896} 
    5997 
    6098static int eventer_kqueue_impl_init() { 
    6199  struct rlimit rlim; 
     100  master_thread = pthread_self(); 
    62101  kqueue_fd = kqueue(); 
    63102  if(kqueue_fd == -1) { 
     
    65104  } 
    66105  pthread_mutex_init(&kqs_lock, NULL); 
     106  pthread_mutex_init(&te_lock, NULL); 
     107  pthread_key_create(&kqueue_setup_key, NULL); 
    67108  master_kqs = calloc(1, sizeof(*master_kqs)); 
     109  kqs_init(master_kqs); 
    68110  getrlimit(RLIMIT_NOFILE, &rlim); 
    69111  maxfds = rlim.rlim_cur; 
    70112  master_fds = calloc(maxfds, sizeof(*master_fds)); 
     113  timed_events = calloc(1, sizeof(*timed_events)); 
     114  noit_skiplist_init(timed_events); 
     115  noit_skiplist_set_compare(timed_events, 
     116                            eventer_timecompare, eventer_timecompare); 
     117  noit_skiplist_add_index(timed_events, 
     118                          noit_compare_voidptr, noit_compare_voidptr); 
    71119  return 0; 
    72120} 
     
    75123} 
    76124static void eventer_kqueue_impl_add(eventer_t e) { 
     125  assert(e->mask); 
     126  ev_lock_state_t lockstate; 
     127 
     128  /* Timed events are simple */ 
     129  if(e->mask == EVENTER_TIMER) { 
     130    pthread_mutex_lock(&te_lock); 
     131    noit_skiplist_insert(timed_events, e); 
     132    pthread_mutex_unlock(&te_lock); 
     133    return; 
     134  } 
     135 
     136  /* file descriptor event */ 
     137  lockstate = acquire_master_fd(e->fd); 
     138  master_fds[e->fd].e = e; 
     139  if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION)) 
     140    ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e); 
     141  if(e->mask & (EVENTER_WRITE)) 
     142    ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e); 
     143  release_master_fd(e->fd, lockstate); 
    77144} 
    78145static void eventer_kqueue_impl_remove(eventer_t e) { 
     146  if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) { 
     147    ev_lock_state_t lockstate; 
     148    lockstate = acquire_master_fd(e->fd); 
     149    if(e == master_fds[e->fd].e) { 
     150      master_fds[e->fd].e = NULL; 
     151      if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION)) 
     152        ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e); 
     153      if(e->mask & (EVENTER_WRITE)) 
     154        ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e); 
     155    } 
     156    release_master_fd(e->fd, lockstate); 
     157  } 
     158  else if(e->mask & EVENTER_TIMER) { 
     159    pthread_mutex_lock(&te_lock); 
     160    noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr); 
     161    pthread_mutex_unlock(&te_lock); 
     162  } 
     163  else { 
     164    abort(); 
     165  } 
    79166} 
    80167static void eventer_kqueue_impl_update(eventer_t e) { 
     168  if(e->mask & EVENTER_TIMER) { 
     169    pthread_mutex_lock(&te_lock); 
     170    noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr); 
     171    noit_skiplist_insert(timed_events, e); 
     172    pthread_mutex_unlock(&te_lock); 
     173    return; 
     174  } 
     175  ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e); 
     176  ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e); 
     177  if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION)) 
     178    ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e); 
     179  if(e->mask & (EVENTER_WRITE)) 
     180    ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e); 
    81181} 
    82182static eventer_t eventer_kqueue_impl_remove_fd(int fd) { 
     183  eventer_t eiq = NULL; 
     184  ev_lock_state_t lockstate; 
     185  if(master_fds[fd].e) { 
     186    lockstate = acquire_master_fd(fd); 
     187    eiq = master_fds[fd].e; 
     188    master_fds[fd].e = NULL; 
     189    if(eiq->mask & (EVENTER_READ | EVENTER_EXCEPTION)) 
     190      ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, eiq); 
     191    if(eiq->mask & (EVENTER_WRITE)) 
     192      ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, eiq); 
     193    release_master_fd(fd, lockstate); 
     194  } 
     195  return eiq; 
    83196} 
    84197static void eventer_kqueue_impl_loop() { 
     198  int is_master_thread = 0; 
     199  pthread_t self; 
     200  KQUEUE_DECL; 
     201  KQUEUE_SETUP; 
     202 
     203  self = pthread_self(); 
     204  if(pthread_equal(self, master_thread)) is_master_thread = 1; 
     205 
     206  if(!kqs) { 
     207    kqs = calloc(1, sizeof(*kqs)); 
     208    kqs_init(kqs); 
     209  } 
     210  pthread_setspecific(kqueue_setup_key, kqs); 
     211  while(1) { 
     212    struct timeval __now, __sleeptime; 
     213    struct timespec __kqueue_sleeptime; 
     214    int fd_cnt = 0; 
     215    int max_timed_events_to_process; 
     216    int newmask; 
     217 
     218    __sleeptime = __max_sleeptime; 
     219 
     220    /* Handle timed events... 
     221     * we could be multithreaded, so if we pop forever we could starve 
     222     * ourselves. */ 
     223    max_timed_events_to_process = timed_events->size; 
     224    while(max_timed_events_to_process-- > 0) { 
     225      eventer_t timed_event; 
     226 
     227      gettimeofday(&__now, NULL); 
     228 
     229      pthread_mutex_lock(&te_lock); 
     230      /* Peek at our next timed event, if should fire, pop it. 
     231       * otherwise we noop and NULL it out to break the loop. */ 
     232      timed_event = noit_skiplist_peek(timed_events); 
     233      if(timed_event) { 
     234        if(compare_timeval(timed_event->whence, __now) < 0) { 
     235          timed_event = noit_skiplist_pop(timed_events, NULL); 
     236        } 
     237        else { 
     238          sub_timeval(timed_event->whence, __now, &__sleeptime); 
     239          timed_event = NULL; 
     240        } 
     241      } 
     242      pthread_mutex_unlock(&te_lock); 
     243      if(timed_event == NULL) break; 
     244 
     245      /* Make our call */ 
     246      newmask = timed_event->callback(timed_event, EVENTER_TIMER, 
     247                                      timed_event->closure, &__now); 
     248      if(newmask) 
     249        eventer_add(timed_event); 
     250      else 
     251        eventer_free(timed_event); 
     252    } 
     253 
     254    if(compare_timeval(__max_sleeptime, __sleeptime) < 0) { 
     255      /* we exceed our configured maximum, set it down */ 
     256      memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime)); 
     257    } 
     258 
     259    /* If we're the master, we need to lock the master_kqs and make mods */ 
     260    if(master_kqs->__ke_vec_used) { 
     261      struct timespec __zerotime = { 0, 0 }; 
     262      pthread_mutex_lock(&kqs_lock); 
     263      fd_cnt = kevent(kqueue_fd, 
     264                      master_kqs->__ke_vec, master_kqs->__ke_vec_used, 
     265                      NULL, 0, 
     266                      &__zerotime); 
     267      if(fd_cnt < 0) { 
     268        fprintf(stderr, "kevent: %s\n", strerror(errno)); 
     269      } 
     270      master_kqs->__ke_vec_used = 0; 
     271      pthread_mutex_unlock(&kqs_lock); 
     272    } 
     273 
     274    /* Now we move on to our fd-based events */ 
     275    __kqueue_sleeptime.tv_sec = __sleeptime.tv_sec; 
     276    __kqueue_sleeptime.tv_nsec = __sleeptime.tv_usec * 1000; 
     277    fd_cnt = kevent(kqueue_fd, ke_vec, ke_vec_used, 
     278                    ke_vec, ke_vec_a, 
     279                    &__kqueue_sleeptime); 
     280    ke_vec_used = 0; 
     281    if(fd_cnt < 0) { 
     282      fprintf(stderr, "kevent: %s\n", strerror(errno)); 
     283    } 
     284    else { 
     285      int idx; 
     286      for(idx = 0; idx < fd_cnt; idx++) { 
     287        ev_lock_state_t lockstate; 
     288        struct kevent *ke; 
     289        eventer_t e; 
     290        int fd, evmask, oldmask; 
     291 
     292        ke = &ke_vec[idx]; 
     293        e = (eventer_t)ke->udata; 
     294        fd = ke->ident; 
     295        assert(e == master_fds[fd].e); 
     296        lockstate = acquire_master_fd(fd); 
     297        assert(lockstate == EV_OWNED); 
     298 
     299        evmask = 0; 
     300        if(ke->filter == EVFILT_READ) evmask = EVENTER_READ; 
     301        if(ke->filter == EVFILT_WRITE) evmask = EVENTER_WRITE; 
     302        gettimeofday(&__now, NULL); 
     303        oldmask = e->mask; 
     304        newmask = e->callback(e, evmask, e->closure, &__now); 
     305 
     306        if(newmask) { 
     307          /* toggle the read bits if needed */ 
     308          if(newmask & (EVENTER_READ | EVENTER_EXCEPTION)) { 
     309            if(!(oldmask & (EVENTER_READ | EVENTER_EXCEPTION))) 
     310              ke_change(fd, EVFILT_READ, EV_ADD | EV_ENABLE, e); 
     311          } 
     312          else if(oldmask & (EVENTER_READ | EVENTER_EXCEPTION)) 
     313            ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e); 
     314   
     315          /* toggle the write bits if needed */ 
     316          if(newmask & EVENTER_WRITE) { 
     317            if(!(oldmask & EVENTER_WRITE)) 
     318              ke_change(fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e); 
     319          } 
     320          else if(oldmask & EVENTER_WRITE) 
     321              ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e); 
     322   
     323          /* Set our mask */ 
     324          e->mask = newmask; 
     325        } 
     326        else { 
     327          eventer_free(e); 
     328        } 
     329        release_master_fd(fd, lockstate); 
     330      } 
     331    } 
     332  } 
    85333} 
    86334 
  • src/noit_defines.h

    r01751d3 rb62cf2b  
    66#define API_EXPORT(type) extern type 
    77 
     8static inline int compare_timeval(struct timeval a, struct timeval b) { 
     9  if (a.tv_sec < b.tv_sec) return -1; 
     10  if (a.tv_sec > b.tv_sec) return 1; 
     11  if (a.tv_usec < b.tv_usec) return -1; 
     12  if (a.tv_usec > b.tv_usec) return 1; 
     13  return 0; 
     14} 
     15 
     16static inline void sub_timeval(struct timeval a, struct timeval b, 
     17                               struct timeval *out) 
     18{ 
     19  out->tv_usec = a.tv_usec - b.tv_usec; 
     20  if (out->tv_usec < 0L) { 
     21    a.tv_sec--; 
     22    out->tv_usec += 1000000L; 
     23  } 
     24  out->tv_sec = a.tv_sec - b.tv_sec; 
     25  if (out->tv_sec < 0L) { 
     26    out->tv_sec++; 
     27    out->tv_usec -= 1000000L; 
     28  } 
     29} 
     30 
    831#endif 
  • src/noitd.c

    rcd1ab55 rb62cf2b  
    55#include <stdlib.h> 
    66 
     7int stdin_handler(eventer_t e, int mask, void *closure, struct timeval *now) { 
     8  fprintf(stderr, "in stdin_handler:\n"); 
     9  return EVENTER_READ; 
     10} 
     11void stdin_sample() { 
     12  eventer_t e; 
     13  e = eventer_alloc(); 
     14  e->fd = 0; 
     15  e->mask = EVENTER_READ; 
     16  e->callback = stdin_handler; 
     17  eventer_add(e); 
     18} 
    719int main(int argc, char **argv) { 
    820  if(eventer_choose("kqueue") == -1) { 
     
    1426    exit(-1); 
    1527  } 
     28 
     29  stdin_sample(); 
     30  eventer_loop(); 
    1631  return 0; 
    1732} 
  • src/utils/Makefile.in

    r01751d3 rb62cf2b  
    1010top_srcdir=@top_srcdir@ 
    1111 
    12 OBJS=noit_hash.o 
     12OBJS=noit_hash.o noit_skiplist.o 
    1313 
    1414all:    libnoit_utils.a