Show
Ignore:
Timestamp:
12/17/07 05:43:26 (10 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1197870206 +0000
git-parent:

[cd1ab55918856e8682710ede8ef2e44377887173]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1197870206 +0000
Message:

more work... fleshing out the eventer

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/eventer/eventer_kqueue_impl.c

    rcd1ab55 rb62cf2b  
    77#include "eventer/eventer.h" 
    88#include "utils/noit_atomic.h" 
    9  
     9#include "utils/noit_skiplist.h" 
     10 
     11#include <errno.h> 
     12#include <stdio.h> 
     13#include <stdlib.h> 
    1014#include <sys/event.h> 
    1115#include <pthread.h> 
    12  
    13 int maxfds; 
    14 struct { 
     16#include <assert.h> 
     17 
     18static struct timeval __max_sleeptime = { 0, 200000 }; /* 200ms */ 
     19static int maxfds; 
     20static struct { 
    1521  eventer_t e; 
    1622  pthread_t executor; 
    1723  noit_spinlock_t lock; 
    18 } **master_fds; 
    19  
    20 int kqueue_fd; 
     24} *master_fds = NULL; 
     25 
     26typedef enum { EV_OWNED, EV_ALREADY_OWNED } ev_lock_state_t; 
     27 
     28static ev_lock_state_t 
     29acquire_master_fd(int fd) { 
     30  if(noit_spinlock_trylock(&master_fds[fd].lock)) { 
     31    master_fds[fd].executor = pthread_self(); 
     32    return EV_OWNED; 
     33  } 
     34  if(pthread_equal(master_fds[fd].executor, pthread_self())) { 
     35    return EV_ALREADY_OWNED; 
     36  } 
     37  noit_spinlock_lock(&master_fds[fd].lock); 
     38  master_fds[fd].executor = pthread_self(); 
     39  return EV_OWNED; 
     40
     41static void 
     42release_master_fd(int fd, ev_lock_state_t as) { 
     43  if(as == EV_OWNED) { 
     44    memset(&master_fds[fd].executor, 0, sizeof(master_fds[fd].executor)); 
     45    noit_spinlock_unlock(&master_fds[fd].lock); 
     46  } 
     47
     48 
     49static pthread_t master_thread; 
     50static int kqueue_fd = -1; 
    2151typedef struct kqueue_setup { 
    2252  struct kevent *__ke_vec; 
    2353  unsigned int __ke_vec_a; 
    2454  unsigned int __ke_vec_used; 
    25 } * kqs_t; 
     55} *kqs_t; 
    2656 
    2757static pthread_mutex_t kqs_lock; 
     58static pthread_mutex_t te_lock; 
    2859static kqs_t master_kqs = NULL; 
    2960static pthread_key_t kqueue_setup_key; 
     61static noit_skiplist *timed_events = NULL; 
    3062#define KQUEUE_DECL kqs_t kqs 
    3163#define KQUEUE_SETUP kqs = (kqs_t) pthread_getspecific(kqueue_setup_key) 
     
    3466#define ke_vec_used kqs->__ke_vec_used 
    3567 
     68static void kqs_init(kqs_t kqs) { 
     69  enum { initial_alloc = 64 }; 
     70  ke_vec_a = initial_alloc; 
     71  ke_vec = (struct kevent *) malloc(ke_vec_a * sizeof (struct kevent)); 
     72} 
    3673static void 
    3774ke_change (register int const ident, 
     
    3976           register int const flags, 
    4077           register void *const udata) { 
    41   enum { initial_alloc = 64 }; 
    4278  register struct kevent *kep; 
    4379  KQUEUE_DECL; 
    44  
    4580  KQUEUE_SETUP; 
     81  if(!kqs) kqs = master_kqs; 
     82 
     83  if(kqs == master_kqs) pthread_mutex_lock(&kqs_lock); 
    4684  if (!ke_vec_a) { 
    47     ke_vec_a = initial_alloc; 
    48     ke_vec = (struct kevent *) malloc(ke_vec_a * sizeof (struct kevent)); 
     85    kqs_init(kqs); 
    4986  } 
    5087  else if (ke_vec_used == ke_vec_a) { 
     
    5693 
    5794  EV_SET(kep, ident, filter, flags, 0, 0, udata); 
     95  if(kqs == master_kqs) pthread_mutex_unlock(&kqs_lock); 
    5896} 
    5997 
    6098static int eventer_kqueue_impl_init() { 
    6199  struct rlimit rlim; 
     100  master_thread = pthread_self(); 
    62101  kqueue_fd = kqueue(); 
    63102  if(kqueue_fd == -1) { 
     
    65104  } 
    66105  pthread_mutex_init(&kqs_lock, NULL); 
     106  pthread_mutex_init(&te_lock, NULL); 
     107  pthread_key_create(&kqueue_setup_key, NULL); 
    67108  master_kqs = calloc(1, sizeof(*master_kqs)); 
     109  kqs_init(master_kqs); 
    68110  getrlimit(RLIMIT_NOFILE, &rlim); 
    69111  maxfds = rlim.rlim_cur; 
    70112  master_fds = calloc(maxfds, sizeof(*master_fds)); 
     113  timed_events = calloc(1, sizeof(*timed_events)); 
     114  noit_skiplist_init(timed_events); 
     115  noit_skiplist_set_compare(timed_events, 
     116                            eventer_timecompare, eventer_timecompare); 
     117  noit_skiplist_add_index(timed_events, 
     118                          noit_compare_voidptr, noit_compare_voidptr); 
    71119  return 0; 
    72120} 
     
    75123} 
    76124static void eventer_kqueue_impl_add(eventer_t e) { 
     125  assert(e->mask); 
     126  ev_lock_state_t lockstate; 
     127 
     128  /* Timed events are simple */ 
     129  if(e->mask == EVENTER_TIMER) { 
     130    pthread_mutex_lock(&te_lock); 
     131    noit_skiplist_insert(timed_events, e); 
     132    pthread_mutex_unlock(&te_lock); 
     133    return; 
     134  } 
     135 
     136  /* file descriptor event */ 
     137  lockstate = acquire_master_fd(e->fd); 
     138  master_fds[e->fd].e = e; 
     139  if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION)) 
     140    ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e); 
     141  if(e->mask & (EVENTER_WRITE)) 
     142    ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e); 
     143  release_master_fd(e->fd, lockstate); 
    77144} 
    78145static void eventer_kqueue_impl_remove(eventer_t e) { 
     146  if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) { 
     147    ev_lock_state_t lockstate; 
     148    lockstate = acquire_master_fd(e->fd); 
     149    if(e == master_fds[e->fd].e) { 
     150      master_fds[e->fd].e = NULL; 
     151      if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION)) 
     152        ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e); 
     153      if(e->mask & (EVENTER_WRITE)) 
     154        ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e); 
     155    } 
     156    release_master_fd(e->fd, lockstate); 
     157  } 
     158  else if(e->mask & EVENTER_TIMER) { 
     159    pthread_mutex_lock(&te_lock); 
     160    noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr); 
     161    pthread_mutex_unlock(&te_lock); 
     162  } 
     163  else { 
     164    abort(); 
     165  } 
    79166} 
    80167static void eventer_kqueue_impl_update(eventer_t e) { 
     168  if(e->mask & EVENTER_TIMER) { 
     169    pthread_mutex_lock(&te_lock); 
     170    noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr); 
     171    noit_skiplist_insert(timed_events, e); 
     172    pthread_mutex_unlock(&te_lock); 
     173    return; 
     174  } 
     175  ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e); 
     176  ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e); 
     177  if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION)) 
     178    ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e); 
     179  if(e->mask & (EVENTER_WRITE)) 
     180    ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e); 
    81181} 
    82182static eventer_t eventer_kqueue_impl_remove_fd(int fd) { 
     183  eventer_t eiq = NULL; 
     184  ev_lock_state_t lockstate; 
     185  if(master_fds[fd].e) { 
     186    lockstate = acquire_master_fd(fd); 
     187    eiq = master_fds[fd].e; 
     188    master_fds[fd].e = NULL; 
     189    if(eiq->mask & (EVENTER_READ | EVENTER_EXCEPTION)) 
     190      ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, eiq); 
     191    if(eiq->mask & (EVENTER_WRITE)) 
     192      ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, eiq); 
     193    release_master_fd(fd, lockstate); 
     194  } 
     195  return eiq; 
    83196} 
    84197static void eventer_kqueue_impl_loop() { 
     198  int is_master_thread = 0; 
     199  pthread_t self; 
     200  KQUEUE_DECL; 
     201  KQUEUE_SETUP; 
     202 
     203  self = pthread_self(); 
     204  if(pthread_equal(self, master_thread)) is_master_thread = 1; 
     205 
     206  if(!kqs) { 
     207    kqs = calloc(1, sizeof(*kqs)); 
     208    kqs_init(kqs); 
     209  } 
     210  pthread_setspecific(kqueue_setup_key, kqs); 
     211  while(1) { 
     212    struct timeval __now, __sleeptime; 
     213    struct timespec __kqueue_sleeptime; 
     214    int fd_cnt = 0; 
     215    int max_timed_events_to_process; 
     216    int newmask; 
     217 
     218    __sleeptime = __max_sleeptime; 
     219 
     220    /* Handle timed events... 
     221     * we could be multithreaded, so if we pop forever we could starve 
     222     * ourselves. */ 
     223    max_timed_events_to_process = timed_events->size; 
     224    while(max_timed_events_to_process-- > 0) { 
     225      eventer_t timed_event; 
     226 
     227      gettimeofday(&__now, NULL); 
     228 
     229      pthread_mutex_lock(&te_lock); 
     230      /* Peek at our next timed event, if should fire, pop it. 
     231       * otherwise we noop and NULL it out to break the loop. */ 
     232      timed_event = noit_skiplist_peek(timed_events); 
     233      if(timed_event) { 
     234        if(compare_timeval(timed_event->whence, __now) < 0) { 
     235          timed_event = noit_skiplist_pop(timed_events, NULL); 
     236        } 
     237        else { 
     238          sub_timeval(timed_event->whence, __now, &__sleeptime); 
     239          timed_event = NULL; 
     240        } 
     241      } 
     242      pthread_mutex_unlock(&te_lock); 
     243      if(timed_event == NULL) break; 
     244 
     245      /* Make our call */ 
     246      newmask = timed_event->callback(timed_event, EVENTER_TIMER, 
     247                                      timed_event->closure, &__now); 
     248      if(newmask) 
     249        eventer_add(timed_event); 
     250      else 
     251        eventer_free(timed_event); 
     252    } 
     253 
     254    if(compare_timeval(__max_sleeptime, __sleeptime) < 0) { 
     255      /* we exceed our configured maximum, set it down */ 
     256      memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime)); 
     257    } 
     258 
     259    /* If we're the master, we need to lock the master_kqs and make mods */ 
     260    if(master_kqs->__ke_vec_used) { 
     261      struct timespec __zerotime = { 0, 0 }; 
     262      pthread_mutex_lock(&kqs_lock); 
     263      fd_cnt = kevent(kqueue_fd, 
     264                      master_kqs->__ke_vec, master_kqs->__ke_vec_used, 
     265                      NULL, 0, 
     266                      &__zerotime); 
     267      if(fd_cnt < 0) { 
     268        fprintf(stderr, "kevent: %s\n", strerror(errno)); 
     269      } 
     270      master_kqs->__ke_vec_used = 0; 
     271      pthread_mutex_unlock(&kqs_lock); 
     272    } 
     273 
     274    /* Now we move on to our fd-based events */ 
     275    __kqueue_sleeptime.tv_sec = __sleeptime.tv_sec; 
     276    __kqueue_sleeptime.tv_nsec = __sleeptime.tv_usec * 1000; 
     277    fd_cnt = kevent(kqueue_fd, ke_vec, ke_vec_used, 
     278                    ke_vec, ke_vec_a, 
     279                    &__kqueue_sleeptime); 
     280    ke_vec_used = 0; 
     281    if(fd_cnt < 0) { 
     282      fprintf(stderr, "kevent: %s\n", strerror(errno)); 
     283    } 
     284    else { 
     285      int idx; 
     286      for(idx = 0; idx < fd_cnt; idx++) { 
     287        ev_lock_state_t lockstate; 
     288        struct kevent *ke; 
     289        eventer_t e; 
     290        int fd, evmask, oldmask; 
     291 
     292        ke = &ke_vec[idx]; 
     293        e = (eventer_t)ke->udata; 
     294        fd = ke->ident; 
     295        assert(e == master_fds[fd].e); 
     296        lockstate = acquire_master_fd(fd); 
     297        assert(lockstate == EV_OWNED); 
     298 
     299        evmask = 0; 
     300        if(ke->filter == EVFILT_READ) evmask = EVENTER_READ; 
     301        if(ke->filter == EVFILT_WRITE) evmask = EVENTER_WRITE; 
     302        gettimeofday(&__now, NULL); 
     303        oldmask = e->mask; 
     304        newmask = e->callback(e, evmask, e->closure, &__now); 
     305 
     306        if(newmask) { 
     307          /* toggle the read bits if needed */ 
     308          if(newmask & (EVENTER_READ | EVENTER_EXCEPTION)) { 
     309            if(!(oldmask & (EVENTER_READ | EVENTER_EXCEPTION))) 
     310              ke_change(fd, EVFILT_READ, EV_ADD | EV_ENABLE, e); 
     311          } 
     312          else if(oldmask & (EVENTER_READ | EVENTER_EXCEPTION)) 
     313            ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e); 
     314   
     315          /* toggle the write bits if needed */ 
     316          if(newmask & EVENTER_WRITE) { 
     317            if(!(oldmask & EVENTER_WRITE)) 
     318              ke_change(fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e); 
     319          } 
     320          else if(oldmask & EVENTER_WRITE) 
     321              ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e); 
     322   
     323          /* Set our mask */ 
     324          e->mask = newmask; 
     325        } 
     326        else { 
     327          eventer_free(e); 
     328        } 
     329        release_master_fd(fd, lockstate); 
     330      } 
     331    } 
     332  } 
    85333} 
    86334