root/src/eventer/eventer_kqueue_impl.c

Revision a3118173f47a3680d2178910858e8c703879a53e, 16.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 1 week ago)

Move the rlim setting before eventer_impl_init so that maxfds
is set and we can allocate our arrays. This is important because
we spin up loops for concurrent eventer threads *in* impl_init
and maxfds will be 0 for a while. This was causing spurious
'epoll_wait: Invalid argument' on Linux.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * Copyright (c) 2014, Circonus, Inc.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include "noit_defines.h"
35 #include "eventer/eventer.h"
36 #include "utils/noit_atomic.h"
37 #include "utils/noit_skiplist.h"
38 #include "utils/noit_memory.h"
39 #include "utils/noit_log.h"
40 #include "dtrace_probes.h"
41
42 #include <errno.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <sys/event.h>
46 #include <pthread.h>
47 #include <assert.h>
48
/* The concrete eventer implementation provided by this file; the function
 * table itself is populated at the bottom of the file. */
struct _eventer_impl eventer_kqueue_impl;
/* Bind the shared eventer_impl_private.h machinery to this implementation. */
#define LOCAL_EVENTER eventer_kqueue_impl
#define LOCAL_EVENTER_foreach_fdevent eventer_kqueue_impl_foreach_fdevent
#define LOCAL_EVENTER_foreach_timedevent eventer_kqueue_impl_foreach_timedevent
/* Shorthand for the implementation's fd-table fields. */
#define maxfds LOCAL_EVENTER.maxfds
#define master_fds LOCAL_EVENTER.master_fds
55
56 #include "eventer/eventer_impl_private.h"
57
/* Amount by which the dynamic sleep interval grows on each idle pass. */
static const struct timeval __dyna_increment = { 0, 10000 }; /* 10 ms */
/* Per-loop kqueue state: the kqueue descriptor, a wakeup latch, and a
 * mutex-guarded vector of pending kevent changes submitted on the loop's
 * next kevent(2) call. */
typedef struct kqueue_spec {
  int kqueue_fd;                 /* kqueue(2) descriptor for this loop */
  noit_spinlock_t wakeup_notify; /* held while a wakeup is pending; cleared by the loop */
  pthread_mutex_t lock;          /* guards the change vector below */
  struct {
    struct kevent *__ke_vec;     /* pending kevent changes */
    unsigned int __ke_vec_a;     /* allocated capacity */
    unsigned int __ke_vec_used;  /* entries currently in use */
  } q;
} *kqs_t;

/* Per-fd mask of events seen in the current kevent batch (indexed by fd,
 * sized maxfds — see eventer_kqueue_impl_init). */
static int *masks;
/* Declare/fetch the kqueue_spec for an event's owning loop. */
#define KQUEUE_DECL kqs_t kqs
#define KQUEUE_SETUP(e) kqs = (kqs_t) eventer_get_spec_for_event(e)
/* Accessors into the spec bound by KQUEUE_SETUP. */
#define ke_vec kqs->q.__ke_vec
#define ke_vec_a kqs->q.__ke_vec_a
#define ke_vec_used kqs->q.__ke_vec_used
76
/* Initialize a spec's kevent change vector with an initial allocation.
 * Called eagerly from eventer_kqueue_spec_alloc and lazily from ke_change.
 * Fixes: drops the needless malloc cast and aborts on allocation failure —
 * previously a NULL ke_vec would have been dereferenced later in ke_change. */
static void kqs_init(kqs_t kqs) {
  enum { initial_alloc = 64 };
  ke_vec_a = initial_alloc;
  ke_vec = malloc(ke_vec_a * sizeof *ke_vec);
  if(!ke_vec) abort(); /* cannot run a loop without a change vector */
}
/* Queue a kevent filter change (add/delete ident's filter) on event e's
 * owning loop; the change is flushed by that loop's next kevent(2) call.
 * Thread-safe: the spec's mutex guards the change vector.
 * Fixes: the realloc result is no longer assigned straight back to ke_vec —
 * on failure that lost the only pointer to the queued changes and then
 * dereferenced NULL; we now check the result and abort (queued changes
 * cannot be dropped safely). */
