root/src/eventer/eventer_kqueue_impl.c

Revision 171d8ed2238b758bd302a0bb23cd0aeeabc50b33, 12.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 10 years ago)

make eventer.h one-stop-shopping

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_atomic.h"
9 #include "utils/noit_skiplist.h"
10 #include "utils/noit_log.h"
11
12 #include <errno.h>
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <sys/event.h>
16 #include <pthread.h>
17 #include <assert.h>
18
19 static struct timeval __max_sleeptime = { 0, 200000 }; /* 200ms */
20 static int maxfds;
21 static struct {
22   eventer_t e;
23   pthread_t executor;
24   noit_spinlock_t lock;
25 } *master_fds = NULL;
26 static int *masks;
27
28 typedef enum { EV_OWNED, EV_ALREADY_OWNED } ev_lock_state_t;
29
30 static ev_lock_state_t
31 acquire_master_fd(int fd) {
32   if(noit_spinlock_trylock(&master_fds[fd].lock)) {
33     master_fds[fd].executor = pthread_self();
34     return EV_OWNED;
35   }
36   if(pthread_equal(master_fds[fd].executor, pthread_self())) {
37     return EV_ALREADY_OWNED;
38   }
39   noit_spinlock_lock(&master_fds[fd].lock);
40   master_fds[fd].executor = pthread_self();
41   return EV_OWNED;
42 }
43 static void
44 release_master_fd(int fd, ev_lock_state_t as) {
45   if(as == EV_OWNED) {
46     memset(&master_fds[fd].executor, 0, sizeof(master_fds[fd].executor));
47     noit_spinlock_unlock(&master_fds[fd].lock);
48   }
49 }
50
51 static pthread_t master_thread;
52 static int kqueue_fd = -1;
53 typedef struct kqueue_setup {
54   struct kevent *__ke_vec;
55   unsigned int __ke_vec_a;
56   unsigned int __ke_vec_used;
57 } *kqs_t;
58
59 static pthread_mutex_t kqs_lock;
60 static pthread_mutex_t te_lock;
61 static kqs_t master_kqs = NULL;
62 static pthread_key_t kqueue_setup_key;
63 static noit_skiplist *timed_events = NULL;
64 #define KQUEUE_DECL kqs_t kqs
65 #define KQUEUE_SETUP kqs = (kqs_t) pthread_getspecific(kqueue_setup_key)
66 #define ke_vec kqs->__ke_vec
67 #define ke_vec_a kqs->__ke_vec_a
68 #define ke_vec_used kqs->__ke_vec_used
69
70 static void kqs_init(kqs_t kqs) {
71   enum { initial_alloc = 64 };
72   ke_vec_a = initial_alloc;
73   ke_vec = (struct kevent *) malloc(ke_vec_a * sizeof (struct kevent));
74 }
75 static void
76 ke_change (register int const ident,
77            register int const filter,
78            register int const flags,
79            register void *const udata) {
80   register struct kevent *kep;
81   KQUEUE_DECL;
82   KQUEUE_SETUP;
83   if(!kqs) kqs = master_kqs;
84
85   if(kqs == master_kqs) pthread_mutex_lock(&kqs_lock);
86   if (!ke_vec_a) {
87     kqs_init(kqs);
88   }
89   else if (ke_vec_used == ke_vec_a) {
90     ke_vec_a <<= 1;
91     ke_vec = (struct kevent *) realloc(ke_vec,
92                                        ke_vec_a * sizeof (struct kevent));
93   }
94   kep = &ke_vec[ke_vec_used++];
95
96   EV_SET(kep, ident, filter, flags, 0, 0, udata);
97   if(kqs == master_kqs) pthread_mutex_unlock(&kqs_lock);
98 }
99
100 static int eventer_kqueue_impl_init() {
101   struct rlimit rlim;
102   int rv;
103
104   /* super init */
105   if((rv = eventer_impl_init()) != 0) return rv;
106
107   master_thread = pthread_self();
108   signal(SIGPIPE, SIG_IGN);
109   kqueue_fd = kqueue();
110   if(kqueue_fd == -1) {
111     return -1;
112   }
113   pthread_mutex_init(&kqs_lock, NULL);
114   pthread_mutex_init(&te_lock, NULL);
115   pthread_key_create(&kqueue_setup_key, NULL);
116   master_kqs = calloc(1, sizeof(*master_kqs));
117   kqs_init(master_kqs);
118   getrlimit(RLIMIT_NOFILE, &rlim);
119   maxfds = rlim.rlim_cur;
120   master_fds = calloc(maxfds, sizeof(*master_fds));
121   masks = calloc(maxfds, sizeof(*masks));
122   timed_events = calloc(1, sizeof(*timed_events));
123   noit_skiplist_init(timed_events);
124   noit_skiplist_set_compare(timed_events,
125                             eventer_timecompare, eventer_timecompare);
126   noit_skiplist_add_index(timed_events,
127                           noit_compare_voidptr, noit_compare_voidptr);
128   return 0;
129 }
130 static int eventer_kqueue_impl_propset(const char *key, const char *value) {
131   if(eventer_impl_propset(key, value)) {
132     /* Do our kqueue local properties here */
133     return -1;
134   }
135   return 0;
136 }
137 static void eventer_kqueue_impl_add(eventer_t e) {
138   assert(e->mask);
139   ev_lock_state_t lockstate;
140
141   if(e->mask & EVENTER_ASYNCH) {
142     eventer_add_asynch(NULL, e);
143     return;
144   }
145
146   /* Recurrent delegation */
147   if(e->mask & EVENTER_RECURRENT) {
148     eventer_add_recurrent(e);
149     return;
150   }
151
152   /* Timed events are simple */
153   if(e->mask & EVENTER_TIMER) {
154     pthread_mutex_lock(&te_lock);
155     noit_skiplist_insert(timed_events, e);
156     pthread_mutex_unlock(&te_lock);
157     return;
158   }
159
160   /* file descriptor event */
161   lockstate = acquire_master_fd(e->fd);
162   master_fds[e->fd].e = e;
163   if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
164     ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
165   if(e->mask & (EVENTER_WRITE))
166     ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
167   release_master_fd(e->fd, lockstate);
168 }
169 static eventer_t eventer_kqueue_impl_remove(eventer_t e) {
170   eventer_t removed = NULL;
171   if(e->mask & EVENTER_ASYNCH) {
172     abort();
173   }
174   if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
175     ev_lock_state_t lockstate;
176     lockstate = acquire_master_fd(e->fd);
177     if(e == master_fds[e->fd].e) {
178       removed = e;
179       master_fds[e->fd].e = NULL;
180       if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
181         ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
182       if(e->mask & (EVENTER_WRITE))
183         ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
184     }
185     release_master_fd(e->fd, lockstate);
186   }
187   else if(e->mask & EVENTER_TIMER) {
188     pthread_mutex_lock(&te_lock);
189     if(noit_skiplist_remove_compare(timed_events, e, NULL,
190                                     noit_compare_voidptr))
191       removed = e;
192     pthread_mutex_unlock(&te_lock);
193   }
194   else if(e->mask & EVENTER_RECURRENT) {
195     removed = eventer_remove_recurrent(e);
196   }
197   else {
198     abort();
199   }
200   return removed;
201 }
202 static void eventer_kqueue_impl_update(eventer_t e) {
203   if(e->mask & EVENTER_TIMER) {
204     pthread_mutex_lock(&te_lock);
205     noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
206     noit_skiplist_insert(timed_events, e);
207     pthread_mutex_unlock(&te_lock);
208     return;
209   }
210   ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
211   ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
212   if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
213     ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
214   if(e->mask & (EVENTER_WRITE))
215     ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
216 }
217 static eventer_t eventer_kqueue_impl_remove_fd(int fd) {
218   eventer_t eiq = NULL;
219   ev_lock_state_t lockstate;
220   if(master_fds[fd].e) {
221     lockstate = acquire_master_fd(fd);
222     eiq = master_fds[fd].e;
223     master_fds[fd].e = NULL;
224     if(eiq->mask & (EVENTER_READ | EVENTER_EXCEPTION))
225       ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, eiq);
226     if(eiq->mask & (EVENTER_WRITE))
227       ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, eiq);
228     release_master_fd(fd, lockstate);
229   }
230   return eiq;
231 }
232 static eventer_t eventer_kqueue_impl_find_fd(int fd) {
233   return master_fds[fd].e;
234 }
235 static int eventer_kqueue_impl_loop() {
236   int is_master_thread = 0;
237   pthread_t self;
238   KQUEUE_DECL;
239   KQUEUE_SETUP;
240
241   self = pthread_self();
242   if(pthread_equal(self, master_thread)) is_master_thread = 1;
243
244   if(!kqs) {
245     kqs = calloc(1, sizeof(*kqs));
246     kqs_init(kqs);
247   }
248   pthread_setspecific(kqueue_setup_key, kqs);
249   while(1) {
250     struct timeval __now, __sleeptime;
251     struct timespec __kqueue_sleeptime;
252     int fd_cnt = 0;
253     int max_timed_events_to_process;
254     int newmask;
255
256     __sleeptime = __max_sleeptime;
257
258     /* Handle timed events...
259      * we could be multithreaded, so if we pop forever we could starve
260      * ourselves. */
261     max_timed_events_to_process = timed_events->size;
262     while(max_timed_events_to_process-- > 0) {
263       eventer_t timed_event;
264
265       gettimeofday(&__now, NULL);
266
267       pthread_mutex_lock(&te_lock);
268       /* Peek at our next timed event, if should fire, pop it.
269        * otherwise we noop and NULL it out to break the loop. */
270       timed_event = noit_skiplist_peek(timed_events);
271       if(timed_event) {
272         if(compare_timeval(timed_event->whence, __now) < 0) {
273           timed_event = noit_skiplist_pop(timed_events, NULL);
274         }
275         else {
276           sub_timeval(timed_event->whence, __now, &__sleeptime);
277           timed_event = NULL;
278         }
279       }
280       pthread_mutex_unlock(&te_lock);
281       if(timed_event == NULL) break;
282
283       /* Make our call */
284       newmask = timed_event->callback(timed_event, EVENTER_TIMER,
285                                       timed_event->closure, &__now);
286       if(newmask)
287         eventer_add(timed_event);
288       else
289         eventer_free(timed_event);
290     }
291
292     if(compare_timeval(__max_sleeptime, __sleeptime) < 0) {
293       /* we exceed our configured maximum, set it down */
294       memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime));
295     }
296
297     /* Handle recurrent events */
298     eventer_dispatch_recurrent(&__now);
299
300     /* If we're the master, we need to lock the master_kqs and make mods */
301     if(master_kqs->__ke_vec_used) {
302       struct timespec __zerotime = { 0, 0 };
303       pthread_mutex_lock(&kqs_lock);
304       fd_cnt = kevent(kqueue_fd,
305                       master_kqs->__ke_vec, master_kqs->__ke_vec_used,
306                       NULL, 0,
307                       &__zerotime);
308       noitLT(noit_debug, &__now, "debug: kevent(%d, [], %d) => %d\n", kqueue_fd, master_kqs->__ke_vec_used, fd_cnt);
309       if(fd_cnt < 0) {
310         noitLT(noit_error, &__now, "kevent: %s\n", strerror(errno));
311       }
312       master_kqs->__ke_vec_used = 0;
313       pthread_mutex_unlock(&kqs_lock);
314     }
315
316     /* Now we move on to our fd-based events */
317     __kqueue_sleeptime.tv_sec = __sleeptime.tv_sec;
318     __kqueue_sleeptime.tv_nsec = __sleeptime.tv_usec * 1000;
319     fd_cnt = kevent(kqueue_fd, ke_vec, ke_vec_used,
320                     ke_vec, ke_vec_a,
321                     &__kqueue_sleeptime);
322     noitLT(noit_debug, &__now, "debug: kevent(%d, [], %d) => %d\n", kqueue_fd, ke_vec_used, fd_cnt);
323     ke_vec_used = 0;
324     if(fd_cnt < 0) {
325       noitLT(noit_error, &__now, "kevent: %s\n", strerror(errno));
326     }
327     else {
328       int idx;
329       /* loop once to clear */
330       for(idx = 0; idx < fd_cnt; idx++) {
331         struct kevent *ke;
332         ke = &ke_vec[idx];
333         if(ke->flags & EV_ERROR) continue;
334         masks[ke->ident] = 0;
335       }
336       /* Loop again to aggregate */
337       for(idx = 0; idx < fd_cnt; idx++) {
338         struct kevent *ke;
339         ke = &ke_vec[idx];
340         if(ke->flags & EV_ERROR) continue;
341         if(ke->filter == EVFILT_READ) masks[ke->ident] |= EVENTER_READ;
342         if(ke->filter == EVFILT_WRITE) masks[ke->ident] |= EVENTER_WRITE;
343       }
344       /* Loop a last time to process */
345       for(idx = 0; idx < fd_cnt; idx++) {
346         const char *cbname;
347         ev_lock_state_t lockstate;
348         struct kevent *ke;
349         eventer_t e;
350         int fd, oldmask;
351
352         ke = &ke_vec[idx];
353         if(ke->flags & EV_ERROR) {
354           if(ke->data != EBADF)
355             noitLT(noit_error, &__now, "error: %s\n", strerror(ke->data));
356           continue;
357         }
358         e = (eventer_t)ke->udata;
359         fd = ke->ident;
360         if(!masks[fd]) continue;
361         assert(e == master_fds[fd].e);
362         lockstate = acquire_master_fd(fd);
363         assert(lockstate == EV_OWNED);
364
365         gettimeofday(&__now, NULL);
366         oldmask = e->mask;
367         cbname = eventer_name_for_callback(e->callback);
368         noitLT(noit_debug, &__now, "kqueue: fire on %d/%x to %s(%p)\n",
369                fd, masks[fd], cbname?cbname:"???", e->callback);
370         newmask = e->callback(e, masks[fd], e->closure, &__now);
371         masks[fd] = 0; /* indicates we've processed this fd */
372
373         if(newmask) {
374           /* toggle the read bits if needed */
375           if(newmask & (EVENTER_READ | EVENTER_EXCEPTION)) {
376             if(!(oldmask & (EVENTER_READ | EVENTER_EXCEPTION)))
377               ke_change(fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
378           }
379           else if(oldmask & (EVENTER_READ | EVENTER_EXCEPTION))
380             ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
381  
382           /* toggle the write bits if needed */
383           if(newmask & EVENTER_WRITE) {
384             if(!(oldmask & EVENTER_WRITE))
385               ke_change(fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
386           }
387           else if(oldmask & EVENTER_WRITE)
388               ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
389  
390           /* Set our mask */
391           e->mask = newmask;
392         }
393         else {
394           eventer_free(e);
395         }
396         release_master_fd(fd, lockstate);
397       }
398     }
399   }
400   /* NOTREACHED */
401   return 0;
402 }
403
404 struct _eventer_impl eventer_kqueue_impl = {
405   "kqueue",
406   eventer_kqueue_impl_init,
407   eventer_kqueue_impl_propset,
408   eventer_kqueue_impl_add,
409   eventer_kqueue_impl_remove,
410   eventer_kqueue_impl_update,
411   eventer_kqueue_impl_remove_fd,
412   eventer_kqueue_impl_find_fd,
413   eventer_kqueue_impl_loop
414 };
Note: See TracBrowser for help on using the browser.