root/src/eventer/eventer_kqueue_impl.c

Revision df2e1eb8a0d6b779a1327d8df4d867b961ce4b07, 14.9 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

fixed #66

/*
 * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
 * All rights reserved.
 */

#include "noit_defines.h"
#include "eventer/eventer.h"
#include "utils/noit_atomic.h"
#include "utils/noit_skiplist.h"
#include "utils/noit_log.h"

/* System headers used directly below; some of these may also arrive
 * indirectly via noit_defines.h. */
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>        /* memset, memcpy, strerror */
#include <signal.h>        /* signal, SIGPIPE */
#include <sys/types.h>     /* required before sys/event.h on some BSDs */
#include <sys/time.h>      /* gettimeofday, struct timeval */
#include <sys/resource.h>  /* getrlimit, RLIMIT_NOFILE */
#include <sys/event.h>
#include <pthread.h>
#include <assert.h>

static struct timeval __max_sleeptime = { 0, 200000 }; /* 200ms */
static int maxfds;
static struct {
  eventer_t e;
  pthread_t executor;
  noit_spinlock_t lock;
} *master_fds = NULL;
static int *masks;
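/* master_fds and masks are indexed directly by file descriptor; both are
 * sized to RLIMIT_NOFILE in eventer_kqueue_impl_init() below, so any valid
 * fd can be used as an index without bounds juggling. */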

typedef enum { EV_OWNED, EV_ALREADY_OWNED } ev_lock_state_t;

static ev_lock_state_t
acquire_master_fd(int fd) {
  if(noit_spinlock_trylock(&master_fds[fd].lock)) {
    master_fds[fd].executor = pthread_self();
    return EV_OWNED;
  }
  if(pthread_equal(master_fds[fd].executor, pthread_self())) {
    return EV_ALREADY_OWNED;
  }
  noit_spinlock_lock(&master_fds[fd].lock);
  master_fds[fd].executor = pthread_self();
  return EV_OWNED;
}
static void
release_master_fd(int fd, ev_lock_state_t as) {
  if(as == EV_OWNED) {
    memset(&master_fds[fd].executor, 0, sizeof(master_fds[fd].executor));
    noit_spinlock_unlock(&master_fds[fd].lock);
  }
}
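/* The pair above forms a per-fd recursive ownership lock: a thread that
 * already holds the fd's spinlock gets EV_ALREADY_OWNED back, and the
 * matching release is then a no-op, so only the outermost acquire/release
 * pair actually unlocks.  A sketch of the intended pattern (illustrative,
 * not part of the original file):
 *
 *   ev_lock_state_t ls = acquire_master_fd(fd);        // EV_OWNED
 *   ...
 *   ev_lock_state_t inner = acquire_master_fd(fd);     // EV_ALREADY_OWNED
 *   release_master_fd(fd, inner);                      // no-op
 *   ...
 *   release_master_fd(fd, ls);                         // really unlocks
 */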

static pthread_t master_thread;
static int kqueue_fd = -1;
typedef struct kqueue_setup {
  struct kevent *__ke_vec;
  unsigned int __ke_vec_a;
  unsigned int __ke_vec_used;
} *kqs_t;

static pthread_mutex_t kqs_lock;
static pthread_mutex_t te_lock;
static kqs_t master_kqs = NULL;
static pthread_key_t kqueue_setup_key;
static noit_skiplist *timed_events = NULL;
#define KQUEUE_DECL kqs_t kqs
#define KQUEUE_SETUP kqs = (kqs_t) pthread_getspecific(kqueue_setup_key)
#define ke_vec kqs->__ke_vec
#define ke_vec_a kqs->__ke_vec_a
#define ke_vec_used kqs->__ke_vec_used
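/* Each event-loop thread keeps its own kevent changelist (a kqueue_setup)
 * in pthread-specific storage; threads that never entered the loop fall
 * back to the shared master_kqs, which is why ke_change() below takes
 * kqs_lock in that case.  The ke_vec* macros above expand against whichever
 * kqs is in scope via KQUEUE_DECL/KQUEUE_SETUP. */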

static void kqs_init(kqs_t kqs) {
  enum { initial_alloc = 64 };
  ke_vec_a = initial_alloc;
  ke_vec = (struct kevent *) malloc(ke_vec_a * sizeof (struct kevent));
}
static void
ke_change (register int const ident,
           register int const filter,
           register int const flags,
           register void *const udata) {
  register struct kevent *kep;
  KQUEUE_DECL;
  KQUEUE_SETUP;
  if(!kqs) kqs = master_kqs;

  if(kqs == master_kqs) pthread_mutex_lock(&kqs_lock);
  if (!ke_vec_a) {
    kqs_init(kqs);
  }
  else if (ke_vec_used == ke_vec_a) {
    ke_vec_a <<= 1;
    ke_vec = (struct kevent *) realloc(ke_vec,
                                       ke_vec_a * sizeof (struct kevent));
  }
  kep = &ke_vec[ke_vec_used++];

  EV_SET(kep, ident, filter, flags, 0, 0, udata);
  if(kqs == master_kqs) pthread_mutex_unlock(&kqs_lock);
}
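/* ke_change() only queues a change record; nothing reaches the kernel here.
 * The pending changelist is handed to kevent(2) in one batch by the loop
 * below: the shared master_kqs queue is flushed with a zero timeout, while
 * a per-thread queue rides along on that thread's blocking kevent() call.
 * Batching this way amortizes syscall cost across many add/delete ops. */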

static int eventer_kqueue_impl_init() {
  struct rlimit rlim;
  int rv;

  /* super init */
  if((rv = eventer_impl_init()) != 0) return rv;

  master_thread = pthread_self();
  signal(SIGPIPE, SIG_IGN);
  kqueue_fd = kqueue();
  if(kqueue_fd == -1) {
    return -1;
  }
  pthread_mutex_init(&kqs_lock, NULL);
  pthread_mutex_init(&te_lock, NULL);
  pthread_key_create(&kqueue_setup_key, NULL);
  master_kqs = calloc(1, sizeof(*master_kqs));
  kqs_init(master_kqs);
  getrlimit(RLIMIT_NOFILE, &rlim);
  maxfds = rlim.rlim_cur;
  master_fds = calloc(maxfds, sizeof(*master_fds));
  masks = calloc(maxfds, sizeof(*masks));
  timed_events = calloc(1, sizeof(*timed_events));
  noit_skiplist_init(timed_events);
  noit_skiplist_set_compare(timed_events,
                            eventer_timecompare, eventer_timecompare);
  noit_skiplist_add_index(timed_events,
                          noit_compare_voidptr, noit_compare_voidptr);
  return 0;
}
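/* timed_events is ordered by fire time (eventer_timecompare) and carries a
 * secondary index on pointer identity (noit_compare_voidptr), so a specific
 * event can be removed without scanning the time-ordered list. */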
static int eventer_kqueue_impl_propset(const char *key, const char *value) {
  if(eventer_impl_propset(key, value)) {
    /* Do our kqueue local properties here */
    return -1;
  }
  return 0;
}
static void eventer_kqueue_impl_add(eventer_t e) {
  ev_lock_state_t lockstate;
  const char *cbname;
  assert(e->mask);
  cbname = eventer_name_for_callback(e->callback);

  if(e->mask & EVENTER_ASYNCH) {
    noitL(eventer_deb, "debug: eventer_add asynch (%s)\n", cbname ? cbname : "???");
    eventer_add_asynch(NULL, e);
    return;
  }

  /* Recurrent delegation */
  if(e->mask & EVENTER_RECURRENT) {
    noitL(eventer_deb, "debug: eventer_add recurrent (%s)\n", cbname ? cbname : "???");
    eventer_add_recurrent(e);
    return;
  }

  /* Timed events are simple */
  if(e->mask & EVENTER_TIMER) {
    noitL(eventer_deb, "debug: eventer_add timed (%s)\n", cbname ? cbname : "???");
    pthread_mutex_lock(&te_lock);
    noit_skiplist_insert(timed_events, e);
    pthread_mutex_unlock(&te_lock);
    return;
  }

  /* file descriptor event */
  noitL(eventer_deb, "debug: eventer_add fd (%s,%d,0x%04x)\n", cbname ? cbname : "???", e->fd, e->mask);
  lockstate = acquire_master_fd(e->fd);
  master_fds[e->fd].e = e;
  if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
    ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
  if(e->mask & (EVENTER_WRITE))
    ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
  release_master_fd(e->fd, lockstate);
}
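/* Registration fans out by mask class: asynch and recurrent events delegate
 * to the shared eventer core, timers go to the skiplist, and only fd events
 * touch kqueue.  A caller-side sketch for an fd event (assumed API, inferred
 * from the eventer_t fields used above, not part of this file):
 *
 *   eventer_t e = eventer_alloc();
 *   e->fd = conn_fd;
 *   e->mask = EVENTER_READ | EVENTER_EXCEPTION;
 *   e->callback = my_read_handler;
 *   eventer_add(e);   // dispatches to the fd branch above
 */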
static eventer_t eventer_kqueue_impl_remove(eventer_t e) {
  eventer_t removed = NULL;
  if(e->mask & EVENTER_ASYNCH) {
    abort();
  }
  if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
    ev_lock_state_t lockstate;
    lockstate = acquire_master_fd(e->fd);
    if(e == master_fds[e->fd].e) {
      removed = e;
      master_fds[e->fd].e = NULL;
      if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
        ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
      if(e->mask & (EVENTER_WRITE))
        ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
    }
    release_master_fd(e->fd, lockstate);
  }
  else if(e->mask & EVENTER_TIMER) {
    pthread_mutex_lock(&te_lock);
    if(noit_skiplist_remove_compare(timed_events, e, NULL,
                                    noit_compare_voidptr))
      removed = e;
    pthread_mutex_unlock(&te_lock);
  }
  else if(e->mask & EVENTER_RECURRENT) {
    removed = eventer_remove_recurrent(e);
  }
  else {
    abort();
  }
  return removed;
}
static void eventer_kqueue_impl_update(eventer_t e, int mask) {
  if(e->mask & EVENTER_TIMER) {
    assert(mask & EVENTER_TIMER);
    pthread_mutex_lock(&te_lock);
    noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
    noit_skiplist_insert(timed_events, e);
    pthread_mutex_unlock(&te_lock);
    return;
  }
  /* Disable old, if they aren't active in the new */
  if((e->mask & (EVENTER_READ | EVENTER_EXCEPTION)) &&
     !(mask & (EVENTER_READ | EVENTER_EXCEPTION)))
    ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
  if((e->mask & (EVENTER_WRITE)) &&
     !(mask & (EVENTER_WRITE)))
    ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);

  /* Enable new, if they weren't in the old */
  if((mask & (EVENTER_READ | EVENTER_EXCEPTION)) &&
     !(e->mask & (EVENTER_READ | EVENTER_EXCEPTION)))
    ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
  if((mask & (EVENTER_WRITE)) &&
     !(e->mask & (EVENTER_WRITE)))
    ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);

  /* Switch */
  e->mask = mask;
}
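/* update() diffs the old mask against the new one and emits only the kqueue
 * changes that actually differ, so flipping an event between read and write
 * interest costs at most one EV_DELETE and one EV_ADD per filter. */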
static eventer_t eventer_kqueue_impl_remove_fd(int fd) {
  eventer_t eiq = NULL;
  ev_lock_state_t lockstate;
  if(master_fds[fd].e) {
    lockstate = acquire_master_fd(fd);
    eiq = master_fds[fd].e;
    master_fds[fd].e = NULL;
    if(eiq->mask & (EVENTER_READ | EVENTER_EXCEPTION))
      ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, eiq);
    if(eiq->mask & (EVENTER_WRITE))
      ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, eiq);
    release_master_fd(fd, lockstate);
  }
  return eiq;
}
static eventer_t eventer_kqueue_impl_find_fd(int fd) {
  return master_fds[fd].e;
}
static int eventer_kqueue_impl_loop() {
  int is_master_thread = 0;
  pthread_t self;
  KQUEUE_DECL;
  KQUEUE_SETUP;

  self = pthread_self();
  if(pthread_equal(self, master_thread)) is_master_thread = 1;

  if(!kqs) {
    kqs = calloc(1, sizeof(*kqs));
    kqs_init(kqs);
  }
  pthread_setspecific(kqueue_setup_key, kqs);
  while(1) {
    const char *cbname;
    struct timeval __now, __sleeptime;
    struct timespec __kqueue_sleeptime;
    int fd_cnt = 0;
    int max_timed_events_to_process;
    int newmask;

    __sleeptime = __max_sleeptime;
    /* Keep __now populated even when no timed events fire below; it is
     * consumed by the recurrent dispatch and the log calls that follow. */
    gettimeofday(&__now, NULL);

    /* Handle timed events...
     * we could be multithreaded, so if we pop forever we could starve
     * ourselves. */
    max_timed_events_to_process = timed_events->size;
    while(max_timed_events_to_process-- > 0) {
      eventer_t timed_event;

      gettimeofday(&__now, NULL);

      pthread_mutex_lock(&te_lock);
      /* Peek at our next timed event; if it should fire, pop it.
       * Otherwise we noop and NULL it out to break the loop. */
      timed_event = noit_skiplist_peek(timed_events);
      if(timed_event) {
        if(compare_timeval(timed_event->whence, __now) < 0) {
          timed_event = noit_skiplist_pop(timed_events, NULL);
        }
        else {
          sub_timeval(timed_event->whence, __now, &__sleeptime);
          timed_event = NULL;
        }
      }
      pthread_mutex_unlock(&te_lock);
      if(timed_event == NULL) break;

      cbname = eventer_name_for_callback(timed_event->callback);
      noitLT(eventer_deb, &__now, "debug: timed dispatch(%s)\n", cbname ? cbname : "???");
      /* Make our call */
      newmask = timed_event->callback(timed_event, EVENTER_TIMER,
                                      timed_event->closure, &__now);
      if(newmask)
        eventer_add(timed_event);
      else
        eventer_free(timed_event);
    }
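    /* Timed callbacks follow the eventer's return convention: a nonzero
     * return mask reschedules the event via eventer_add (presumably after
     * the callback has updated e->whence), while zero means the event is
     * finished and is freed above. */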

    if(compare_timeval(__max_sleeptime, __sleeptime) < 0) {
      /* we exceed our configured maximum, set it down */
      memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime));
    }

    /* Handle recurrent events */
    eventer_dispatch_recurrent(&__now);

    /* If we're the master, we need to lock the master_kqs and make mods */
    if(master_kqs->__ke_vec_used) {
      struct timespec __zerotime = { 0, 0 };
      pthread_mutex_lock(&kqs_lock);
      fd_cnt = kevent(kqueue_fd,
                      master_kqs->__ke_vec, master_kqs->__ke_vec_used,
                      NULL, 0,
                      &__zerotime);
      noitLT(eventer_deb, &__now, "debug: kevent(%d, [], %d) => %d\n", kqueue_fd, master_kqs->__ke_vec_used, fd_cnt);
      if(fd_cnt < 0) {
        noitLT(eventer_err, &__now, "kevent: %s\n", strerror(errno));
      }
      master_kqs->__ke_vec_used = 0;
      pthread_mutex_unlock(&kqs_lock);
    }
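    /* The kevent() call above passes an empty eventlist (NULL, 0) and a
     * zero timeout: it applies the shared changelist to the kernel queue
     * without blocking or harvesting any events; harvesting happens on the
     * kevent() call below. */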

    /* Now we move on to our fd-based events */
    __kqueue_sleeptime.tv_sec = __sleeptime.tv_sec;
    __kqueue_sleeptime.tv_nsec = __sleeptime.tv_usec * 1000;
    fd_cnt = kevent(kqueue_fd, ke_vec, ke_vec_used,
                    ke_vec, ke_vec_a,
                    &__kqueue_sleeptime);
    noitLT(eventer_deb, &__now, "debug: kevent(%d, [], %d) => %d\n", kqueue_fd, ke_vec_used, fd_cnt);
    ke_vec_used = 0;
    if(fd_cnt < 0) {
      noitLT(eventer_err, &__now, "kevent: %s\n", strerror(errno));
    }
    else {
      int idx;
      /* loop once to clear */
      for(idx = 0; idx < fd_cnt; idx++) {
        struct kevent *ke;
        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) continue;
        masks[ke->ident] = 0;
      }
      /* Loop again to aggregate */
      for(idx = 0; idx < fd_cnt; idx++) {
        struct kevent *ke;
        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) continue;
        if(ke->filter == EVFILT_READ) masks[ke->ident] |= EVENTER_READ;
        if(ke->filter == EVFILT_WRITE) masks[ke->ident] |= EVENTER_WRITE;
      }
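      /* Three passes over the returned events: clear each fd's mask,
       * OR the read/write filters together per fd, then dispatch.  kqueue
       * reports EVFILT_READ and EVFILT_WRITE as separate kevents, so the
       * aggregation pass is what lets an fd that is both readable and
       * writable get a single callback with the combined mask. */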
      /* Loop a last time to process */
      for(idx = 0; idx < fd_cnt; idx++) {
        ev_lock_state_t lockstate;
        struct kevent *ke;
        eventer_t e;
        int fd, oldmask;

        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) {
          if(ke->data != EBADF)
            noitLT(eventer_err, &__now, "error: %s\n", strerror(ke->data));
          continue;
        }
        e = (eventer_t)ke->udata;
        fd = ke->ident;
        /* If we've already serviced this fd, don't call back twice */
        if(!masks[fd]) continue;
        /* It's possible that someone removed the event and freed it
         * before we got here.
         */
        if(e != master_fds[fd].e) continue;
        lockstate = acquire_master_fd(fd);
        assert(lockstate == EV_OWNED);

        gettimeofday(&__now, NULL);
        oldmask = e->mask;
        cbname = eventer_name_for_callback(e->callback);
        noitLT(eventer_deb, &__now, "kqueue: fire on %d/%x to %s(%p)\n",
               fd, masks[fd], cbname?cbname:"???", e->callback);
        newmask = e->callback(e, masks[fd], e->closure, &__now);
        masks[fd] = 0; /* indicates we've processed this fd */

        if(newmask) {
          /* toggle the read bits if needed */
          if(newmask & (EVENTER_READ | EVENTER_EXCEPTION)) {
            if(!(oldmask & (EVENTER_READ | EVENTER_EXCEPTION)))
              ke_change(fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
          }
          else if(oldmask & (EVENTER_READ | EVENTER_EXCEPTION))
            ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);

          /* toggle the write bits if needed */
          if(newmask & EVENTER_WRITE) {
            if(!(oldmask & EVENTER_WRITE))
              ke_change(fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
          }
          else if(oldmask & EVENTER_WRITE)
            ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);

          /* Set our mask */
          e->mask = newmask;
        }
        else {
          /*
           * Long story long:
           *  When integrating with a few external event systems, we found
           *  it difficult to map their use of remove+add onto an update,
           *  as it can recur within a single handler call and you cannot
           *  remove completely from the event system if you are going to
           *  just update (otherwise the eventer_t in your call stack could
           *  be stale).  What we do is perform a superficial remove, marking
           *  the mask as 0, but not eventer_remove_fd.  Then on an add, if
           *  we already have an event, we just update the mask (as we
           *  have not yet returned to the eventer's loop).
           *  This leaves us in a tricky situation when a remove is called
           *  and the add doesn't roll in: we return 0 (mask == 0) and hit
           *  this spot.  We intended to remove the event, but it still
           *  resides at master_fds[fd].e -- even after we free it.
           *  So, in the event that we return 0 and master_fds[fd].e is
           *  the event we're about to free, we NULL it out.
           */
          if(master_fds[fd].e == e) master_fds[fd].e = NULL;
          eventer_free(e);
        }
        release_master_fd(fd, lockstate);
      }
    }
  }
  /* NOTREACHED */
  return 0;
}

struct _eventer_impl eventer_kqueue_impl = {
  "kqueue",
  eventer_kqueue_impl_init,
  eventer_kqueue_impl_propset,
  eventer_kqueue_impl_add,
  eventer_kqueue_impl_remove,
  eventer_kqueue_impl_update,
  eventer_kqueue_impl_remove_fd,
  eventer_kqueue_impl_find_fd,
  eventer_kqueue_impl_loop
};