root/src/eventer/eventer_kqueue_impl.c

Revision f235ad41b286a70c1f8e40c8e56eebc9b051508a, 10.8 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

make this work on Mac OS X

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
#include "noit_defines.h"
#include "eventer/eventer.h"
#include "utils/noit_atomic.h"
#include "utils/noit_skiplist.h"
#include "utils/noit_log.h"

#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/event.h>
#include <sys/resource.h>
#include <sys/time.h>
18
/* Upper bound on a single kevent() sleep; keeps each loop pass short so
 * cross-thread changelist updates are picked up promptly. */
static struct timeval __max_sleeptime = { 0, 200000 }; /* 200ms */
static int maxfds;                 /* number of entries in master_fds, from RLIMIT_NOFILE */
static struct {
  eventer_t e;                     /* the event currently registered on this fd (or NULL) */
  pthread_t executor;              /* thread presently operating on this fd */
  noit_spinlock_t lock;            /* per-fd ownership spinlock */
} *master_fds = NULL;

/* Result of acquire_master_fd(): EV_OWNED means this call took the lock
 * (caller must release); EV_ALREADY_OWNED means the calling thread already
 * held it (re-entrant acquisition, release is a no-op). */
typedef enum { EV_OWNED, EV_ALREADY_OWNED } ev_lock_state_t;
28
/* Acquire the per-fd spinlock, tolerating recursive acquisition by the
 * same thread.  Returns EV_OWNED when this call took the lock, or
 * EV_ALREADY_OWNED when the caller's thread was already the holder. */
static ev_lock_state_t
acquire_master_fd(int fd) {
  if(noit_spinlock_trylock(&master_fds[fd].lock)) {
    master_fds[fd].executor = pthread_self();
    return EV_OWNED;
  }
  /* Lock is contended; if we are the holder, this is a recursive entry.
   * NOTE(review): executor is read here without holding the lock, so we
   * may observe a value another thread is concurrently writing — assumed
   * benign because only the holder can match its own pthread_self(), but
   * worth confirming against noit_spinlock's memory-ordering guarantees. */
  if(pthread_equal(master_fds[fd].executor, pthread_self())) {
    return EV_ALREADY_OWNED;
  }
  /* Another thread holds it: spin until we own the fd. */
  noit_spinlock_lock(&master_fds[fd].lock);
  master_fds[fd].executor = pthread_self();
  return EV_OWNED;
}
42 static void
43 release_master_fd(int fd, ev_lock_state_t as) {
44   if(as == EV_OWNED) {
45     memset(&master_fds[fd].executor, 0, sizeof(master_fds[fd].executor));
46     noit_spinlock_unlock(&master_fds[fd].lock);
47   }
48 }
49
static pthread_t master_thread;            /* thread that ran _impl_init */
static int kqueue_fd = -1;                 /* the shared kqueue descriptor */
/* A vector of pending kevent changes; one per event-loop thread, plus a
 * shared "master" instance for threads without their own. */
typedef struct kqueue_setup {
  struct kevent *__ke_vec;                 /* change buffer */
  unsigned int __ke_vec_a;                 /* allocated capacity (entries) */
  unsigned int __ke_vec_used;              /* entries currently filled */
} *kqs_t;

static pthread_mutex_t kqs_lock;           /* guards master_kqs's vector */
static pthread_mutex_t te_lock;            /* guards timed_events */
static kqs_t master_kqs = NULL;            /* shared changelist fallback */
static pthread_key_t kqueue_setup_key;     /* TLS slot holding each thread's kqs */
static noit_skiplist *timed_events = NULL; /* pending timer events, ordered by whence */
/* Accessors for the kqs in scope; KQUEUE_SETUP fetches the thread-local
 * instance (may be NULL before the loop installs one). */
