root/src/eventer/eventer_kqueue_impl.c

Revision f870be02daa5f1ef34211f67b8489564f94b0a88, 14.8 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 2 years ago)

fix types

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_atomic.h"
36 #include "utils/noit_skiplist.h"
37 #include "utils/noit_log.h"
38 #include "eventer/dtrace_probes.h"
39
40 #include <errno.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <sys/event.h>
44 #include <pthread.h>
45 #include <assert.h>
46
/* Declared here (defined with its initializer at the bottom of this file)
 * so the LOCAL_EVENTER macros below can reference it. */
struct _eventer_impl eventer_kqueue_impl;
#define LOCAL_EVENTER eventer_kqueue_impl
#define LOCAL_EVENTER_foreach_fdevent eventer_kqueue_impl_foreach_fdevent
#define LOCAL_EVENTER_foreach_timedevent eventer_kqueue_impl_foreach_timedevent
#define maxfds LOCAL_EVENTER.maxfds
#define master_fds LOCAL_EVENTER.master_fds

#include "eventer/eventer_impl_private.h"

/* Amount added to the loop's dynamic sleep on each idle pass. */
static const struct timeval __dyna_increment = { 0, 1000 }; /* 1 ms */
static int kqueue_fd = -1;
/* A changelist of pending kevent(2) modifications.  Each loop thread keeps
 * one in thread-local storage; threads without one share the lock-protected
 * master copy (see ke_change). */
typedef struct kqueue_setup {
  struct kevent *__ke_vec;      /* staged kevent changes */
  unsigned int __ke_vec_a;      /* allocated capacity of __ke_vec */
  unsigned int __ke_vec_used;   /* entries currently staged */
} *kqs_t;

static pthread_mutex_t kqs_lock;        /* guards master_kqs */
static kqs_t master_kqs = NULL;         /* shared changelist for non-loop threads */
static pthread_key_t kqueue_setup_key;  /* TLS key for each thread's kqs_t */
static int *masks;                      /* per-fd aggregated fired-event masks */
/* Shorthand for fetching and using the calling thread's changelist. */
#define KQUEUE_DECL kqs_t kqs
#define KQUEUE_SETUP kqs = (kqs_t) pthread_getspecific(kqueue_setup_key)
#define ke_vec kqs->__ke_vec
#define ke_vec_a kqs->__ke_vec_a
#define ke_vec_used kqs->__ke_vec_used
73
/* Allocate the initial kevent changelist for kqs.
 * Aborts on allocation failure: leaving __ke_vec NULL would otherwise
 * crash later, unchecked, inside ke_change. */
static void kqs_init(kqs_t kqs) {
  enum { initial_alloc = 64 };
  ke_vec_a = initial_alloc;
  ke_vec = malloc(ke_vec_a * sizeof (struct kevent));
  if(!ke_vec) abort();
}
/* Stage a kevent change (add/delete/enable/disable of a filter on ident)
 * for submission on the next kevent(2) call in the loop.  Threads with a
 * thread-local changelist stage lock-free; all other threads share
 * master_kqs under kqs_lock. */
static void
ke_change (register int const ident,
           register int const filter,
           register int const flags,
           register eventer_t e) {
  register struct kevent *kep;
  KQUEUE_DECL;
  KQUEUE_SETUP;
  if(!kqs) kqs = master_kqs;

  if(kqs == master_kqs) pthread_mutex_lock(&kqs_lock);
  if (!ke_vec_a) {
    kqs_init(kqs);
  }
  else if (ke_vec_used == ke_vec_a) {
    /* Grow geometrically.  Use a temporary so a failed realloc neither
     * leaks the old vector nor leaves ke_vec NULL (the old code did
     * ke_vec = realloc(ke_vec, ...)). */
    struct kevent *grown;
    ke_vec_a <<= 1;
    grown = realloc(ke_vec, ke_vec_a * sizeof (struct kevent));
    if(!grown) abort();
    ke_vec = grown;
  }
  kep = &ke_vec[ke_vec_used++];

  /* The fd rides along in udata so the loop can sanity-check that the
   * fired event's identity matches what was registered. */
  EV_SET(kep, ident, filter, flags, 0, 0, (void *)(vpsized_int)e->fd);
  noitL(eventer_deb, "debug: ke_change(fd:%d, filt:%x, flags:%x)\n",
        ident, filter, flags);
  if(kqs == master_kqs) pthread_mutex_unlock(&kqs_lock);
}
105
/* Bring up the kqueue backend: run the generic eventer init, create the
 * kqueue(2) descriptor, set up the per-thread changelist machinery, and
 * size the per-fd tables from RLIMIT_NOFILE.
 * Returns 0 on success, non-zero on failure (the generic init's error
 * code, or -1 for local failures). */
