root/src/eventer/eventer_kqueue_impl.c

Revision 3a48d08926ff8a3ea95003402260d96f360cc064, 15.7 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 2 weeks ago)

update eventer callback probe.
Cover all callsites and pass eventer_t as first arg.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * Copyright (c) 2014, Circonus, Inc.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include "noit_defines.h"
35 #include "eventer/eventer.h"
36 #include "utils/noit_atomic.h"
37 #include "utils/noit_skiplist.h"
38 #include "utils/noit_log.h"
39 #include "dtrace_probes.h"
40
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <assert.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/event.h>
#include <sys/resource.h>
47
48 struct _eventer_impl eventer_kqueue_impl;
49 #define LOCAL_EVENTER eventer_kqueue_impl
50 #define LOCAL_EVENTER_foreach_fdevent eventer_kqueue_impl_foreach_fdevent
51 #define LOCAL_EVENTER_foreach_timedevent eventer_kqueue_impl_foreach_timedevent
52 #define maxfds LOCAL_EVENTER.maxfds
53 #define master_fds LOCAL_EVENTER.master_fds
54
55 #include "eventer/eventer_impl_private.h"
56
57 static const struct timeval __dyna_increment = { 0, 10000 }; /* 10 ms */
58 static int kqueue_fd = -1;
59 typedef struct kqueue_setup {
60   struct kevent *__ke_vec;
61   unsigned int __ke_vec_a;
62   unsigned int __ke_vec_used;
63 } *kqs_t;
64
65 static pthread_mutex_t kqs_lock;
66 static kqs_t master_kqs = NULL;
67 static pthread_key_t kqueue_setup_key;
68 static int *masks;
69 #define KQUEUE_DECL kqs_t kqs
70 #define KQUEUE_SETUP kqs = (kqs_t) pthread_getspecific(kqueue_setup_key)
71 #define ke_vec kqs->__ke_vec
72 #define ke_vec_a kqs->__ke_vec_a
73 #define ke_vec_used kqs->__ke_vec_used
74
75 static void kqs_init(kqs_t kqs) {
76   enum { initial_alloc = 64 };
77   ke_vec_a = initial_alloc;
78   ke_vec = (struct kevent *) malloc(ke_vec_a * sizeof (struct kevent));
79 }
80 static void
81 ke_change (register int const ident,
82            register int const filter,
83            register int const flags,
84            register eventer_t e) {
85   register struct kevent *kep;
86   KQUEUE_DECL;
87   KQUEUE_SETUP;
88   if(!kqs) kqs = master_kqs;
89
90   if(kqs == master_kqs) pthread_mutex_lock(&kqs_lock);
91   if (!ke_vec_a) {
92     kqs_init(kqs);
93   }
94   else if (ke_vec_used == ke_vec_a) {
95     ke_vec_a <<= 1;
96     ke_vec = (struct kevent *) realloc(ke_vec,
97                                        ke_vec_a * sizeof (struct kevent));
98   }
99   kep = &ke_vec[ke_vec_used++];
100
101   EV_SET(kep, ident, filter, flags, 0, 0, (void *)(vpsized_int)e->fd);
102   noitL(eventer_deb, "debug: ke_change(fd:%d, filt:%x, flags:%x)\n",
103         ident, filter, flags);
104   if(kqs == master_kqs) pthread_mutex_unlock(&kqs_lock);
105 }
106
107 static int eventer_kqueue_impl_register_wakeup() {
108   struct kevent kev;
109   EV_SET(&kev, 0, EVFILT_USER, EV_ADD|EV_ONESHOT, NOTE_FFNOP, 0, NULL);
110   noitL(noit_debug, "wakeup... reregister\n");
111   return kevent(kqueue_fd, &kev, 1, NULL, 0, NULL);
112 }
113
114 static int eventer_kqueue_impl_init() {
115   struct rlimit rlim;
116   int rv;
117
118   /* super init */
119   if((rv = eventer_impl_init()) != 0) return rv;
120
121   signal(SIGPIPE, SIG_IGN);
122   kqueue_fd = kqueue();
123   if(kqueue_fd == -1) {
124     return -1;
125   }
126
127         if(eventer_kqueue_impl_register_wakeup() == -1)
128     return -1;
129
130   pthread_mutex_init(&kqs_lock, NULL);
131   pthread_key_create(&kqueue_setup_key, NULL);
132   master_kqs = calloc(1, sizeof(*master_kqs));
133   kqs_init(master_kqs);
134   getrlimit(RLIMIT_NOFILE, &rlim);
135   maxfds = rlim.rlim_cur;
136   master_fds = calloc(maxfds, sizeof(*master_fds));
137   masks = calloc(maxfds, sizeof(*masks));
138   return 0;
139 }
140 static int eventer_kqueue_impl_propset(const char *key, const char *value) {
141   if(eventer_impl_propset(key, value)) {
142     /* Do our kqueue local properties here */
143     return -1;
144   }
145   return 0;
146 }
147 static void eventer_kqueue_impl_add(eventer_t e) {
148   assert(e->mask);
149   ev_lock_state_t lockstate;
150   const char *cbname;
151   cbname = eventer_name_for_callback_e(e->callback, e);
152
153   if(e->mask & EVENTER_ASYNCH) {
154     noitL(eventer_deb, "debug: eventer_add asynch (%s)\n", cbname ? cbname : "???");
155     eventer_add_asynch(NULL, e);
156     return;
157   }
158
159   /* Recurrent delegation */
160   if(e->mask & EVENTER_RECURRENT) {
161     noitL(eventer_deb, "debug: eventer_add recurrent (%s)\n", cbname ? cbname : "???");
162     eventer_add_recurrent(e);
163     return;
164   }
165
166   /* Timed events are simple */
167   if(e->mask & EVENTER_TIMER) {
168     eventer_add_timed(e);
169     return;
170   }
171
172   /* file descriptor event */
173   noitL(eventer_deb, "debug: eventer_add fd (%s,%d,0x%04x)\n", cbname ? cbname : "???", e->fd, e->mask);
174   assert(e->whence.tv_sec == 0 && e->whence.tv_usec == 0);
175   lockstate = acquire_master_fd(e->fd);
176   master_fds[e->fd].e = e;
177   if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
178     ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
179   if(e->mask & (EVENTER_WRITE))
180     ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
181   release_master_fd(e->fd, lockstate);
182 }
183 static eventer_t eventer_kqueue_impl_remove(eventer_t e) {
184   eventer_t removed = NULL;
185   if(e->mask & EVENTER_ASYNCH) {
186     abort();
187   }
188   if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
189     ev_lock_state_t lockstate;
190     lockstate = acquire_master_fd(e->fd);
191     noitL(eventer_deb, "kqueue: remove(%d)\n", e->fd);
192     if(e == master_fds[e->fd].e) {
193       removed = e;
194       master_fds[e->fd].e = NULL;
195       if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
196         ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
197       if(e->mask & (EVENTER_WRITE))
198         ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
199     } else
200       noitL(eventer_deb, "kqueue: remove(%d) failed.\n", e->fd);
201     release_master_fd(e->fd, lockstate);
202   }
203   else if(e->mask & EVENTER_TIMER) {
204     removed = eventer_remove_timed(e);
205   }
206   else if(e->mask & EVENTER_RECURRENT) {
207     removed = eventer_remove_recurrent(e);
208   }
209   else {
210     abort();
211   }
212   return removed;
213 }
214 static void eventer_kqueue_impl_update(eventer_t e, int mask) {
215   if(e->mask & EVENTER_TIMER) {
216     eventer_update_timed(e, mask);
217     return;
218   }
219   noitL(eventer_deb, "kqueue: update(%d, %x->%x)\n", e->fd, e->mask, mask);
220   /* Disable old, if they aren't active in the new */
221   if((e->mask & (EVENTER_READ | EVENTER_EXCEPTION)) &&
222      !(mask & (EVENTER_READ | EVENTER_EXCEPTION)))
223     ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
224   if((e->mask & (EVENTER_WRITE)) &&
225      !(mask & (EVENTER_WRITE)))
226     ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
227
228   /* Enable new, if the weren't in the old */
229   if((mask & (EVENTER_READ | EVENTER_EXCEPTION)) &&
230      !(e->mask & (EVENTER_READ | EVENTER_EXCEPTION)))
231     ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
232   if((mask & (EVENTER_WRITE)) &&
233      !(e->mask & (EVENTER_WRITE)))
234     ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
235
236   /* Switch */
237   e->mask = mask;
238 }
239 static eventer_t eventer_kqueue_impl_remove_fd(int fd) {
240   eventer_t eiq = NULL;
241   ev_lock_state_t lockstate;
242   if(master_fds[fd].e) {
243     noitL(eventer_deb, "kqueue: remove_fd(%d)\n", fd);
244     lockstate = acquire_master_fd(fd);
245     eiq = master_fds[fd].e;
246     master_fds[fd].e = NULL;
247     if(eiq->mask & (EVENTER_READ | EVENTER_EXCEPTION))
248       ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, eiq);
249     if(eiq->mask & (EVENTER_WRITE))
250       ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, eiq);
251     release_master_fd(fd, lockstate);
252   }
253   return eiq;
254 }
255 static eventer_t eventer_kqueue_impl_find_fd(int fd) {
256   return master_fds[fd].e;
257 }
258 static void eventer_kqueue_impl_trigger(eventer_t e, int mask) {
259   ev_lock_state_t lockstate;
260   struct timeval __now;
261   int oldmask, newmask;
262   const char *cbname;
263   int fd;
264
265   fd = e->fd;
266   if(e != master_fds[fd].e) return;
267   lockstate = acquire_master_fd(fd);
268   if(lockstate == EV_ALREADY_OWNED) return;
269   assert(lockstate == EV_OWNED);
270
271   gettimeofday(&__now, NULL);
272   /* We're going to lie to ourselves.  You'd think this should be:
273    * oldmask = e->mask;  However, we just fired with masks[fd], so
274    * kqueue is clearly looking for all of the events in masks[fd].
275    * So, we combine them "just to be safe."
276    */
277   oldmask = e->mask | masks[fd];
278   cbname = eventer_name_for_callback_e(e->callback, e);
279   noitLT(eventer_deb, &__now, "kqueue: fire on %d/%x to %s(%p)\n",
280          fd, masks[fd], cbname?cbname:"???", e->callback);
281   EVENTER_CALLBACK_ENTRY((void *)e, (void *)e->callback, (char *)cbname, fd, e->mask, mask);
282   newmask = e->callback(e, mask, e->closure, &__now);
283   EVENTER_CALLBACK_RETURN((void *)e, (void *)e->callback, (char *)cbname, newmask);
284
285   if(newmask) {
286     /* toggle the read bits if needed */
287     if(newmask & (EVENTER_READ | EVENTER_EXCEPTION)) {
288       if(!(oldmask & (EVENTER_READ | EVENTER_EXCEPTION)))
289         ke_change(fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
290     }
291     else if(oldmask & (EVENTER_READ | EVENTER_EXCEPTION))
292       ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
293
294     /* toggle the write bits if needed */
295     if(newmask & EVENTER_WRITE) {
296       if(!(oldmask & EVENTER_WRITE))
297         ke_change(fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
298     }
299     else if(oldmask & EVENTER_WRITE)
300       ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
301
302     /* Set our mask */
303     e->mask = newmask;
304   }
305   else {
306     /*
307      * Long story long:
308      *  When integrating with a few external event systems, we find
309      *  it difficult to make their use of remove+add as an update
310      *  as it can be recurrent in a single handler call and you cannot
311      *  remove completely from the event system if you are going to
312      *  just update (otherwise the eventer_t in your call stack could
313      *  be stale).  What we do is perform a superficial remove, marking
314      *  the mask as 0, but not eventer_remove_fd.  Then on an add, if
315      *  we already have an event, we just update the mask (as we
316      *  have not yet returned to the eventer's loop.
317      *  This leaves us in a tricky situation when a remove is called
318      *  and the add doesn't roll in, we return 0 (mask == 0) and hit
319      *  this spot.  We have intended to remove the event, but it still
320      *  resides at master_fds[fd].e -- even after we free it.
321      *  So, in the evnet that we return 0 and the event that
322      *  master_fds[fd].e == the event we're about to free... we NULL
323      *  it out.
324      */
325     if(master_fds[fd].e == e) master_fds[fd].e = NULL;
326     eventer_free(e);
327   }
328   release_master_fd(fd, lockstate);
329 }
330 static int eventer_kqueue_impl_loop() {
331   struct timeval __dyna_sleep = { 0, 0 };
332   KQUEUE_DECL;
333   KQUEUE_SETUP;
334
335   if(!kqs) {
336     kqs = calloc(1, sizeof(*kqs));
337     kqs_init(kqs);
338   }
339   pthread_setspecific(kqueue_setup_key, kqs);
340   while(1) {
341     struct timeval __now, __sleeptime;
342     struct timespec __kqueue_sleeptime;
343     int fd_cnt = 0;
344
345     if(compare_timeval(eventer_max_sleeptime, __dyna_sleep) < 0)
346       __dyna_sleep = eventer_max_sleeptime;
347
348     __sleeptime = __dyna_sleep;
349
350     eventer_dispatch_timed(&__now, &__sleeptime);
351
352     if(compare_timeval(__sleeptime, __dyna_sleep) > 0)
353       __sleeptime = __dyna_sleep;
354
355     /* Handle recurrent events */
356     eventer_dispatch_recurrent(&__now);
357
358     /* If we're the master, we need to lock the master_kqs and make mods */
359     if(master_kqs->__ke_vec_used) {
360       struct timespec __zerotime = { 0, 0 };
361       pthread_mutex_lock(&kqs_lock);
362       fd_cnt = kevent(kqueue_fd,
363                       master_kqs->__ke_vec, master_kqs->__ke_vec_used,
364                       NULL, 0,
365                       &__zerotime);
366       noitLT(eventer_deb, &__now, "debug: kevent(%d, [], %d) => %d\n", kqueue_fd, master_kqs->__ke_vec_used, fd_cnt);
367       if(fd_cnt < 0) {
368         noitLT(eventer_err, &__now, "kevent: %s\n", strerror(errno));
369       }
370       master_kqs->__ke_vec_used = 0;
371       pthread_mutex_unlock(&kqs_lock);
372     }
373
374     /* Now we move on to our fd-based events */
375     __kqueue_sleeptime.tv_sec = __sleeptime.tv_sec;
376     __kqueue_sleeptime.tv_nsec = __sleeptime.tv_usec * 1000;
377     fd_cnt = kevent(kqueue_fd, ke_vec, ke_vec_used,
378                     ke_vec, ke_vec_a,
379                     &__kqueue_sleeptime);
380     noitLT(eventer_deb, &__now, "debug: kevent(%d, [], %d) => %d\n", kqueue_fd, ke_vec_used, fd_cnt);
381     ke_vec_used = 0;
382     if(fd_cnt < 0) {
383       noitLT(eventer_err, &__now, "kevent: %s\n", strerror(errno));
384     }
385     else if(fd_cnt == 0 ||
386             (fd_cnt == 1 && ke_vec[0].filter == EVFILT_USER)) {
387       /* timeout */
388       if(fd_cnt) eventer_kqueue_impl_register_wakeup();
389       add_timeval(__dyna_sleep, __dyna_increment, &__dyna_sleep);
390     }
391     else {
392       int idx;
393       __dyna_sleep.tv_sec = __dyna_sleep.tv_usec = 0; /* reset */
394       /* loop once to clear */
395       for(idx = 0; idx < fd_cnt; idx++) {
396         struct kevent *ke;
397         ke = &ke_vec[idx];
398         if(ke->flags & EV_ERROR) continue;
399         if(ke->filter == EVFILT_USER) {
400           eventer_kqueue_impl_register_wakeup();
401           continue;
402         }
403         masks[ke->ident] = 0;
404       }
405       /* Loop again to aggregate */
406       for(idx = 0; idx < fd_cnt; idx++) {
407         struct kevent *ke;
408         ke = &ke_vec[idx];
409         if(ke->flags & EV_ERROR) continue;
410         if(ke->filter == EVFILT_USER) continue;
411         if(ke->filter == EVFILT_READ) masks[ke->ident] |= EVENTER_READ;
412         if(ke->filter == EVFILT_WRITE) masks[ke->ident] |= EVENTER_WRITE;
413       }
414       /* Loop a last time to process */
415       for(idx = 0; idx < fd_cnt; idx++) {
416         struct kevent *ke;
417         eventer_t e;
418         int fd;
419
420         ke = &ke_vec[idx];
421         if(ke->filter == EVFILT_USER) continue;
422         if(ke->flags & EV_ERROR) {
423           if(ke->data != EBADF && ke->data != ENOENT)
424             noitLT(eventer_err, &__now, "error [%d]: %s\n",
425                    (int)ke->ident, strerror(ke->data));
426           continue;
427         }
428         assert((vpsized_int)ke->udata == (vpsized_int)ke->ident);
429         fd = ke->ident;
430         e = master_fds[fd].e;
431         /* If we've seen this fd, don't callback twice */
432         if(!masks[fd]) continue;
433         /* It's possible that someone removed the event and freed it
434          * before we got here.
435          */
436         if(e) eventer_kqueue_impl_trigger(e, masks[fd]);
437         masks[fd] = 0; /* indicates we've processed this fd */
438       }
439     }
440   }
441   /* NOTREACHED */
442   return 0;
443 }
444
445 void eventer_kqueue_impl_wakeup() {
446   struct kevent kev;
447         EV_SET(&kev, 0, EVFILT_USER, 0, NOTE_FFCOPY|NOTE_TRIGGER|0x1, 0, NULL);
448         kevent(kqueue_fd, &kev, 1, NULL, 0, NULL);
449 }
450
451 struct _eventer_impl eventer_kqueue_impl = {
452   "kqueue",
453   eventer_kqueue_impl_init,
454   eventer_kqueue_impl_propset,
455   eventer_kqueue_impl_add,
456   eventer_kqueue_impl_remove,
457   eventer_kqueue_impl_update,
458   eventer_kqueue_impl_remove_fd,
459   eventer_kqueue_impl_find_fd,
460   eventer_kqueue_impl_trigger,
461   eventer_kqueue_impl_loop,
462   eventer_kqueue_impl_foreach_fdevent,
463   eventer_kqueue_impl_wakeup,
464   { 0, 200000 },
465   0,
466   NULL
467 };
Note: See TracBrowser for help on using the browser.