#define KQUEUE_DECL kqs_t kqs
#define KQUEUE_SETUP kqs = (kqs_t) pthread_getspecific(kqueue_setup_key)
#define ke_vec kqs->__ke_vec
#define ke_vec_a kqs->__ke_vec_a
#define ke_vec_used kqs->__ke_vec_used
68
/* Allocate the initial change vector for a kqueue_setup.
 * BUG FIX: the malloc result was unchecked; on failure ke_vec_a stayed
 * at 64 with ke_vec NULL, so ke_change would never retry the allocation
 * and would dereference NULL.  Reset the capacity on failure so a later
 * ke_change re-attempts initialization. */
static void kqs_init(kqs_t kqs) {
  enum { initial_alloc = 64 };
  ke_vec_a = initial_alloc;
  ke_vec = (struct kevent *) malloc(ke_vec_a * sizeof (struct kevent));
  if(!ke_vec) ke_vec_a = 0;
}
/* Queue one kevent change (ident/filter/flags/udata) on the calling
 * thread's changelist, falling back to the shared master changelist
 * (under kqs_lock) for threads without their own.
 * BUG FIX: the grow path did `ke_vec = realloc(ke_vec, ...)`, which on
 * failure leaks the old vector and then dereferences NULL; use a
 * temporary and fail loudly instead of corrupting eventer state. */
static void
ke_change (register int const ident,
           register int const filter,
           register int const flags,
           register void *const udata) {
  register struct kevent *kep;
  KQUEUE_DECL;
  KQUEUE_SETUP;
  if(!kqs) kqs = master_kqs;

  /* The master changelist is shared across threads; serialize access. */
  if(kqs == master_kqs) pthread_mutex_lock(&kqs_lock);
  if (!ke_vec_a) {
    kqs_init(kqs);
  }
  else if (ke_vec_used == ke_vec_a) {
    /* Grow geometrically via a temporary so a failed realloc neither
     * clobbers nor leaks the existing vector. */
    struct kevent *tmp;
    tmp = (struct kevent *) realloc(ke_vec,
                                    (ke_vec_a << 1) * sizeof (struct kevent));
    if(tmp) {
      ke_vec = tmp;
      ke_vec_a <<= 1;
    }
  }
  if(!ke_vec || ke_vec_used == ke_vec_a) {
    /* Out of memory: dropping the change would silently desynchronize
     * the eventer from the kernel's filter state, so fail hard. */
    abort();
  }
  kep = &ke_vec[ke_vec_used++];

  EV_SET(kep, ident, filter, flags, 0, 0, udata);
  if(kqs == master_kqs) pthread_mutex_unlock(&kqs_lock);
}
98
/* One-time initialization: create the kqueue, the locks, the TLS key,
 * the shared master changelist, the per-fd ownership table (sized from
 * RLIMIT_NOFILE) and the timed-event skiplist.
 * Returns 0 on success, -1 on failure.
 * FIX: getrlimit() and the calloc()s were unchecked; a getrlimit failure
 * left maxfds uninitialized garbage, and a failed calloc was dereferenced
 * by kqs_init()/noit_skiplist_init(). */
static int eventer_kqueue_impl_init() {
  struct rlimit rlim;
  master_thread = pthread_self();
  kqueue_fd = kqueue();
  if(kqueue_fd == -1) {
    return -1;
  }
  pthread_mutex_init(&kqs_lock, NULL);
  pthread_mutex_init(&te_lock, NULL);
  pthread_key_create(&kqueue_setup_key, NULL);
  master_kqs = calloc(1, sizeof(*master_kqs));
  if(!master_kqs) return -1;
  kqs_init(master_kqs);
  /* Size the fd table from the descriptor limit; fall back to a
   * conservative default if getrlimit fails. */
  if(getrlimit(RLIMIT_NOFILE, &rlim) != 0)
    rlim.rlim_cur = 1024;
  maxfds = rlim.rlim_cur;
  master_fds = calloc(maxfds, sizeof(*master_fds));
  timed_events = calloc(1, sizeof(*timed_events));
  if(!master_fds || !timed_events) return -1;
  noit_skiplist_init(timed_events);
  noit_skiplist_set_compare(timed_events,
                            eventer_timecompare, eventer_timecompare);
  noit_skiplist_add_index(timed_events,
                          noit_compare_voidptr, noit_compare_voidptr);
  return 0;
}
/* Runtime property hook; the kqueue eventer exposes no tunables, so any
 * key/value pair is rejected with -1. */