static void
ke_change (register int const ident,
           register int const filter,
           register int const flags,
           register eventer_t e) {
  register struct kevent *kep;
  KQUEUE_DECL;
  KQUEUE_SETUP(e);

  pthread_mutex_lock(&kqs->lock);
  if (!ke_vec_a) {
    kqs_init(kqs);
  }
  else if (ke_vec_used == ke_vec_a) {
    /* grow geometrically; keep the old vector intact if realloc fails */
    struct kevent *nv = realloc(ke_vec, (ke_vec_a << 1) * sizeof *nv);
    if (!nv) abort(); /* OOM with pending changes is unrecoverable */
    ke_vec = nv;
    ke_vec_a <<= 1;
  }
  kep = &ke_vec[ke_vec_used++];

  /* udata carries the fd so the loop can sanity-check delivered events */
  EV_SET(kep, ident, filter, flags, 0, 0, (void *)(vpsized_int)e->fd);
  noitL(eventer_deb, "debug: ke_change(fd:%d, filt:%x, flags:%x)\n",
        ident, filter, flags);
  pthread_mutex_unlock(&kqs->lock);
}
107
/* Fire the loop's EVFILT_USER event so a kevent(2) blocked in that loop
 * returns immediately. */
static void eventer_kqueue_impl_wakeup_spec(struct kqueue_spec *spec) {
  struct kevent kev;
  EV_SET(&kev, 0, EVFILT_USER, 0, NOTE_FFCOPY|NOTE_TRIGGER|0x1, 0, NULL);
  kevent(spec->kqueue_fd, &kev, 1, NULL, 0, NULL);
}
113
/* (Re)arm the one-shot EVFILT_USER wakeup filter on the loop's kqueue.
 * Returns kevent(2)'s result: 0 on success, -1 on error. */
static int eventer_kqueue_impl_register_wakeup(struct kqueue_spec *spec) {
  struct kevent kev;
  noitL(eventer_deb, "wakeup... reregister\n");
  EV_SET(&kev, 0, EVFILT_USER, EV_ADD|EV_ONESHOT, NOTE_FFNOP, 0, NULL);
  return kevent(spec->kqueue_fd, &kev, 1, NULL, 0, NULL);
}
120
/* Allocate and initialize one loop's kqueue_spec: the kqueue descriptor,
 * the kevent change vector, and its lock.  Aborts on any failure since a
 * loop cannot operate without its spec.
 * Fixes: the calloc result is now checked — previously a NULL return was
 * dereferenced on the next line. */
static void *eventer_kqueue_spec_alloc() {
  struct kqueue_spec *spec;
  spec = calloc(1, sizeof(*spec));
  if(spec == NULL) abort();
  spec->kqueue_fd = kqueue();
  if(spec->kqueue_fd == -1) abort();
  kqs_init(spec);
  pthread_mutex_init(&spec->lock, NULL);
  return spec;
}
130
131 static int eventer_kqueue_impl_init() {
132   int rv;
133
134   maxfds = eventer_impl_setrlimit();
135   master_fds = calloc(maxfds, sizeof(*master_fds));
136   masks = calloc(maxfds, sizeof(*masks));
137   master_fds = calloc(maxfds, sizeof(*master_fds));
138   masks = calloc(maxfds, sizeof(*masks));
139
140   /* super init */
141   if((rv = eventer_impl_init()) != 0) return rv;
142
143   signal(SIGPIPE, SIG_IGN);
144   return 0;
145 }
/* Apply an implementation property.  Shared properties are handled by
 * eventer_impl_propset; kqueue-local properties would be handled here
 * (none currently).  Returns 0 on success, -1 otherwise. */
static int eventer_kqueue_impl_propset(const char *key, const char *value) {
  /* Do our kqueue local properties here */
  return eventer_impl_propset(key, value) ? -1 : 0;
}
/* Register an event.  Asynch, recurrent, and timer events are delegated to
 * the shared eventer machinery; fd events are recorded in master_fds and
 * their kqueue filters queued via ke_change. */