static int eventer_kqueue_impl_init() {
  struct rlimit rlim;
  int rv;

  /* super init */
  if((rv = eventer_impl_init()) != 0) return rv;

  /* Dead peers should surface as EPIPE from write, not kill the process. */
  signal(SIGPIPE, SIG_IGN);
  kqueue_fd = kqueue();
  if(kqueue_fd == -1) {
    return -1;
  }
  pthread_mutex_init(&kqs_lock, NULL);
  pthread_key_create(&kqueue_setup_key, NULL);
  master_kqs = calloc(1, sizeof(*master_kqs));
  if(!master_kqs) return -1;   /* previously dereferenced unchecked */
  kqs_init(master_kqs);
  getrlimit(RLIMIT_NOFILE, &rlim);
  maxfds = rlim.rlim_cur;
  master_fds = calloc(maxfds, sizeof(*master_fds));
  masks = calloc(maxfds, sizeof(*masks));
  if(!master_fds || !masks) return -1;   /* previously unchecked */
  return 0;
}
/* Forward a configuration property to the generic eventer layer.
 * There are no kqueue-specific properties yet; when the generic layer
 * rejects the key we report failure (-1), otherwise success (0). */
static int eventer_kqueue_impl_propset(const char *key, const char *value) {
  int unhandled = eventer_impl_propset(key, value);
  /* Kqueue-local property handling would go here. */
  return unhandled ? -1 : 0;
}
/* Register a new event.  Asynch, recurrent, and timed events are handed
 * off to the generic eventer subsystems; fd-based events are recorded in
 * master_fds and their kqueue filters staged via ke_change. */
static void eventer_kqueue_impl_add(eventer_t e) {
  assert(e->mask);
  ev_lock_state_t lockstate;
  const char *cbname;
  cbname = eventer_name_for_callback(e->callback);

  /* Asynchronous work goes to the job queues. */
  if(e->mask & EVENTER_ASYNCH) {
    noitL(eventer_deb, "debug: eventer_add asynch (%s)\n", cbname ? cbname : "???");
    eventer_add_asynch(NULL, e);
    return;
  }

  /* Recurrent delegation */
  if(e->mask & EVENTER_RECURRENT) {
    noitL(eventer_deb, "debug: eventer_add recurrent (%s)\n", cbname ? cbname : "???");
    eventer_add_recurrent(e);
    return;
  }

  /* Timed events are simple */
  if(e->mask & EVENTER_TIMER) {
    eventer_add_timed(e);
    return;
  }

  /* file descriptor event: a whence of zero is how we detect that this is
   * not a mis-flagged timed event */
  noitL(eventer_deb, "debug: eventer_add fd (%s,%d,0x%04x)\n", cbname ? cbname : "???", e->fd, e->mask);
  assert(e->whence.tv_sec == 0 && e->whence.tv_usec == 0);
  lockstate = acquire_master_fd(e->fd);
  master_fds[e->fd].e = e;
  /* READ and EXCEPTION interest both map onto the kqueue read filter. */
  if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
    ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
  if(e->mask & (EVENTER_WRITE))
    ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
  release_master_fd(e->fd, lockstate);
}
/* Remove event e from the eventer.  Returns the event actually removed,
 * or NULL if it wasn't registered (e.g. a different event owns its fd).
 * Asynch events cannot be removed once submitted; an event with no
 * recognizable mask is a programming error.  Both abort(). */