static int eventer_kqueue_impl_propset(const char *key, const char *value) {
  (void)key;    /* intentionally unused */
  (void)value;  /* intentionally unused */
  return -1;
}
/* Register an event.  Timer events (mask == EVENTER_TIMER) are inserted
 * into the timed_events skiplist under te_lock; fd events are recorded
 * in master_fds and the corresponding kevent filters are enabled.
 * FIX: the assert preceded the `lockstate` declaration (a statement
 * before a declaration — invalid C90 and inconsistent with the rest of
 * the file); declarations now come first. */
static void eventer_kqueue_impl_add(eventer_t e) {
  ev_lock_state_t lockstate;
  assert(e->mask);

  /* Timed events are simple */
  if(e->mask == EVENTER_TIMER) {
    pthread_mutex_lock(&te_lock);
    noit_skiplist_insert(timed_events, e);
    pthread_mutex_unlock(&te_lock);
    return;
  }

  /* file descriptor event: take per-fd ownership, then arm the filters
   * implied by the event's mask. */
  lockstate = acquire_master_fd(e->fd);
  master_fds[e->fd].e = e;
  if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
    ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
  if(e->mask & (EVENTER_WRITE))
    ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
  release_master_fd(e->fd, lockstate);
}
/* Deregister an event.  fd events are detached from master_fds and their
 * kevent filters deleted — but only if this event is still the one
 * registered on the fd (another event may have replaced it).  Timer
 * events are removed from the skiplist by pointer identity.  Any other
 * mask is a programming error. */
static void eventer_kqueue_impl_remove(eventer_t e) {
  if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
    ev_lock_state_t lockstate;
    lockstate = acquire_master_fd(e->fd);
    if(e == master_fds[e->fd].e) {
      master_fds[e->fd].e = NULL;
      /* Delete only the filters this event had armed. */
      if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
        ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
      if(e->mask & (EVENTER_WRITE))
        ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
    }
    release_master_fd(e->fd, lockstate);
  }
  else if(e->mask & EVENTER_TIMER) {
    pthread_mutex_lock(&te_lock);
    /* NULL comparator: match by the voidptr (identity) index. */
    noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
    pthread_mutex_unlock(&te_lock);
  }
  else {
    /* No recognized bits in the mask: caller bug. */
    abort();
  }
}
/* Re-register an event after its mask (or a timer's whence) changed.
 * Timer events are repositioned in the skiplist; fd events have both
 * filters cleared and are then re-armed per the new mask.
 * BUG FIX: the re-arm calls issued EV_DELETE | EV_DISABLE — the same
 * flags as the unconditional clears above them — so updating an fd
 * event permanently disarmed it.  They must use EV_ADD | EV_ENABLE,
 * matching eventer_kqueue_impl_add(). */
static void eventer_kqueue_impl_update(eventer_t e) {
  if(e->mask & EVENTER_TIMER) {
    /* Reposition under the timer lock: remove by identity, re-insert
     * so the skiplist reorders on the (possibly new) whence. */
    pthread_mutex_lock(&te_lock);
    noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
    noit_skiplist_insert(timed_events, e);
    pthread_mutex_unlock(&te_lock);
    return;
  }
  /* Clear both filters, then re-arm the ones the new mask asks for. */
  ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
  ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
  if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
    ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
  if(e->mask & (EVENTER_WRITE))
    ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
}
/* Detach and return whatever event is registered on fd (NULL if none),
 * deleting its kevent filters.
 * BUG FIX: master_fds[fd].e was checked before taking the per-fd lock
 * and the post-lock value dereferenced unconditionally; if another
 * thread removed the event while we waited for the lock, the original
 * dereferenced NULL.  Re-check under the lock before touching it. */
