root/src/eventer/eventer_kqueue_impl.c

Revision 7762d58b8665d52113441e017d9449404ba78d69, 15.6 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 months ago)

wakeup for Mac OS X

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_atomic.h"
36 #include "utils/noit_skiplist.h"
37 #include "utils/noit_log.h"
38 #include "dtrace_probes.h"
39
40 #include <errno.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <sys/event.h>
44 #include <pthread.h>
45 #include <assert.h>
46
47 struct _eventer_impl eventer_kqueue_impl;
48 #define LOCAL_EVENTER eventer_kqueue_impl
49 #define LOCAL_EVENTER_foreach_fdevent eventer_kqueue_impl_foreach_fdevent
50 #define LOCAL_EVENTER_foreach_timedevent eventer_kqueue_impl_foreach_timedevent
51 #define maxfds LOCAL_EVENTER.maxfds
52 #define master_fds LOCAL_EVENTER.master_fds
53
54 #include "eventer/eventer_impl_private.h"
55
56 static const struct timeval __dyna_increment = { 0, 10000 }; /* 10 ms */
57 static int kqueue_fd = -1;
58 typedef struct kqueue_setup {
59   struct kevent *__ke_vec;
60   unsigned int __ke_vec_a;
61   unsigned int __ke_vec_used;
62 } *kqs_t;
63
64 static pthread_mutex_t kqs_lock;
65 static kqs_t master_kqs = NULL;
66 static pthread_key_t kqueue_setup_key;
67 static int *masks;
68 #define KQUEUE_DECL kqs_t kqs
69 #define KQUEUE_SETUP kqs = (kqs_t) pthread_getspecific(kqueue_setup_key)
70 #define ke_vec kqs->__ke_vec
71 #define ke_vec_a kqs->__ke_vec_a
72 #define ke_vec_used kqs->__ke_vec_used
73
/* Lazily allocate the kevent change vector for a kqueue_setup.
 * Called with kqs_lock held when kqs == master_kqs (see ke_change).
 * Note: ke_vec/ke_vec_a are macros expanding to fields of the kqs
 * parameter. */
static void kqs_init(kqs_t kqs) {
  enum { initial_alloc = 64 };
  ke_vec_a = initial_alloc;
  ke_vec = (struct kevent *) malloc(ke_vec_a * sizeof (struct kevent));
  /* Callers unconditionally dereference the vector; fail loudly here
   * rather than NULL-deref later (there is no error-return path). */
  if(!ke_vec) abort();
}
/* Queue a kevent filter change (add/enable or delete/disable) for fd
 * `ident` into the calling thread's change vector; the vector is
 * flushed to kevent(2) by the event loop.  Threads without their own
 * kqs fall back to the shared master_kqs, which is guarded by
 * kqs_lock. */
static void
ke_change (register int const ident,
           register int const filter,
           register int const flags,
           register eventer_t e) {
  register struct kevent *kep;
  KQUEUE_DECL;
  KQUEUE_SETUP;
  if(!kqs) kqs = master_kqs;

  if(kqs == master_kqs) pthread_mutex_lock(&kqs_lock);
  if (!ke_vec_a) {
    kqs_init(kqs);
  }
  else if (ke_vec_used == ke_vec_a) {
    /* Grow geometrically.  Use a temporary so a failed realloc does
     * not clobber the live pointer (CERT MEM12-C); there is no error
     * path here, so fail loudly instead of NULL-dereferencing. */
    struct kevent *tmp;
    ke_vec_a <<= 1;
    tmp = (struct kevent *) realloc(ke_vec,
                                    ke_vec_a * sizeof (struct kevent));
    if(!tmp) abort();
    ke_vec = tmp;
  }
  kep = &ke_vec[ke_vec_used++];

  /* udata carries the fd so the dispatch loop can sanity-check that
   * ident and the registered event agree. */
  EV_SET(kep, ident, filter, flags, 0, 0, (void *)(vpsized_int)e->fd);
  noitL(eventer_deb, "debug: ke_change(fd:%d, filt:%x, flags:%x)\n",
        ident, filter, flags);
  if(kqs == master_kqs) pthread_mutex_unlock(&kqs_lock);
}
105
/* (Re)arm the EVFILT_USER oneshot event used to interrupt a blocked
 * kevent() wait; must be re-registered after each firing.
 * Returns the kevent(2) result (-1 on error). */