static eventer_t eventer_kqueue_impl_remove(eventer_t e) {
  eventer_t removed = NULL;
  if(e->mask & EVENTER_ASYNCH) {
    abort();
  }
  if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
    ev_lock_state_t lockstate;
    lockstate = acquire_master_fd(e->fd);
    noitL(eventer_deb, "kqueue: remove(%d)\n", e->fd);
    /* Only unhook if e is still the event registered on this fd. */
    if(e == master_fds[e->fd].e) {
      removed = e;
      master_fds[e->fd].e = NULL;
      if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
        ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
      if(e->mask & (EVENTER_WRITE))
        ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
    } else
      noitL(eventer_deb, "kqueue: remove(%d) failed.\n", e->fd);
    release_master_fd(e->fd, lockstate);
  }
  else if(e->mask & EVENTER_TIMER) {
    removed = eventer_remove_timed(e);
  }
  else if(e->mask & EVENTER_RECURRENT) {
    removed = eventer_remove_recurrent(e);
  }
  else {
    abort();
  }
  return removed;
}
/* Change an event's interest mask in place.  Timed events delegate to the
 * generic timer code; for fd events we diff the current mask against the
 * requested one and stage only the kqueue filter transitions that differ. */
static void eventer_kqueue_impl_update(eventer_t e, int mask) {
  const int readish = EVENTER_READ | EVENTER_EXCEPTION;

  if(e->mask & EVENTER_TIMER) {
    eventer_update_timed(e, mask);
    return;
  }
  noitL(eventer_deb, "kqueue: update(%d, %x->%x)\n", e->fd, e->mask, mask);

  /* Tear down filters active in the old mask but absent from the new. */
  if((e->mask & readish) && !(mask & readish))
    ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
  if((e->mask & EVENTER_WRITE) && !(mask & EVENTER_WRITE))
    ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);

  /* Establish filters requested in the new mask but absent from the old. */
  if((mask & readish) && !(e->mask & readish))
    ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
  if((mask & EVENTER_WRITE) && !(e->mask & EVENTER_WRITE))
    ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);

  /* Commit the new interest set. */
  e->mask = mask;
}
/* Detach and return whatever event is registered on fd, staging kqueue
 * deletes for its active filters.  Returns NULL if no event is registered. */
static eventer_t eventer_kqueue_impl_remove_fd(int fd) {
  eventer_t removed;
  ev_lock_state_t lockstate;

  if(master_fds[fd].e == NULL) return NULL;

  noitL(eventer_deb, "kqueue: remove_fd(%d)\n", fd);
  lockstate = acquire_master_fd(fd);
  removed = master_fds[fd].e;
  master_fds[fd].e = NULL;
  if(removed->mask & (EVENTER_READ | EVENTER_EXCEPTION))
    ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, removed);
  if(removed->mask & EVENTER_WRITE)
    ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, removed);
  release_master_fd(fd, lockstate);
  return removed;
}
243 static eventer_t eventer_kqueue_impl_find_fd(int fd) {
244   return master_fds[fd].e;
245 }
/* Fire event e with the given mask: invoke its callback once (the per-fd
 * lock guarantees an event never runs concurrently with itself) and
 * translate the callback's returned mask into kqueue filter changes.
 * A returned mask of 0 deactivates and frees the event. */