static void eventer_kqueue_impl_add(eventer_t e) {
  assert(e->mask);
  assert(eventer_is_loop(e->thr_owner));
  const char *cbname = eventer_name_for_callback_e(e->callback, e);

  if(e->mask & EVENTER_ASYNCH) {
    noitL(eventer_deb, "debug: eventer_add asynch (%s)\n", cbname ? cbname : "???");
    eventer_add_asynch(NULL, e);
    return;
  }

  /* Recurrent delegation */
  if(e->mask & EVENTER_RECURRENT) {
    noitL(eventer_deb, "debug: eventer_add recurrent (%s)\n", cbname ? cbname : "???");
    eventer_add_recurrent(e);
    return;
  }

  /* Timed events are simple */
  if(e->mask & EVENTER_TIMER) {
    eventer_add_timed(e);
    return;
  }

  /* file descriptor event: record it, then arm the matching filters */
  noitL(eventer_deb, "debug: eventer_add fd (%s,%d,0x%04x)\n", cbname ? cbname : "???", e->fd, e->mask);
  assert(e->whence.tv_sec == 0 && e->whence.tv_usec == 0);
  ev_lock_state_t lockstate = acquire_master_fd(e->fd);
  master_fds[e->fd].e = e;
  if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
    ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
  if(e->mask & EVENTER_WRITE)
    ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
  release_master_fd(e->fd, lockstate);
}
/* Deregister an event.  Returns the event when it was actually removed,
 * NULL otherwise.  Asynch events cannot be removed (abort), and an event
 * with no recognizable mask is a caller bug (abort). */
static eventer_t eventer_kqueue_impl_remove(eventer_t e) {
  eventer_t removed = NULL;

  if(e->mask & EVENTER_ASYNCH) abort();

  if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
    ev_lock_state_t lockstate = acquire_master_fd(e->fd);
    noitL(eventer_deb, "kqueue: remove(%d)\n", e->fd);
    if(e != master_fds[e->fd].e) {
      /* a different (or no) event owns this fd; nothing to remove */
      noitL(eventer_deb, "kqueue: remove(%d) failed.\n", e->fd);
    }
    else {
      removed = e;
      master_fds[e->fd].e = NULL;
      if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
        ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
      if(e->mask & EVENTER_WRITE)
        ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
    }
    release_master_fd(e->fd, lockstate);
  }
  else if(e->mask & EVENTER_TIMER) {
    removed = eventer_remove_timed(e);
  }
  else if(e->mask & EVENTER_RECURRENT) {
    removed = eventer_remove_recurrent(e);
  }
  else {
    abort();
  }
  return removed;
}
/* Change an event's interest mask.  Timed events are delegated to the
 * shared machinery; fd events have their kqueue filters toggled so only
 * filters present in the new mask remain armed. */
static void eventer_kqueue_impl_update(eventer_t e, int mask) {
  const int rdbits = EVENTER_READ | EVENTER_EXCEPTION;

  if(e->mask & EVENTER_TIMER) {
    eventer_update_timed(e, mask);
    return;
  }
  noitL(eventer_deb, "kqueue: update(%d, %x->%x)\n", e->fd, e->mask, mask);

  /* Disable filters active in the old mask but absent from the new one */
  if((e->mask & rdbits) && !(mask & rdbits))
    ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
  if((e->mask & EVENTER_WRITE) && !(mask & EVENTER_WRITE))
    ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);

  /* Enable filters present in the new mask but not the old one */
  if((mask & rdbits) && !(e->mask & rdbits))
    ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
  if((mask & EVENTER_WRITE) && !(e->mask & EVENTER_WRITE))
    ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);

  /* Commit the new mask */
  e->mask = mask;
}
/* Remove whatever event is registered on fd, disarming its filters.
 * Returns the removed event or NULL if the fd had none.
 * Fixes: master_fds[fd].e was read BEFORE acquiring the fd lock, racing
 * with a concurrent remove/trigger on the same fd; the check now happens
 * under the lock. */