static int eventer_kqueue_impl_register_wakeup() {
  struct kevent kev;
  noitL(noit_debug, "wakeup... reregister\n");
  EV_SET(&kev, 0, EVFILT_USER, EV_ADD|EV_ONESHOT, NOTE_FFNOP, 0, NULL);
  return kevent(kqueue_fd, &kev, 1, NULL, 0, NULL);
}
112
/* One-time initialization of the kqueue eventer: runs the common
 * eventer init, creates the kqueue, arms the wakeup event, and sizes
 * the fd tables from RLIMIT_NOFILE.
 * Returns 0 on success, non-zero on failure. */
static int eventer_kqueue_impl_init() {
  struct rlimit rlim;
  int rv;

  /* super init */
  if((rv = eventer_impl_init()) != 0) return rv;

  /* Writes to dead sockets should surface as EPIPE, not kill us. */
  signal(SIGPIPE, SIG_IGN);
  kqueue_fd = kqueue();
  if(kqueue_fd == -1) {
    return -1;
  }

  if(eventer_kqueue_impl_register_wakeup() == -1)
    return -1;

  pthread_mutex_init(&kqs_lock, NULL);
  pthread_key_create(&kqueue_setup_key, NULL);
  master_kqs = calloc(1, sizeof(*master_kqs));
  if(!master_kqs) return -1;
  kqs_init(master_kqs);
  getrlimit(RLIMIT_NOFILE, &rlim);
  maxfds = rlim.rlim_cur;
  master_fds = calloc(maxfds, sizeof(*master_fds));
  masks = calloc(maxfds, sizeof(*masks));
  if(!master_fds || !masks) return -1;
  return 0;
}
/* Set an eventer property.  Delegates to the common implementation;
 * kqueue-local properties would be handled here if it rejects the key.
 * Returns 0 on success, -1 on failure. */
static int eventer_kqueue_impl_propset(const char *key, const char *value) {
  return eventer_impl_propset(key, value) ? -1 : 0;
}
/* Register a new event.  Asynch, recurrent and timed events are
 * delegated to the shared eventer implementation; anything else is a
 * file-descriptor event and is wired into kqueue. */
static void eventer_kqueue_impl_add(eventer_t e) {
  ev_lock_state_t lockstate;
  const char *cbname;
  int fd;

  assert(e->mask);
  cbname = eventer_name_for_callback_e(e->callback, e);

  /* Asynchronous work goes to the job system. */
  if(e->mask & EVENTER_ASYNCH) {
    noitL(eventer_deb, "debug: eventer_add asynch (%s)\n", cbname ? cbname : "???");
    eventer_add_asynch(NULL, e);
    return;
  }

  /* Recurrent delegation */
  if(e->mask & EVENTER_RECURRENT) {
    noitL(eventer_deb, "debug: eventer_add recurrent (%s)\n", cbname ? cbname : "???");
    eventer_add_recurrent(e);
    return;
  }

  /* Timed events are simple */
  if(e->mask & EVENTER_TIMER) {
    eventer_add_timed(e);
    return;
  }

  /* File descriptor event: publish it in the master table and queue
   * the corresponding kevent filter changes under the fd lock. */
  fd = e->fd;
  noitL(eventer_deb, "debug: eventer_add fd (%s,%d,0x%04x)\n", cbname ? cbname : "???", fd, e->mask);
  assert(e->whence.tv_sec == 0 && e->whence.tv_usec == 0);
  lockstate = acquire_master_fd(fd);
  master_fds[fd].e = e;
  if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
    ke_change(fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
  if(e->mask & (EVENTER_WRITE))
    ke_change(fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
  release_master_fd(fd, lockstate);
}
/* Deregister event e.  Returns e if it was found and removed, NULL
 * otherwise.  Asynch events cannot be removed this way (abort), and a
 * zero mask is a caller bug (abort). */
static eventer_t eventer_kqueue_impl_remove(eventer_t e) {
  eventer_t removed = NULL;

  if(e->mask & EVENTER_ASYNCH) abort();

  if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
    int fd = e->fd;
    ev_lock_state_t lockstate = acquire_master_fd(fd);
    noitL(eventer_deb, "kqueue: remove(%d)\n", fd);
    /* Only detach if this fd still maps to this exact event. */
    if(master_fds[fd].e == e) {
      removed = e;
      master_fds[fd].e = NULL;
      if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
        ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
      if(e->mask & (EVENTER_WRITE))
        ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
    }
    else {
      noitL(eventer_deb, "kqueue: remove(%d) failed.\n", fd);
    }
    release_master_fd(fd, lockstate);
  }
  else if(e->mask & EVENTER_TIMER) {
    removed = eventer_remove_timed(e);
  }
  else if(e->mask & EVENTER_RECURRENT) {
    removed = eventer_remove_recurrent(e);
  }
  else {
    abort();
  }
  return removed;
}
/* Change the interest mask of an existing event to `mask`, issuing
 * kqueue filter changes only for the bits that actually flipped.
 * Timed events delegate to the shared implementation. */
static void eventer_kqueue_impl_update(eventer_t e, int mask) {
  int rd_old, rd_new, wr_old, wr_new;

  if(e->mask & EVENTER_TIMER) {
    eventer_update_timed(e, mask);
    return;
  }
  noitL(eventer_deb, "kqueue: update(%d, %x->%x)\n", e->fd, e->mask, mask);

  rd_old = e->mask & (EVENTER_READ | EVENTER_EXCEPTION);
  rd_new = mask & (EVENTER_READ | EVENTER_EXCEPTION);
  wr_old = e->mask & (EVENTER_WRITE);
  wr_new = mask & (EVENTER_WRITE);

  /* Disable filters that were active but are absent from the new mask */
  if(rd_old && !rd_new)
    ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
  if(wr_old && !wr_new)
    ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);

  /* Enable filters newly requested by the new mask */
  if(rd_new && !rd_old)
    ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
  if(wr_new && !wr_old)
    ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);

  /* Switch */
  e->mask = mask;
}
/* Detach and return whatever event is registered on fd (NULL if none),
 * removing its kqueue filters.  The authoritative read of
 * master_fds[fd].e happens under the fd lock: the unlocked check is
 * only a fast-path filter, so we must re-read (and NULL-check) after
 * acquiring the lock to avoid racing a concurrent remove/trigger. */