static void eventer_kqueue_impl_trigger(eventer_t e, int mask) {
  ev_lock_state_t lockstate;
  struct timeval __now;
  int oldmask, newmask;
  const char *cbname;
  int fd;

  fd = e->fd;
  /* Stale: the event was removed/replaced after being queued to fire. */
  if(e != master_fds[fd].e) return;
  lockstate = acquire_master_fd(fd);
  /* Already being processed higher up this thread's call stack. */
  if(lockstate == EV_ALREADY_OWNED) return;
  assert(lockstate == EV_OWNED);

  gettimeofday(&__now, NULL);
  /* We're going to lie to ourselves.  You'd think this should be:
   * oldmask = e->mask;  However, we just fired with masks[fd], so
   * kqueue is clearly looking for all of the events in masks[fd].
   * So, we combine them "just to be safe."
   */
  oldmask = e->mask | masks[fd];
  cbname = eventer_name_for_callback(e->callback);
  noitLT(eventer_deb, &__now, "kqueue: fire on %d/%x to %s(%p)\n",
         fd, masks[fd], cbname?cbname:"???", e->callback);
  EVENTER_CALLBACK_ENTRY((void *)e->callback, (char *)cbname, fd, e->mask, mask);
  newmask = e->callback(e, mask, e->closure, &__now);
  EVENTER_CALLBACK_RETURN((void *)e->callback, (char *)cbname, newmask);

  if(newmask) {
    /* toggle the read bits if needed */
    if(newmask & (EVENTER_READ | EVENTER_EXCEPTION)) {
      if(!(oldmask & (EVENTER_READ | EVENTER_EXCEPTION)))
        ke_change(fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
    }
    else if(oldmask & (EVENTER_READ | EVENTER_EXCEPTION))
      ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);

    /* toggle the write bits if needed */
    if(newmask & EVENTER_WRITE) {
      if(!(oldmask & EVENTER_WRITE))
        ke_change(fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
    }
    else if(oldmask & EVENTER_WRITE)
      ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);

    /* Set our mask */
    e->mask = newmask;
  }
  else {
    /*
     * Long story long:
     *  When integrating with a few external event systems, we find
     *  it difficult to make their use of remove+add as an update
     *  as it can be recurrent in a single handler call and you cannot
     *  remove completely from the event system if you are going to
     *  just update (otherwise the eventer_t in your call stack could
     *  be stale).  What we do is perform a superficial remove, marking
     *  the mask as 0, but not eventer_remove_fd.  Then on an add, if
     *  we already have an event, we just update the mask (as we
     *  have not yet returned to the eventer's loop.
     *  This leaves us in a tricky situation when a remove is called
     *  and the add doesn't roll in, we return 0 (mask == 0) and hit
     *  this spot.  We have intended to remove the event, but it still
     *  resides at master_fds[fd].e -- even after we free it.
     *  So, in the event that we return 0 and the event that
     *  master_fds[fd].e == the event we're about to free... we NULL
     *  it out.
     */
    if(master_fds[fd].e == e) master_fds[fd].e = NULL;
    eventer_free(e);
  }
  release_master_fd(fd, lockstate);
}
/* The kqueue event loop.  Each iteration dispatches timed and recurrent
 * events, flushes any changes staged by non-loop threads via master_kqs,
 * then blocks in kevent(2) and fires fd callbacks for whatever returns.
 * Never returns. */