static eventer_t eventer_kqueue_impl_remove_fd(int fd) {
  eventer_t eiq = NULL;
  ev_lock_state_t lockstate = acquire_master_fd(fd);
  if(master_fds[fd].e) {
    noitL(eventer_deb, "kqueue: remove_fd(%d)\n", fd);
    eiq = master_fds[fd].e;
    master_fds[fd].e = NULL;
    if(eiq->mask & (EVENTER_READ | EVENTER_EXCEPTION))
      ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, eiq);
    if(eiq->mask & (EVENTER_WRITE))
      ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, eiq);
  }
  release_master_fd(fd, lockstate);
  return eiq;
}
262 static eventer_t eventer_kqueue_impl_find_fd(int fd) {
263   return master_fds[fd].e;
264 }
/* Reconcile kqueue filters with a mask transition from oldmask to newmask:
 * arm filters that newly appear and disarm filters that disappear. */
static void
alter_kqueue_mask(eventer_t e, int oldmask, int newmask) {
  int had_rd = oldmask & (EVENTER_READ | EVENTER_EXCEPTION);
  int want_rd = newmask & (EVENTER_READ | EVENTER_EXCEPTION);
  int had_wr = oldmask & EVENTER_WRITE;
  int want_wr = newmask & EVENTER_WRITE;

  /* toggle the read bits if needed */
  if(want_rd && !had_rd)
    ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
  else if(!want_rd && had_rd)
    ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);

  /* toggle the write bits if needed */
  if(want_wr && !had_wr)
    ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
  else if(!want_wr && had_wr)
    ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
}
283
/* Fire event e with the given mask on the calling thread.  If the event
 * belongs to a different loop thread it is handed off cross-thread instead.
 * A callback return of 0 means the event is finished: it is freed here. */
static void eventer_kqueue_impl_trigger(eventer_t e, int mask) {
  ev_lock_state_t lockstate;
  struct timeval __now;
  int oldmask, newmask;
  const char *cbname;
  int fd;

  fd = e->fd;
  /* stale trigger: a different (or no) event now owns this fd */
  if(e != master_fds[fd].e) return;
  if(!pthread_equal(pthread_self(), e->thr_owner)) {
    /* not our event; queue it for the owning loop thread */
    eventer_cross_thread_trigger(e,mask);
    return;
  }
  lockstate = acquire_master_fd(fd);
  /* re-entrant trigger on an fd this thread already holds: drop it */
  if(lockstate == EV_ALREADY_OWNED) return;
  assert(lockstate == EV_OWNED);

  gettimeofday(&__now, NULL);
  /* We're going to lie to ourselves.  You'd think this should be:
   * oldmask = e->mask;  However, we just fired with masks[fd], so
   * kqueue is clearly looking for all of the events in masks[fd].
   * So, we combine them "just to be safe."
   */
  oldmask = e->mask | masks[fd];
  cbname = eventer_name_for_callback_e(e->callback, e);
  noitLT(eventer_deb, &__now, "kqueue: fire on %d/%x to %s(%p)\n",
         fd, masks[fd], cbname?cbname:"???", e->callback);
  noit_memory_begin();
  EVENTER_CALLBACK_ENTRY((void *)e, (void *)e->callback, (char *)cbname, fd, e->mask, mask);
  newmask = e->callback(e, mask, e->closure, &__now);
  EVENTER_CALLBACK_RETURN((void *)e, (void *)e->callback, (char *)cbname, newmask);
  noit_memory_end();

  if(newmask) {
    if(!pthread_equal(pthread_self(), e->thr_owner)) {
      /* the callback rehomed the event: disarm its filters while owned by
       * this thread, then arm them under the new owner's loop */
      pthread_t tgt = e->thr_owner;
      e->thr_owner = pthread_self();
      alter_kqueue_mask(e, oldmask, 0);
      e->thr_owner = tgt;
      alter_kqueue_mask(e, 0, newmask);
      noitL(eventer_deb, "moved event[%p] from t@%u to t@%u\n",
            e, (unsigned int)pthread_self(), (unsigned int)tgt);
    }
    else {
      alter_kqueue_mask(e, oldmask, newmask);
      /* Set our mask */
      e->mask = newmask;
    }
  }
  else {
    /*
     * Long story long:
     *  When integrating with a few external event systems, we find
     *  it difficult to make their use of remove+add as an update
     *  as it can be recurrent in a single handler call and you cannot
     *  remove completely from the event system if you are going to
     *  just update (otherwise the eventer_t in your call stack could
     *  be stale).  What we do is perform a superficial remove, marking
     *  the mask as 0, but not eventer_remove_fd.  Then on an add, if
     *  we already have an event, we just update the mask (as we
     *  have not yet returned to the eventer's loop).
     *  This leaves us in a tricky situation when a remove is called
     *  and the add doesn't roll in, we return 0 (mask == 0) and hit
     *  this spot.  We have intended to remove the event, but it still
     *  resides at master_fds[fd].e -- even after we free it.
     *  So, in the event that we return 0 and the event that
     *  master_fds[fd].e == the event we're about to free... we NULL
     *  it out.
     */
    if(master_fds[fd].e == e) master_fds[fd].e = NULL;
    eventer_free(e);
  }
  release_master_fd(fd, lockstate);
}
/* The per-thread event loop.  Repeatedly: dispatch timed/recurrent work,
 * flush queued kevent changes and wait for fd events, aggregate the fired
 * filters into per-fd masks, then trigger each affected event exactly once.
 * Never returns (return 0 is unreachable). */