static eventer_t eventer_kqueue_impl_remove_fd(int fd) {
  eventer_t eiq = NULL;
  ev_lock_state_t lockstate;
  if(master_fds[fd].e) {
    lockstate = acquire_master_fd(fd);
    eiq = master_fds[fd].e;
    if(eiq) {
      master_fds[fd].e = NULL;
      if(eiq->mask & (EVENTER_READ | EVENTER_EXCEPTION))
        ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, eiq);
      if(eiq->mask & (EVENTER_WRITE))
        ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, eiq);
    }
    release_master_fd(fd, lockstate);
  }
  return eiq;
}
/* The main event loop; never returns and may run in several threads at
 * once.  Each pass: (1) fire due timer events, bounded by the queue size
 * at entry so concurrent insertion cannot starve the pass; (2) flush the
 * shared master changelist with a zero-timeout kevent(); (3) submit this
 * thread's changelist and sleep in kevent() for up to __max_sleeptime
 * (or until the next timer), then dispatch fd callbacks and re-arm
 * filters according to each callback's returned mask.
 * FIXES: __now was passed to noit_log below without ever being set when
 * no timed event ran gettimeofday() that pass — it is now initialized at
 * the top of each pass; the never-read is_master_thread/self locals were
 * removed. */
static void eventer_kqueue_impl_loop() {
  KQUEUE_DECL;
  KQUEUE_SETUP;

  /* First call in this thread: build its private changelist. */
  if(!kqs) {
    kqs = calloc(1, sizeof(*kqs));
    kqs_init(kqs);
  }
  pthread_setspecific(kqueue_setup_key, kqs);
  while(1) {
    struct timeval __now, __sleeptime;
    struct timespec __kqueue_sleeptime;
    int fd_cnt = 0;
    int max_timed_events_to_process;
    int newmask;

    gettimeofday(&__now, NULL);
    __sleeptime = __max_sleeptime;

    /* Handle timed events...
     * we could be multithreaded, so if we pop forever we could starve
     * ourselves. */
    /* NOTE(review): timed_events->size is read without te_lock; it is
     * only used as an iteration bound, so a stale value is harmless. */
    max_timed_events_to_process = timed_events->size;
    while(max_timed_events_to_process-- > 0) {
      eventer_t timed_event;

      gettimeofday(&__now, NULL);

      pthread_mutex_lock(&te_lock);
      /* Peek at our next timed event, if should fire, pop it.
       * otherwise we noop and NULL it out to break the loop. */
      timed_event = noit_skiplist_peek(timed_events);
      if(timed_event) {
        if(compare_timeval(timed_event->whence, __now) < 0) {
          timed_event = noit_skiplist_pop(timed_events, NULL);
        }
        else {
          /* Not due yet: sleep only until it is. */
          sub_timeval(timed_event->whence, __now, &__sleeptime);
          timed_event = NULL;
        }
      }
      pthread_mutex_unlock(&te_lock);
      if(timed_event == NULL) break;

      /* Make our call; a non-zero return re-arms the timer. */
      newmask = timed_event->callback(timed_event, EVENTER_TIMER,
                                      timed_event->closure, &__now);
      if(newmask)
        eventer_add(timed_event);
      else
        eventer_free(timed_event);
    }

    if(compare_timeval(__max_sleeptime, __sleeptime) < 0) {
      /* we exceed our configured maximum, set it down */
      memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime));
    }

    /* If we're the master, we need to lock the master_kqs and make mods */
    if(master_kqs->__ke_vec_used) {
      struct timespec __zerotime = { 0, 0 };
      pthread_mutex_lock(&kqs_lock);
      /* Zero timeout: apply the shared changes without sleeping. */
      fd_cnt = kevent(kqueue_fd,
                      master_kqs->__ke_vec, master_kqs->__ke_vec_used,
                      NULL, 0,
                      &__zerotime);
      noit_log(noit_debug, &__now, "debug: kevent(%d, [], %d) => %d\n", kqueue_fd, master_kqs->__ke_vec_used, fd_cnt);
      if(fd_cnt < 0) {
        noit_log(noit_error, &__now, "kevent: %s\n", strerror(errno));
      }
      master_kqs->__ke_vec_used = 0;
      pthread_mutex_unlock(&kqs_lock);
    }

    /* Now we move on to our fd-based events: submit our changes and
     * reuse the same vector to receive triggered events. */
    __kqueue_sleeptime.tv_sec = __sleeptime.tv_sec;
    __kqueue_sleeptime.tv_nsec = __sleeptime.tv_usec * 1000;
    fd_cnt = kevent(kqueue_fd, ke_vec, ke_vec_used,
                    ke_vec, ke_vec_a,
                    &__kqueue_sleeptime);
    noit_log(noit_debug, &__now, "debug: kevent(%d, [], %d) => %d\n", kqueue_fd, ke_vec_used, fd_cnt);
    ke_vec_used = 0;
    if(fd_cnt < 0) {
      noit_log(noit_error, &__now, "kevent: %s\n", strerror(errno));
    }
    else {
      int idx;
      for(idx = 0; idx < fd_cnt; idx++) {
        ev_lock_state_t lockstate;
        struct kevent *ke;
        eventer_t e;
        int fd, evmask, oldmask;

        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) {
          /* EBADF races with close() and is expected; log the rest. */
          if(ke->data != EBADF)
            noit_log(noit_error, &__now, "error: %s\n", strerror(ke->data));
          continue;
        }
        e = (eventer_t)ke->udata;
        fd = ke->ident;
        assert(e == master_fds[fd].e);
        /* We must be the sole owner here — the dispatch loop never
         * recursively acquires an fd it already holds. */
        lockstate = acquire_master_fd(fd);
        assert(lockstate == EV_OWNED);

        evmask = 0;
        if(ke->filter == EVFILT_READ) evmask = EVENTER_READ;
        if(ke->filter == EVFILT_WRITE) evmask = EVENTER_WRITE;
        gettimeofday(&__now, NULL);
        oldmask = e->mask;
        newmask = e->callback(e, evmask, e->closure, &__now);

        if(newmask) {
          /* toggle the read bits if needed */
          if(newmask & (EVENTER_READ | EVENTER_EXCEPTION)) {
            if(!(oldmask & (EVENTER_READ | EVENTER_EXCEPTION)))
              ke_change(fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
          }
          else if(oldmask & (EVENTER_READ | EVENTER_EXCEPTION))
            ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);

          /* toggle the write bits if needed */
          if(newmask & EVENTER_WRITE) {
            if(!(oldmask & EVENTER_WRITE))
              ke_change(fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
          }
          else if(oldmask & EVENTER_WRITE)
              ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);

          /* Set our mask */
          e->mask = newmask;
        }
        else {
          /* A zero mask means the event is done; free it. */
          eventer_free(e);
        }
        release_master_fd(fd, lockstate);
      }
    }
  }
}
342
/* The kqueue eventer's implementation vtable; field order is defined by
 * struct _eventer_impl in eventer/eventer.h. */
struct _eventer_impl eventer_kqueue_impl = {
  "kqueue",                      /* implementation name */
  eventer_kqueue_impl_init,
  eventer_kqueue_impl_propset,
  eventer_kqueue_impl_add,
  eventer_kqueue_impl_remove,
  eventer_kqueue_impl_update,
  eventer_kqueue_impl_remove_fd,
  eventer_kqueue_impl_loop
};
Note: See TracBrowser for help on using the browser.