static eventer_t eventer_kqueue_impl_remove_fd(int fd) {
  eventer_t eiq = NULL;
  ev_lock_state_t lockstate;
  if(master_fds[fd].e) {
    noitL(eventer_deb, "kqueue: remove_fd(%d)\n", fd);
    lockstate = acquire_master_fd(fd);
    /* Re-read under the lock; the event may be gone by now. */
    eiq = master_fds[fd].e;
    master_fds[fd].e = NULL;
    if(eiq) {
      if(eiq->mask & (EVENTER_READ | EVENTER_EXCEPTION))
        ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, eiq);
      if(eiq->mask & (EVENTER_WRITE))
        ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, eiq);
    }
    release_master_fd(fd, lockstate);
  }
  return eiq;
}
254 static eventer_t eventer_kqueue_impl_find_fd(int fd) {
255   return master_fds[fd].e;
256 }
/* Fire the callback for fd event e with the given mask, then rewire
 * the kqueue filters according to the mask the callback returns.
 * A zero return from the callback means "remove and free the event".
 * Serializes per-fd via acquire_master_fd/release_master_fd. */
static void eventer_kqueue_impl_trigger(eventer_t e, int mask) {
  ev_lock_state_t lockstate;
  struct timeval __now;
  int oldmask, newmask;
  const char *cbname;
  int fd;

  fd = e->fd;
  /* Stale event (already removed or replaced on this fd): nothing to do. */
  if(e != master_fds[fd].e) return;
  lockstate = acquire_master_fd(fd);
  /* Recursive trigger on a fd this thread already owns: skip, the
   * outer invocation is responsible for it. */
  if(lockstate == EV_ALREADY_OWNED) return;
  assert(lockstate == EV_OWNED);

  gettimeofday(&__now, NULL);
  /* We're going to lie to ourselves.  You'd think this should be:
   * oldmask = e->mask;  However, we just fired with masks[fd], so
   * kqueue is clearly looking for all of the events in masks[fd].
   * So, we combine them "just to be safe."
   */
  oldmask = e->mask | masks[fd];
  cbname = eventer_name_for_callback_e(e->callback, e);
  noitLT(eventer_deb, &__now, "kqueue: fire on %d/%x to %s(%p)\n",
         fd, masks[fd], cbname?cbname:"???", e->callback);
  /* DTrace probes bracketing the user callback. */
  EVENTER_CALLBACK_ENTRY((void *)e->callback, (char *)cbname, fd, e->mask, mask);
  newmask = e->callback(e, mask, e->closure, &__now);
  EVENTER_CALLBACK_RETURN((void *)e->callback, (char *)cbname, newmask);

  if(newmask) {
    /* toggle the read bits if needed */
    if(newmask & (EVENTER_READ | EVENTER_EXCEPTION)) {
      if(!(oldmask & (EVENTER_READ | EVENTER_EXCEPTION)))
        ke_change(fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
    }
    else if(oldmask & (EVENTER_READ | EVENTER_EXCEPTION))
      ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);

    /* toggle the write bits if needed */
    if(newmask & EVENTER_WRITE) {
      if(!(oldmask & EVENTER_WRITE))
        ke_change(fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
    }
    else if(oldmask & EVENTER_WRITE)
      ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);

    /* Set our mask */
    e->mask = newmask;
  }
  else {
    /*
     * Long story long:
     *  When integrating with a few external event systems, we find
     *  it difficult to make their use of remove+add as an update
     *  as it can be recurrent in a single handler call and you cannot
     *  remove completely from the event system if you are going to
     *  just update (otherwise the eventer_t in your call stack could
     *  be stale).  What we do is perform a superficial remove, marking
     *  the mask as 0, but not eventer_remove_fd.  Then on an add, if
     *  we already have an event, we just update the mask (as we
     *  have not yet returned to the eventer's loop).
     *  This leaves us in a tricky situation when a remove is called
     *  and the add doesn't roll in, we return 0 (mask == 0) and hit
     *  this spot.  We have intended to remove the event, but it still
     *  resides at master_fds[fd].e -- even after we free it.
     *  So, in the event that we return 0 and the event that
     *  master_fds[fd].e == the event we're about to free... we NULL
     *  it out.
     */
    if(master_fds[fd].e == e) master_fds[fd].e = NULL;
    eventer_free(e);
  }
  release_master_fd(fd, lockstate);
}
/* The per-thread kqueue event loop; never returns.  Each iteration:
 * dispatch timed and recurrent events, flush any pending changes
 * queued on the shared master_kqs, then issue one kevent(2) call that
 * both submits this thread's queued changes and waits for activity.
 * __dyna_sleep implements a backoff that grows by __dyna_increment
 * (10ms) on each idle wakeup and resets when real fd events arrive. */