static int eventer_kqueue_impl_loop() {
  struct timeval __dyna_sleep = { 0, 0 };
  KQUEUE_DECL;
  KQUEUE_SETUP(NULL);

  /* arm the one-shot EVFILT_USER wakeup before the first kevent wait */
  if(eventer_kqueue_impl_register_wakeup(kqs) == -1)
    abort();

  while(1) {
    struct timeval __now, __sleeptime;
    struct timespec __kqueue_sleeptime;
    int fd_cnt = 0;

    /* __dyna_sleep grows while idle (by __dyna_increment) but is capped */
    if(compare_timeval(eventer_max_sleeptime, __dyna_sleep) < 0)
      __dyna_sleep = eventer_max_sleeptime;

    __sleeptime = __dyna_sleep;

    /* dispatch due timed events; presumably fills __now and lowers
     * __sleeptime to the next deadline — see eventer_impl */
    eventer_dispatch_timed(&__now, &__sleeptime);

    if(compare_timeval(__sleeptime, __dyna_sleep) > 0)
      __sleeptime = __dyna_sleep;

    /* Handle cross_thread dispatches */
    eventer_cross_thread_process();

    /* Handle recurrent events */
    eventer_dispatch_recurrent(&__now);

    /* Now we move on to our fd-based events: submit all queued changes
     * (ke_vec) and collect triggered events back into the same vector */
    __kqueue_sleeptime.tv_sec = __sleeptime.tv_sec;
    __kqueue_sleeptime.tv_nsec = __sleeptime.tv_usec * 1000;
    fd_cnt = kevent(kqs->kqueue_fd, ke_vec, ke_vec_used,
                    ke_vec, ke_vec_a,
                    &__kqueue_sleeptime);
    /* allow the next eventer_kqueue_impl_wakeup to fire a fresh trigger */
    kqs->wakeup_notify = 0;
    if(ke_vec_used) noitLT(eventer_deb, &__now, "debug: kevent(%d, [], %d) => %d\n", kqs->kqueue_fd, ke_vec_used, fd_cnt);
    ke_vec_used = 0;
    if(fd_cnt < 0) {
      noitLT(eventer_err, &__now, "kevent(s/%d): %s\n", kqs->kqueue_fd, strerror(errno));
    }
    else if(fd_cnt == 0 ||
            (fd_cnt == 1 && ke_vec[0].filter == EVFILT_USER)) {
      /* timeout (or only the wakeup fired): rearm the wakeup if needed
       * and back off the dynamic sleep interval */
      if(fd_cnt) eventer_kqueue_impl_register_wakeup(kqs);
      add_timeval(__dyna_sleep, __dyna_increment, &__dyna_sleep);
    }
    else {
      int idx;
      __dyna_sleep.tv_sec = __dyna_sleep.tv_usec = 0; /* reset */
      /* loop once to clear */
      for(idx = 0; idx < fd_cnt; idx++) {
        struct kevent *ke;
        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) continue;
        if(ke->filter == EVFILT_USER) {
          /* the one-shot wakeup fired; rearm it for the next wait */
          eventer_kqueue_impl_register_wakeup(kqs);
          continue;
        }
        masks[ke->ident] = 0;
      }
      /* Loop again to aggregate: one fd may appear once per filter */
      for(idx = 0; idx < fd_cnt; idx++) {
        struct kevent *ke;
        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) continue;
        if(ke->filter == EVFILT_USER) continue;
        if(ke->filter == EVFILT_READ) masks[ke->ident] |= EVENTER_READ;
        if(ke->filter == EVFILT_WRITE) masks[ke->ident] |= EVENTER_WRITE;
      }
      /* Loop a last time to process */
      for(idx = 0; idx < fd_cnt; idx++) {
        struct kevent *ke;
        eventer_t e;
        int fd;

        ke = &ke_vec[idx];
        if(ke->filter == EVFILT_USER) continue;
        if(ke->flags & EV_ERROR) {
          /* EBADF/ENOENT are expected when an fd races with removal */
          if(ke->data != EBADF && ke->data != ENOENT)
            noitLT(eventer_err, &__now, "error [%d]: %s\n",
                   (int)ke->ident, strerror(ke->data));
          continue;
        }
        /* ke_change stashed the fd in udata; verify delivery integrity */
        assert((vpsized_int)ke->udata == (vpsized_int)ke->ident);
        fd = ke->ident;
        e = master_fds[fd].e;
        /* If we've seen this fd, don't callback twice */
        if(!masks[fd]) continue;
        /* It's possible that someone removed the event and freed it
         * before we got here.
         */
        if(e) eventer_kqueue_impl_trigger(e, masks[fd]);
        masks[fd] = 0; /* indicates we've processed this fd */
      }
    }
  }
  /* NOTREACHED */
  return 0;
}
458
/* Wake the loop that owns event e.  The wakeup_notify spinlock acts as a
 * latch coalescing concurrent wakeup requests: only the first caller fires
 * the EVFILT_USER trigger; the loop clears the latch after kevent returns. */