static int eventer_kqueue_impl_loop() {
  struct timeval __dyna_sleep = { 0, 0 };
  KQUEUE_DECL;
  KQUEUE_SETUP;

  /* First entry on this thread: create its private changelist. */
  if(!kqs) {
    kqs = calloc(1, sizeof(*kqs));
    kqs_init(kqs);
  }
  pthread_setspecific(kqueue_setup_key, kqs);
  while(1) {
    struct timeval __now, __sleeptime;
    struct timespec __kqueue_sleeptime;
    int fd_cnt = 0;

    /* __dyna_sleep grows 1ms per idle pass (see below), capped here at
     * the configured maximum sleep time. */
    if(compare_timeval(eventer_max_sleeptime, __dyna_sleep) < 0)
      __dyna_sleep = eventer_max_sleeptime;

    __sleeptime = __dyna_sleep;

    /* NOTE(review): __now appears to be populated by
     * eventer_dispatch_timed (alongside shrinking __sleeptime to the next
     * timer deadline) -- confirm, as it is read uninitialized otherwise. */
    eventer_dispatch_timed(&__now, &__sleeptime);

    if(compare_timeval(__sleeptime, __dyna_sleep) > 0)
      __sleeptime = __dyna_sleep;

    /* Handle recurrent events */
    eventer_dispatch_recurrent(&__now);

    /* If we're the master, we need to lock the master_kqs and make mods.
     * This submits changes only (no output events, zero timeout). */
    if(master_kqs->__ke_vec_used) {
      struct timespec __zerotime = { 0, 0 };
      pthread_mutex_lock(&kqs_lock);
      fd_cnt = kevent(kqueue_fd,
                      master_kqs->__ke_vec, master_kqs->__ke_vec_used,
                      NULL, 0,
                      &__zerotime);
      noitLT(eventer_deb, &__now, "debug: kevent(%d, [], %d) => %d\n", kqueue_fd, master_kqs->__ke_vec_used, fd_cnt);
      if(fd_cnt < 0) {
        noitLT(eventer_err, &__now, "kevent: %s\n", strerror(errno));
      }
      master_kqs->__ke_vec_used = 0;
      pthread_mutex_unlock(&kqs_lock);
    }

    /* Now we move on to our fd-based events: submit this thread's staged
     * changes and collect fired events into the same vector. */
    __kqueue_sleeptime.tv_sec = __sleeptime.tv_sec;
    __kqueue_sleeptime.tv_nsec = __sleeptime.tv_usec * 1000;
    fd_cnt = kevent(kqueue_fd, ke_vec, ke_vec_used,
                    ke_vec, ke_vec_a,
                    &__kqueue_sleeptime);
    noitLT(eventer_deb, &__now, "debug: kevent(%d, [], %d) => %d\n", kqueue_fd, ke_vec_used, fd_cnt);
    ke_vec_used = 0;
    if(fd_cnt < 0) {
      noitLT(eventer_err, &__now, "kevent: %s\n", strerror(errno));
    }
    else if(fd_cnt == 0) {
      /* timeout: back off the next sleep by one increment */
      add_timeval(__dyna_sleep, __dyna_increment, &__dyna_sleep);
    }
    else {
      int idx;
      __dyna_sleep.tv_sec = __dyna_sleep.tv_usec = 0; /* reset */
      /* loop once to clear stale per-fd masks */
      for(idx = 0; idx < fd_cnt; idx++) {
        struct kevent *ke;
        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) continue;
        masks[ke->ident] = 0;
      }
      /* Loop again to aggregate read/write filters into one mask per fd */
      for(idx = 0; idx < fd_cnt; idx++) {
        struct kevent *ke;
        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) continue;
        if(ke->filter == EVFILT_READ) masks[ke->ident] |= EVENTER_READ;
        if(ke->filter == EVFILT_WRITE) masks[ke->ident] |= EVENTER_WRITE;
      }
      /* Loop a last time to process */
      for(idx = 0; idx < fd_cnt; idx++) {
        struct kevent *ke;
        eventer_t e;
        int fd;

        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) {
          /* EBADF/ENOENT are routine when fds close out from under us. */
          if(ke->data != EBADF && ke->data != ENOENT)
            noitLT(eventer_err, &__now, "error [%d]: %s\n",
                   (int)ke->ident, strerror(ke->data));
          continue;
        }
        /* udata carried the fd at registration time (see ke_change). */
        assert((vpsized_int)ke->udata == (vpsized_int)ke->ident);
        fd = ke->ident;
        e = master_fds[fd].e;
        /* If we've seen this fd, don't callback twice */
        if(!masks[fd]) continue;
        /* It's possible that someone removed the event and freed it
         * before we got here.
         */
        if(e) eventer_kqueue_impl_trigger(e, masks[fd]);
        masks[fd] = 0; /* indicates we've processed this fd */
      }
    }
  }
  /* NOTREACHED */
  return 0;
}
424
/* The exported kqueue eventer implementation: backend name, the operation
 * vtable defined above, and runtime state filled in by init. */
struct _eventer_impl eventer_kqueue_impl = {
  "kqueue",
  eventer_kqueue_impl_init,
  eventer_kqueue_impl_propset,
  eventer_kqueue_impl_add,
  eventer_kqueue_impl_remove,
  eventer_kqueue_impl_update,
  eventer_kqueue_impl_remove_fd,
  eventer_kqueue_impl_find_fd,
  eventer_kqueue_impl_trigger,
  eventer_kqueue_impl_loop,
  eventer_kqueue_impl_foreach_fdevent,
  { 0, 200000 }, /* presumably the default max sleeptime (200ms) -- confirm against struct _eventer_impl */
  0,             /* maxfds -- set from RLIMIT_NOFILE in init */
  NULL           /* master_fds -- allocated in init */
};
Note: See TracBrowser for help on using the browser.