static int eventer_kqueue_impl_loop() {
  struct timeval __dyna_sleep = { 0, 0 };
  KQUEUE_DECL;
  KQUEUE_SETUP;

  /* First entry on this thread: create its thread-local change vector. */
  if(!kqs) {
    kqs = calloc(1, sizeof(*kqs));
    kqs_init(kqs);
  }
  pthread_setspecific(kqueue_setup_key, kqs);
  while(1) {
    struct timeval __now, __sleeptime;
    struct timespec __kqueue_sleeptime;
    int fd_cnt = 0;

    /* Clamp the backoff to the configured maximum. */
    if(compare_timeval(eventer_max_sleeptime, __dyna_sleep) < 0)
      __dyna_sleep = eventer_max_sleeptime;

    __sleeptime = __dyna_sleep;

    /* May shrink __sleeptime to the next timed-event deadline. */
    eventer_dispatch_timed(&__now, &__sleeptime);

    if(compare_timeval(__sleeptime, __dyna_sleep) > 0)
      __sleeptime = __dyna_sleep;

    /* Handle recurrent events */
    eventer_dispatch_recurrent(&__now);

    /* If we're the master, we need to lock the master_kqs and make mods */
    if(master_kqs->__ke_vec_used) {
      /* Submit the shared change vector without blocking (zero timeout,
       * no output events requested). */
      struct timespec __zerotime = { 0, 0 };
      pthread_mutex_lock(&kqs_lock);
      fd_cnt = kevent(kqueue_fd,
                      master_kqs->__ke_vec, master_kqs->__ke_vec_used,
                      NULL, 0,
                      &__zerotime);
      noitLT(eventer_deb, &__now, "debug: kevent(%d, [], %d) => %d\n", kqueue_fd, master_kqs->__ke_vec_used, fd_cnt);
      if(fd_cnt < 0) {
        noitLT(eventer_err, &__now, "kevent: %s\n", strerror(errno));
      }
      master_kqs->__ke_vec_used = 0;
      pthread_mutex_unlock(&kqs_lock);
    }

    /* Now we move on to our fd-based events */
    __kqueue_sleeptime.tv_sec = __sleeptime.tv_sec;
    __kqueue_sleeptime.tv_nsec = __sleeptime.tv_usec * 1000;
    /* One call submits this thread's pending changes AND collects
     * triggered events back into the same vector. */
    fd_cnt = kevent(kqueue_fd, ke_vec, ke_vec_used,
                    ke_vec, ke_vec_a,
                    &__kqueue_sleeptime);
    noitLT(eventer_deb, &__now, "debug: kevent(%d, [], %d) => %d\n", kqueue_fd, ke_vec_used, fd_cnt);
    ke_vec_used = 0;
    if(fd_cnt < 0) {
      noitLT(eventer_err, &__now, "kevent: %s\n", strerror(errno));
    }
    else if(fd_cnt == 0 ||
            (fd_cnt == 1 && ke_vec[0].filter == EVFILT_USER)) {
      /* timeout (or only the wakeup event fired): back off a little */
      if(fd_cnt) eventer_kqueue_impl_register_wakeup();
      add_timeval(__dyna_sleep, __dyna_increment, &__dyna_sleep);
    }
    else {
      int idx;
      __dyna_sleep.tv_sec = __dyna_sleep.tv_usec = 0; /* reset */
      /* loop once to clear */
      for(idx = 0; idx < fd_cnt; idx++) {
        struct kevent *ke;
        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) continue;
        if(ke->filter == EVFILT_USER) {
          /* The oneshot wakeup fired; rearm it for next time. */
          eventer_kqueue_impl_register_wakeup();
          continue;
        }
        masks[ke->ident] = 0;
      }
      /* Loop again to aggregate */
      for(idx = 0; idx < fd_cnt; idx++) {
        struct kevent *ke;
        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) continue;
        if(ke->filter == EVFILT_USER) continue;
        if(ke->filter == EVFILT_READ) masks[ke->ident] |= EVENTER_READ;
        if(ke->filter == EVFILT_WRITE) masks[ke->ident] |= EVENTER_WRITE;
      }
      /* Loop a last time to process */
      for(idx = 0; idx < fd_cnt; idx++) {
        struct kevent *ke;
        eventer_t e;
        int fd;

        ke = &ke_vec[idx];
        if(ke->filter == EVFILT_USER) continue;
        if(ke->flags & EV_ERROR) {
          /* EBADF/ENOENT are expected for fds removed out from under us */
          if(ke->data != EBADF && ke->data != ENOENT)
            noitLT(eventer_err, &__now, "error [%d]: %s\n",
                   (int)ke->ident, strerror(ke->data));
          continue;
        }
        /* udata was set to the fd in ke_change; verify consistency. */
        assert((vpsized_int)ke->udata == (vpsized_int)ke->ident);
        fd = ke->ident;
        e = master_fds[fd].e;
        /* If we've seen this fd, don't callback twice */
        if(!masks[fd]) continue;
        /* It's possible that someone removed the event and freed it
         * before we got here.
         */
        if(e) eventer_kqueue_impl_trigger(e, masks[fd]);
        masks[fd] = 0; /* indicates we've processed this fd */
      }
    }
  }
  /* NOTREACHED */
  return 0;
}
443
/* Trigger the EVFILT_USER event so a kevent() wait in the loop thread
 * returns immediately (used to interrupt the event loop). */