void eventer_kqueue_impl_wakeup(eventer_t e) {
  KQUEUE_DECL;
  KQUEUE_SETUP(e);
  if(noit_spinlock_trylock(&kqs->wakeup_notify))
    eventer_kqueue_impl_wakeup_spec(kqs);
}
465
/* The kqueue implementation's function table, wired into the eventer core.
 * Field order must match struct _eventer_impl (declared in eventer.h). */
struct _eventer_impl eventer_kqueue_impl = {
  "kqueue",                            /* implementation name */
  eventer_kqueue_impl_init,
  eventer_kqueue_impl_propset,
  eventer_kqueue_impl_add,
  eventer_kqueue_impl_remove,
  eventer_kqueue_impl_update,
  eventer_kqueue_impl_remove_fd,
  eventer_kqueue_impl_find_fd,
  eventer_kqueue_impl_trigger,
  eventer_kqueue_impl_loop,
  eventer_kqueue_impl_foreach_fdevent, /* provided by eventer_impl_private.h */
  eventer_kqueue_impl_wakeup,
  eventer_kqueue_spec_alloc,
  { 0, 200000 },                       /* NOTE(review): appears to be the 200ms
                                        * default max sleeptime — confirm against
                                        * struct _eventer_impl */
  0,
  NULL
};
Note: See TracBrowser for help on using the browser.