root/src/eventer/eventer_kqueue_impl.c

Revision f920aec8f7779dc67b4ea347d8da0384b27627b4, 14.4 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

race condition

/*
 * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
 * All rights reserved.
 */

#include "noit_defines.h"
#include "eventer/eventer.h"
#include "utils/noit_atomic.h"
#include "utils/noit_skiplist.h"
#include "utils/noit_log.h"

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>       /* memset, strerror */
#include <signal.h>       /* signal, SIGPIPE */
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>     /* gettimeofday, struct timeval */
#include <sys/resource.h> /* getrlimit, RLIMIT_NOFILE */
#include <pthread.h>
#include <assert.h>

static struct timeval __max_sleeptime = { 0, 200000 }; /* 200ms */
static int maxfds;
/* Per-fd registry of events; sized to RLIMIT_NOFILE at init so it can be
 * indexed directly by file descriptor. */
static struct {
  eventer_t e;
  pthread_t executor;
  noit_spinlock_t lock;
} *master_fds = NULL;
static int *masks;
typedef enum { EV_OWNED, EV_ALREADY_OWNED } ev_lock_state_t;

/* Take ownership of an fd's slot.  The lock behaves recursively in
 * effect: if the calling thread already holds it we report
 * EV_ALREADY_OWNED, so the matching release becomes a no-op. */
static ev_lock_state_t
acquire_master_fd(int fd) {
  if(noit_spinlock_trylock(&master_fds[fd].lock)) {
    master_fds[fd].executor = pthread_self();
    return EV_OWNED;
  }
  if(pthread_equal(master_fds[fd].executor, pthread_self())) {
    return EV_ALREADY_OWNED;
  }
  noit_spinlock_lock(&master_fds[fd].lock);
  master_fds[fd].executor = pthread_self();
  return EV_OWNED;
}
static void
release_master_fd(int fd, ev_lock_state_t as) {
  if(as == EV_OWNED) {
    memset(&master_fds[fd].executor, 0, sizeof(master_fds[fd].executor));
    noit_spinlock_unlock(&master_fds[fd].lock);
  }
}
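/* A minimal usage sketch (hypothetical caller, not part of this file):
 *
 *   ev_lock_state_t ls = acquire_master_fd(fd);
 *   ... mutate master_fds[fd] and queue kevent changes ...
 *   release_master_fd(fd, ls);
 *
 * Because release is a no-op for EV_ALREADY_OWNED, the pairing stays
 * balanced even when the same thread re-enters through a nested path. */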

static pthread_t master_thread;
static int kqueue_fd = -1;
typedef struct kqueue_setup {
  struct kevent *__ke_vec;
  unsigned int __ke_vec_a;
  unsigned int __ke_vec_used;
} *kqs_t;

static pthread_mutex_t kqs_lock;
static pthread_mutex_t te_lock;
static kqs_t master_kqs = NULL;
static pthread_key_t kqueue_setup_key;
static noit_skiplist *timed_events = NULL;
/* These macros make ke_vec and friends refer to the calling thread's own
 * change vector, fetched from thread-specific data. */
#define KQUEUE_DECL kqs_t kqs
#define KQUEUE_SETUP kqs = (kqs_t) pthread_getspecific(kqueue_setup_key)
#define ke_vec kqs->__ke_vec
#define ke_vec_a kqs->__ke_vec_a
#define ke_vec_used kqs->__ke_vec_used

static void kqs_init(kqs_t kqs) {
  enum { initial_alloc = 64 };
  ke_vec_a = initial_alloc;
  ke_vec = (struct kevent *) malloc(ke_vec_a * sizeof (struct kevent));
}
/* Queue a kevent change to the calling thread's vector; the change is
 * applied by the next kevent() call in the event loop.  Threads without
 * their own kqs fall back to the shared master_kqs, which is only
 * touched under kqs_lock. */
static void
ke_change (register int const ident,
           register int const filter,
           register int const flags,
           register void *const udata) {
  register struct kevent *kep;
  KQUEUE_DECL;
  KQUEUE_SETUP;
  if(!kqs) kqs = master_kqs;

  if(kqs == master_kqs) pthread_mutex_lock(&kqs_lock);
  if (!ke_vec_a) {
    kqs_init(kqs);
  }
  else if (ke_vec_used == ke_vec_a) {
    ke_vec_a <<= 1;
    ke_vec = (struct kevent *) realloc(ke_vec,
                                       ke_vec_a * sizeof (struct kevent));
  }
  kep = &ke_vec[ke_vec_used++];

  EV_SET(kep, ident, filter, flags, 0, 0, udata);
  if(kqs == master_kqs) pthread_mutex_unlock(&kqs_lock);
}
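/* The deferred application, in isolation (a sketch, not code from this
 * file):
 *
 *   ke_change(fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);  // queued only
 *   ...
 *   kevent(kqueue_fd, ke_vec, ke_vec_used, ...);        // applied here
 *
 * Batching amortizes one kevent() syscall across many registrations. */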

static int eventer_kqueue_impl_init() {
  struct rlimit rlim;
  int rv;

  /* super init */
  if((rv = eventer_impl_init()) != 0) return rv;

  master_thread = pthread_self();
  signal(SIGPIPE, SIG_IGN);
  kqueue_fd = kqueue();
  if(kqueue_fd == -1) {
    return -1;
  }
  pthread_mutex_init(&kqs_lock, NULL);
  pthread_mutex_init(&te_lock, NULL);
  pthread_key_create(&kqueue_setup_key, NULL);
  master_kqs = calloc(1, sizeof(*master_kqs));
  kqs_init(master_kqs);
  getrlimit(RLIMIT_NOFILE, &rlim);
  maxfds = rlim.rlim_cur;
  master_fds = calloc(maxfds, sizeof(*master_fds));
  masks = calloc(maxfds, sizeof(*masks));
  timed_events = calloc(1, sizeof(*timed_events));
  noit_skiplist_init(timed_events);
  noit_skiplist_set_compare(timed_events,
                            eventer_timecompare, eventer_timecompare);
  noit_skiplist_add_index(timed_events,
                          noit_compare_voidptr, noit_compare_voidptr);
  return 0;
}
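/* Note: the skiplist carries two orderings -- by fire time (so the loop
 * below can peek/pop the soonest event) and by pointer identity (so
 * eventer_kqueue_impl_remove can delete a specific event cheaply). */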
static int eventer_kqueue_impl_propset(const char *key, const char *value) {
  if(eventer_impl_propset(key, value)) {
    /* Do our kqueue local properties here */
    return -1;
  }
  return 0;
}
static void eventer_kqueue_impl_add(eventer_t e) {
  ev_lock_state_t lockstate;
  const char *cbname;

  assert(e->mask);
  cbname = eventer_name_for_callback(e->callback);

  if(e->mask & EVENTER_ASYNCH) {
    noitL(eventer_deb, "debug: eventer_add asynch (%s)\n", cbname ? cbname : "???");
    eventer_add_asynch(NULL, e);
    return;
  }

  /* Recurrent delegation */
  if(e->mask & EVENTER_RECURRENT) {
    noitL(eventer_deb, "debug: eventer_add recurrent (%s)\n", cbname ? cbname : "???");
    eventer_add_recurrent(e);
    return;
  }

  /* Timed events are simple */
  if(e->mask & EVENTER_TIMER) {
    noitL(eventer_deb, "debug: eventer_add timed (%s)\n", cbname ? cbname : "???");
    pthread_mutex_lock(&te_lock);
    noit_skiplist_insert(timed_events, e);
    pthread_mutex_unlock(&te_lock);
    return;
  }

  /* file descriptor event */
  noitL(eventer_deb, "debug: eventer_add fd (%s,%d,0x%04x)\n", cbname ? cbname : "???", e->fd, e->mask);
  lockstate = acquire_master_fd(e->fd);
  master_fds[e->fd].e = e;
  if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
    ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
  if(e->mask & (EVENTER_WRITE))
    ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
  release_master_fd(e->fd, lockstate);
}
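/* A minimal sketch of registering a read event (hypothetical caller;
 * eventer_alloc() and the field names are assumed from eventer.h):
 *
 *   eventer_t e = eventer_alloc();
 *   e->fd = conn_fd;
 *   e->mask = EVENTER_READ | EVENTER_EXCEPTION;
 *   e->callback = my_handler;
 *   eventer_add(e);  // lands in eventer_kqueue_impl_add above
 */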
static eventer_t eventer_kqueue_impl_remove(eventer_t e) {
  eventer_t removed = NULL;
  if(e->mask & EVENTER_ASYNCH) {
    abort();
  }
  if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
    ev_lock_state_t lockstate;
    lockstate = acquire_master_fd(e->fd);
    if(e == master_fds[e->fd].e) {
      removed = e;
      master_fds[e->fd].e = NULL;
      if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
        ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
      if(e->mask & (EVENTER_WRITE))
        ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
    }
    release_master_fd(e->fd, lockstate);
  }
  else if(e->mask & EVENTER_TIMER) {
    pthread_mutex_lock(&te_lock);
    if(noit_skiplist_remove_compare(timed_events, e, NULL,
                                    noit_compare_voidptr))
      removed = e;
    pthread_mutex_unlock(&te_lock);
  }
  else if(e->mask & EVENTER_RECURRENT) {
    removed = eventer_remove_recurrent(e);
  }
  else {
    abort();
  }
  return removed;
}
static void eventer_kqueue_impl_update(eventer_t e) {
  if(e->mask & EVENTER_TIMER) {
    pthread_mutex_lock(&te_lock);
    noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
    noit_skiplist_insert(timed_events, e);
    pthread_mutex_unlock(&te_lock);
    return;
  }
  /* Drop both filters, then re-add whichever ones the current mask wants */
  ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
  ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
  if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
    ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
  if(e->mask & (EVENTER_WRITE))
    ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
}
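/* In kqueue, each (ident, filter) pair is an independent registration,
 * which is why read and write interest are managed as two separate
 * kevent changes throughout this file. */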
static eventer_t eventer_kqueue_impl_remove_fd(int fd) {
  eventer_t eiq = NULL;
  ev_lock_state_t lockstate;
  if(master_fds[fd].e) {
    lockstate = acquire_master_fd(fd);
    eiq = master_fds[fd].e;
    master_fds[fd].e = NULL;
    if(eiq->mask & (EVENTER_READ | EVENTER_EXCEPTION))
      ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, eiq);
    if(eiq->mask & (EVENTER_WRITE))
      ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, eiq);
    release_master_fd(fd, lockstate);
  }
  return eiq;
}
static eventer_t eventer_kqueue_impl_find_fd(int fd) {
  return master_fds[fd].e;
}
static int eventer_kqueue_impl_loop() {
  int is_master_thread = 0;
  pthread_t self;
  KQUEUE_DECL;
  KQUEUE_SETUP;

  self = pthread_self();
  if(pthread_equal(self, master_thread)) is_master_thread = 1;

  if(!kqs) {
    kqs = calloc(1, sizeof(*kqs));
    kqs_init(kqs);
  }
  pthread_setspecific(kqueue_setup_key, kqs);
  while(1) {
    const char *cbname;
    struct timeval __now, __sleeptime;
    struct timespec __kqueue_sleeptime;
    int fd_cnt = 0;
    int max_timed_events_to_process;
    int newmask;

    __sleeptime = __max_sleeptime;

    /* Handle timed events...
     * we could be multithreaded, so if we pop forever we could starve
     * ourselves. */
    max_timed_events_to_process = timed_events->size;
    while(max_timed_events_to_process-- > 0) {
      eventer_t timed_event;

      gettimeofday(&__now, NULL);

      pthread_mutex_lock(&te_lock);
      /* Peek at our next timed event; if it should fire, pop it,
       * otherwise NULL it out to break the loop. */
      timed_event = noit_skiplist_peek(timed_events);
      if(timed_event) {
        if(compare_timeval(timed_event->whence, __now) < 0) {
          timed_event = noit_skiplist_pop(timed_events, NULL);
        }
        else {
          sub_timeval(timed_event->whence, __now, &__sleeptime);
          timed_event = NULL;
        }
      }
      pthread_mutex_unlock(&te_lock);
      if(timed_event == NULL) break;

      cbname = eventer_name_for_callback(timed_event->callback);
      noitLT(eventer_deb, &__now, "debug: timed dispatch(%s)\n", cbname ? cbname : "???");
      /* Make our call */
      newmask = timed_event->callback(timed_event, EVENTER_TIMER,
                                      timed_event->closure, &__now);
      if(newmask)
        eventer_add(timed_event);
      else
        eventer_free(timed_event);
    }
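
    /* __sleeptime now holds the interval until the next pending timed
     * event (set via sub_timeval above, if any); it caps how long the
     * kevent() call below may block. */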
    if(compare_timeval(__max_sleeptime, __sleeptime) < 0) {
      /* we exceed our configured maximum, set it down */
      memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime));
    }

    /* Handle recurrent events */
    eventer_dispatch_recurrent(&__now);

    /* Flush any changes queued on the shared master_kqs (under lock) */
    if(master_kqs->__ke_vec_used) {
      struct timespec __zerotime = { 0, 0 };
      pthread_mutex_lock(&kqs_lock);
      fd_cnt = kevent(kqueue_fd,
                      master_kqs->__ke_vec, master_kqs->__ke_vec_used,
                      NULL, 0,
                      &__zerotime);
      noitLT(eventer_deb, &__now, "debug: kevent(%d, [], %d) => %d\n", kqueue_fd, master_kqs->__ke_vec_used, fd_cnt);
      if(fd_cnt < 0) {
        noitLT(eventer_err, &__now, "kevent: %s\n", strerror(errno));
      }
      master_kqs->__ke_vec_used = 0;
      pthread_mutex_unlock(&kqs_lock);
    }

    /* Now we move on to our fd-based events */
    __kqueue_sleeptime.tv_sec = __sleeptime.tv_sec;
    __kqueue_sleeptime.tv_nsec = __sleeptime.tv_usec * 1000;
    fd_cnt = kevent(kqueue_fd, ke_vec, ke_vec_used,
                    ke_vec, ke_vec_a,
                    &__kqueue_sleeptime);
    noitLT(eventer_deb, &__now, "debug: kevent(%d, [], %d) => %d\n", kqueue_fd, ke_vec_used, fd_cnt);
    ke_vec_used = 0;
    if(fd_cnt < 0) {
      noitLT(eventer_err, &__now, "kevent: %s\n", strerror(errno));
    }
    else {
      int idx;
      /* loop once to clear */
      for(idx = 0; idx < fd_cnt; idx++) {
        struct kevent *ke;
        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) continue;
        masks[ke->ident] = 0;
      }
      /* Loop again to aggregate */
      for(idx = 0; idx < fd_cnt; idx++) {
        struct kevent *ke;
        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) continue;
        if(ke->filter == EVFILT_READ) masks[ke->ident] |= EVENTER_READ;
        if(ke->filter == EVFILT_WRITE) masks[ke->ident] |= EVENTER_WRITE;
      }
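      /* kqueue reports read and write readiness as separate kevents; the
       * clear/aggregate passes above fold them into a single mask per fd
       * so the pass below makes exactly one callback per descriptor. */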
      /* Loop a last time to process */
      for(idx = 0; idx < fd_cnt; idx++) {
        ev_lock_state_t lockstate;
        struct kevent *ke;
        eventer_t e;
        int fd, oldmask;

        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) {
          if(ke->data != EBADF)
            noitLT(eventer_err, &__now, "error: %s\n", strerror(ke->data));
          continue;
        }
        e = (eventer_t)ke->udata;
        fd = ke->ident;
        /* If we've already processed this fd, don't fire its callback twice */
        if(!masks[fd]) continue;
        /* It's possible that someone removed the event and freed it
         * before we got here.
         */
        if(e != master_fds[fd].e) continue;
        lockstate = acquire_master_fd(fd);
        assert(lockstate == EV_OWNED);

        gettimeofday(&__now, NULL);
        oldmask = e->mask;
        cbname = eventer_name_for_callback(e->callback);
        noitLT(eventer_deb, &__now, "kqueue: fire on %d/%x to %s(%p)\n",
               fd, masks[fd], cbname?cbname:"???", e->callback);
        newmask = e->callback(e, masks[fd], e->closure, &__now);
        masks[fd] = 0; /* indicates we've processed this fd */

        if(newmask) {
          /* toggle the read bits if needed */
          if(newmask & (EVENTER_READ | EVENTER_EXCEPTION)) {
            if(!(oldmask & (EVENTER_READ | EVENTER_EXCEPTION)))
              ke_change(fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
          }
          else if(oldmask & (EVENTER_READ | EVENTER_EXCEPTION))
            ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);

          /* toggle the write bits if needed */
          if(newmask & EVENTER_WRITE) {
            if(!(oldmask & EVENTER_WRITE))
              ke_change(fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
          }
          else if(oldmask & EVENTER_WRITE)
            ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);

          /* Set our mask */
          e->mask = newmask;
        }
        else {
          /*
           * Long story long:
           *  When integrating with a few external event systems, we find
           *  it difficult to make their use of remove+add work as an
           *  update, as it can be recurrent in a single handler call and
           *  you cannot remove completely from the event system if you are
           *  going to just update (otherwise the eventer_t in your call
           *  stack could be stale).  What we do is perform a superficial
           *  remove, marking the mask as 0, but not eventer_remove_fd.
           *  Then on an add, if we already have an event, we just update
           *  the mask (as we have not yet returned to the eventer's loop).
           *  This leaves us in a tricky situation when a remove is called
           *  and the add doesn't roll in: we return 0 (mask == 0) and hit
           *  this spot.  We have intended to remove the event, but it
           *  still resides at master_fds[fd].e -- even after we free it.
           *  So, in the event that we return 0 and master_fds[fd].e is
           *  the event we're about to free... we NULL it out.
           */
          if(master_fds[fd].e == e) master_fds[fd].e = NULL;
          eventer_free(e);
        }
        release_master_fd(fd, lockstate);
      }
    }
  }
  /* NOTREACHED */
  return 0;
}

struct _eventer_impl eventer_kqueue_impl = {
  "kqueue",
  eventer_kqueue_impl_init,
  eventer_kqueue_impl_propset,
  eventer_kqueue_impl_add,
  eventer_kqueue_impl_remove,
  eventer_kqueue_impl_update,
  eventer_kqueue_impl_remove_fd,
  eventer_kqueue_impl_find_fd,
  eventer_kqueue_impl_loop
};
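
/* A sketch of bringing a program up on this backend (hypothetical; the
 * eventer_choose()/eventer_init()/eventer_loop() entry points are
 * assumed from the eventer core, not defined in this file):
 *
 *   if(eventer_choose("kqueue") == 0 && eventer_init() == 0)
 *     eventer_loop();  // never returns
 */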