void eventer_kqueue_impl_wakeup() {
  struct kevent kev;
  EV_SET(&kev, 0, EVFILT_USER, 0, NOTE_FFCOPY|NOTE_TRIGGER|0x1, 0, NULL);
  kevent(kqueue_fd, &kev, 1, NULL, 0, NULL);
}
449
/* The exported kqueue eventer implementation record.  Field order must
 * match struct _eventer_impl (declared in eventer/eventer.h --
 * positional initializers, so do not reorder). */
struct _eventer_impl eventer_kqueue_impl = {
  "kqueue",                            /* implementation name */
  eventer_kqueue_impl_init,
  eventer_kqueue_impl_propset,
  eventer_kqueue_impl_add,
  eventer_kqueue_impl_remove,
  eventer_kqueue_impl_update,
  eventer_kqueue_impl_remove_fd,
  eventer_kqueue_impl_find_fd,
  eventer_kqueue_impl_trigger,
  eventer_kqueue_impl_loop,
  eventer_kqueue_impl_foreach_fdevent,
  eventer_kqueue_impl_wakeup,
  { 0, 200000 }, /* presumably max sleeptime (200ms), cf. eventer_max_sleeptime -- TODO confirm */
  0,             /* maxfds -- set from RLIMIT_NOFILE in init */
  NULL           /* master_fds -- allocated in init */
};
Note: See TracBrowser for help on using the browser.