root/src/eventer/eventer_kqueue_impl.c

Revision e3c8f105af17e80c10e566366bb11a9f2a510c20, 11.7 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 7 years ago)

oh my... http

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
#include "noit_defines.h"
#include "eventer/eventer.h"
#include "utils/noit_atomic.h"
#include "utils/noit_skiplist.h"
#include "utils/noit_log.h"

#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
18
/* Upper bound on the kernel poll timeout; keeps the loop responsive to
 * timed events staged by other threads. */
static struct timeval __max_sleeptime = { 0, 200000 }; /* 200ms */
/* Sized from RLIMIT_NOFILE at init; bounds master_fds[] and masks[]. */
static int maxfds;
/* Per-file-descriptor registration table, indexed by fd.
 *   e        - the event currently registered on this fd (NULL if none)
 *   executor - thread currently holding .lock (zeroed when released)
 *   lock     - spinlock serializing all access to this slot */
static struct {
  eventer_t e;
  pthread_t executor;
  noit_spinlock_t lock;
} *master_fds = NULL;
/* Scratch per-fd trigger masks, rebuilt each pass through the event loop. */
static int *masks;

/* Result of acquire_master_fd(): EV_OWNED means we took the lock and must
 * release it; EV_ALREADY_OWNED means this thread already held it (reentrant
 * acquisition) and release is a no-op. */
typedef enum { EV_OWNED, EV_ALREADY_OWNED } ev_lock_state_t;
29
/* Acquire the spinlock guarding master_fds[fd], reentrantly.
 * Returns EV_OWNED when this call took the lock (caller must release with
 * the same state), or EV_ALREADY_OWNED when the calling thread already held
 * it, in which case the matching release is a no-op. */
static ev_lock_state_t
acquire_master_fd(int fd) {
  /* Fast path: uncontended, take it outright. */
  if(noit_spinlock_trylock(&master_fds[fd].lock)) {
    master_fds[fd].executor = pthread_self();
    return EV_OWNED;
  }
  /* trylock failed: either we hold it (reentrant) or another thread does.
   * NOTE(review): .executor is read here without the lock; assumes the
   * holder's write is visible to a reentrant reader on this platform --
   * confirm the memory-ordering guarantees of noit_spinlock_t. */
  if(pthread_equal(master_fds[fd].executor, pthread_self())) {
    return EV_ALREADY_OWNED;
  }
  /* Contended by another thread: spin until we own it. */
  noit_spinlock_lock(&master_fds[fd].lock);
  master_fds[fd].executor = pthread_self();
  return EV_OWNED;
}
43 static void
44 release_master_fd(int fd, ev_lock_state_t as) {
45   if(as == EV_OWNED) {
46     memset(&master_fds[fd].executor, 0, sizeof(master_fds[fd].executor));
47     noit_spinlock_unlock(&master_fds[fd].lock);
48   }
49 }
50
/* Thread that ran eventer_kqueue_impl_init(); used by the loop to detect
 * whether it is the master thread. */
static pthread_t master_thread;
/* The single shared kqueue descriptor; -1 until init succeeds. */
static int kqueue_fd = -1;
/* Per-thread staging vector of kevent changes, flushed on the next
 * kevent(2) call made by that thread's loop. */
typedef struct kqueue_setup {
  struct kevent *__ke_vec;       /* pending changelist */
  unsigned int __ke_vec_a;       /* allocated capacity (0 = uninitialized) */
  unsigned int __ke_vec_used;    /* entries currently staged */
} *kqs_t;

static pthread_mutex_t kqs_lock;      /* guards master_kqs's vector */
static pthread_mutex_t te_lock;       /* guards timed_events skiplist */
/* Fallback changelist for threads with no per-thread setup; shared, so all
 * access goes through kqs_lock. */
static kqs_t master_kqs = NULL;
/* TLS key holding each loop thread's private kqueue_setup. */
static pthread_key_t kqueue_setup_key;
/* Pending timer events ordered by firing time (plus a pointer index for
 * removal by identity). */
static noit_skiplist *timed_events = NULL;
/* Convenience macros: declare/fetch the calling thread's setup, then address
 * its fields through the local `kqs` variable. */
#define KQUEUE_DECL kqs_t kqs
#define KQUEUE_SETUP kqs = (kqs_t) pthread_getspecific(kqueue_setup_key)
#define ke_vec kqs->__ke_vec
#define ke_vec_a kqs->__ke_vec_a
#define ke_vec_used kqs->__ke_vec_used
69
/* Allocate the initial kevent changelist for a kqueue_setup.
 * Called once per setup (from init for master_kqs, lazily from ke_change
 * for per-thread setups). A nonzero ke_vec_a marks the setup initialized.
 * Allocation failure here is unrecoverable for the event loop -- the next
 * EV_SET would write through a NULL vector -- so fail fast instead. */
static void kqs_init(kqs_t kqs) {
  enum { initial_alloc = 64 };
  ke_vec_a = initial_alloc;
  ke_vec_used = 0; /* be explicit rather than relying on calloc'd storage */
  ke_vec = (struct kevent *) malloc(ke_vec_a * sizeof (struct kevent));
  if(!ke_vec) abort(); /* fix: original left a NULL vector on OOM */
}
/* Stage one kevent change (ident/filter/flags/udata) for the next kevent(2)
 * call. Uses the calling thread's private changelist when one exists;
 * otherwise falls back to the shared master_kqs, which must be mutated under
 * kqs_lock. The change is applied to the kernel later, by the loop. */
static void
ke_change (register int const ident,
           register int const filter,
           register int const flags,
           register void *const udata) {
  register struct kevent *kep;
  KQUEUE_DECL;
  KQUEUE_SETUP;
  /* Threads that never entered the loop have no TLS setup; stage into the
   * shared master vector instead. */
  if(!kqs) kqs = master_kqs;

  if(kqs == master_kqs) pthread_mutex_lock(&kqs_lock);
  if (!ke_vec_a) {
    kqs_init(kqs);
  }
  else if (ke_vec_used == ke_vec_a) {
    /* Grow geometrically. Fix: keep the old pointer until realloc succeeds
     * -- `ke_vec = realloc(ke_vec, ...)` loses (and leaks) the vector on
     * failure and the very next line would dereference NULL. */
    struct kevent *nv;
    nv = (struct kevent *) realloc(ke_vec,
                                   (ke_vec_a << 1) * sizeof (struct kevent));
    if(!nv) abort(); /* unrecoverable: staged changes would be lost */
    ke_vec = nv;
    ke_vec_a <<= 1;
  }
  kep = &ke_vec[ke_vec_used++];

  EV_SET(kep, ident, filter, flags, 0, 0, udata);
  if(kqs == master_kqs) pthread_mutex_unlock(&kqs_lock);
}
99
/* One-time initialization of the kqueue eventer backend.
 * Creates the kqueue, sizes the per-fd tables from RLIMIT_NOFILE, and sets
 * up the timed-event skiplist. Returns 0 on success, -1 on failure.
 * Fixes: the original never checked pthread_key_create, getrlimit (leaving
 * rlim -- and thus maxfds -- uninitialized on failure, which is UB), or any
 * of the calloc results. */
static int eventer_kqueue_impl_init() {
  struct rlimit rlim;
  master_thread = pthread_self();
  kqueue_fd = kqueue();
  if(kqueue_fd == -1) {
    return -1;
  }
  pthread_mutex_init(&kqs_lock, NULL);
  pthread_mutex_init(&te_lock, NULL);
  if(pthread_key_create(&kqueue_setup_key, NULL) != 0) {
    return -1;
  }
  master_kqs = calloc(1, sizeof(*master_kqs));
  if(!master_kqs) return -1;
  kqs_init(master_kqs);
  if(getrlimit(RLIMIT_NOFILE, &rlim) != 0) return -1;
  maxfds = rlim.rlim_cur;
  master_fds = calloc(maxfds, sizeof(*master_fds));
  masks = calloc(maxfds, sizeof(*masks));
  timed_events = calloc(1, sizeof(*timed_events));
  if(!master_fds || !masks || !timed_events) return -1;
  noit_skiplist_init(timed_events);
  noit_skiplist_set_compare(timed_events,
                            eventer_timecompare, eventer_timecompare);
  noit_skiplist_add_index(timed_events,
                          noit_compare_voidptr, noit_compare_voidptr);
  return 0;
}
/* Property setter hook for the eventer interface. The kqueue backend
 * exposes no tunable properties, so every key is rejected.
 * Returns -1 (unrecognized property) unconditionally. */
static int eventer_kqueue_impl_propset(const char *key, const char *value) {
  (void)key;
  (void)value;
  return -1;
}
/* Register a new event with the backend.
 * Pure timers go into the timed_events skiplist; fd events are installed in
 * master_fds[] under the per-fd lock and their kqueue filters staged via
 * ke_change(). e->mask must be nonzero. */
static void eventer_kqueue_impl_add(eventer_t e) {
  assert(e->mask);
  ev_lock_state_t lockstate;
  /* Timed events are simple: ordered insert under the timer lock. */
  if(e->mask == EVENTER_TIMER) {
    pthread_mutex_lock(&te_lock);
    noit_skiplist_insert(timed_events, e);
    pthread_mutex_unlock(&te_lock);
    return;
  }

  /* file descriptor event: publish in the fd table, then stage the kernel
   * filter(s) matching the requested mask. EXCEPTION rides the read filter. */
  lockstate = acquire_master_fd(e->fd);
  master_fds[e->fd].e = e;
  if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
    ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
  if(e->mask & (EVENTER_WRITE))
    ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
  release_master_fd(e->fd, lockstate);
}
/* Deregister an event.
 * fd events: under the per-fd lock, clear the table slot and stage filter
 * deletion -- but only if `e` is still the registered event for that fd.
 * Timers: remove by identity from the skiplist. An event with no
 * recognizable mask indicates internal corruption, hence abort(). */
static void eventer_kqueue_impl_remove(eventer_t e) {
  if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
    ev_lock_state_t lockstate;
    lockstate = acquire_master_fd(e->fd);
    /* Only tear down if this exact event still owns the slot; a stale
     * remove after re-registration must not clobber the new event. */
    if(e == master_fds[e->fd].e) {
      master_fds[e->fd].e = NULL;
      if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
        ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
      if(e->mask & (EVENTER_WRITE))
        ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
    }
    release_master_fd(e->fd, lockstate);
  }
  else if(e->mask & EVENTER_TIMER) {
    pthread_mutex_lock(&te_lock);
    /* Remove by pointer identity (the voidptr index), not by firing time. */
    noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
    pthread_mutex_unlock(&te_lock);
  }
  else {
    abort();
  }
}
/* Refresh an event's registration after its mask (or a timer's firing time)
 * changed. Timers are repositioned in the skiplist; fd events have both
 * kernel filters cleared and then re-added according to the current mask.
 * BUG FIX: the original staged EV_DELETE|EV_DISABLE in the conditional
 * re-registration branches too (a copy-paste of the two unconditional
 * deletes above), so "updating" an fd event deregistered it entirely; the
 * branches must stage EV_ADD|EV_ENABLE. */
static void eventer_kqueue_impl_update(eventer_t e) {
  if(e->mask & EVENTER_TIMER) {
    pthread_mutex_lock(&te_lock);
    /* Remove by identity, then re-insert so it sorts by the new whence. */
    noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
    noit_skiplist_insert(timed_events, e);
    pthread_mutex_unlock(&te_lock);
    return;
  }
  /* Clear both filters unconditionally, then re-add the ones the current
   * mask asks for. EXCEPTION rides the read filter. */
  ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
  ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
  if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
    ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
  if(e->mask & (EVENTER_WRITE))
    ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
}
/* Detach and return whatever event is registered on `fd` (NULL if none),
 * staging deletion of its kernel filters.
 * FIX: re-check the slot after acquiring the lock. The original read
 * master_fds[fd].e unlocked, took the lock, then re-read and dereferenced
 * it -- a racing remover could NULL the slot in between, crashing on
 * eiq->mask. */
static eventer_t eventer_kqueue_impl_remove_fd(int fd) {
  eventer_t eiq = NULL;
  ev_lock_state_t lockstate;
  if(master_fds[fd].e) { /* unlocked peek: cheap early-out only */
    lockstate = acquire_master_fd(fd);
    eiq = master_fds[fd].e;
    if(eiq) {
      master_fds[fd].e = NULL;
      if(eiq->mask & (EVENTER_READ | EVENTER_EXCEPTION))
        ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, eiq);
      if(eiq->mask & (EVENTER_WRITE))
        ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, eiq);
    }
    release_master_fd(fd, lockstate);
  }
  return eiq;
}
199 static eventer_t eventer_kqueue_impl_find_fd(int fd) {
200   return master_fds[fd].e;
201 }
/* The per-thread event loop: fire due timers, flush staged kevent changes,
 * block in kevent(2) up to __max_sleeptime, then dispatch triggered fd
 * events and re-stage their filters per each callback's returned mask.
 * Never returns. */
static void eventer_kqueue_impl_loop() {
  /* NOTE(review): is_master_thread is computed but never consulted below. */
  int is_master_thread = 0;
  pthread_t self;
  KQUEUE_DECL;
  KQUEUE_SETUP;

  self = pthread_self();
  if(pthread_equal(self, master_thread)) is_master_thread = 1;

  /* First entry by this thread: build its private changelist and stash it
   * in TLS so ke_change() finds it. */
  if(!kqs) {
    kqs = calloc(1, sizeof(*kqs));
    kqs_init(kqs);
  }
  pthread_setspecific(kqueue_setup_key, kqs);
  while(1) {
    struct timeval __now, __sleeptime;
    struct timespec __kqueue_sleeptime;
    int fd_cnt = 0;
    int max_timed_events_to_process;
    int newmask;

    __sleeptime = __max_sleeptime;

    /* Handle timed events...
     * we could be multithreaded, so if we pop forever we could starve
     * ourselves. Cap the batch at the size observed on entry.
     * NOTE(review): ->size is read without te_lock; a stale value only
     * affects batch size, but confirm that torn reads are benign here. */
    max_timed_events_to_process = timed_events->size;
    while(max_timed_events_to_process-- > 0) {
      eventer_t timed_event;

      gettimeofday(&__now, NULL);

      pthread_mutex_lock(&te_lock);
      /* Peek at our next timed event, if should fire, pop it.
       * otherwise we noop and NULL it out to break the loop -- and record
       * how long until it is due as our poll timeout. */
      timed_event = noit_skiplist_peek(timed_events);
      if(timed_event) {
        if(compare_timeval(timed_event->whence, __now) < 0) {
          timed_event = noit_skiplist_pop(timed_events, NULL);
        }
        else {
          sub_timeval(timed_event->whence, __now, &__sleeptime);
          timed_event = NULL;
        }
      }
      pthread_mutex_unlock(&te_lock);
      if(timed_event == NULL) break;

      /* Make our call -- outside te_lock so callbacks may add/remove
       * timers. Nonzero return reschedules; zero frees the event. */
      newmask = timed_event->callback(timed_event, EVENTER_TIMER,
                                      timed_event->closure, &__now);
      if(newmask)
        eventer_add(timed_event);
      else
        eventer_free(timed_event);
    }

    if(compare_timeval(__max_sleeptime, __sleeptime) < 0) {
      /* we exceed our configured maximum, set it down */
      memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime));
    }

    /* If we're the master, we need to lock the master_kqs and make mods:
     * flush changes staged by threads with no private changelist, without
     * blocking (zero timeout, no output events requested). */
    if(master_kqs->__ke_vec_used) {
      struct timespec __zerotime = { 0, 0 };
      pthread_mutex_lock(&kqs_lock);
      fd_cnt = kevent(kqueue_fd,
                      master_kqs->__ke_vec, master_kqs->__ke_vec_used,
                      NULL, 0,
                      &__zerotime);
      noit_log(noit_debug, &__now, "debug: kevent(%d, [], %d) => %d\n", kqueue_fd, master_kqs->__ke_vec_used, fd_cnt);
      if(fd_cnt < 0) {
        noit_log(noit_error, &__now, "kevent: %s\n", strerror(errno));
      }
      master_kqs->__ke_vec_used = 0;
      pthread_mutex_unlock(&kqs_lock);
    }

    /* Now we move on to our fd-based events: flush this thread's staged
     * changes and collect triggered events into the same vector. */
    __kqueue_sleeptime.tv_sec = __sleeptime.tv_sec;
    __kqueue_sleeptime.tv_nsec = __sleeptime.tv_usec * 1000;
    fd_cnt = kevent(kqueue_fd, ke_vec, ke_vec_used,
                    ke_vec, ke_vec_a,
                    &__kqueue_sleeptime);
    noit_log(noit_debug, &__now, "debug: kevent(%d, [], %d) => %d\n", kqueue_fd, ke_vec_used, fd_cnt);
    ke_vec_used = 0;
    if(fd_cnt < 0) {
      noit_log(noit_error, &__now, "kevent: %s\n", strerror(errno));
    }
    else {
      int idx;
      /* loop once to clear stale masks for every triggered fd.
       * NOTE(review): masks[ke->ident] is not bounds-checked against
       * maxfds -- confirm RLIMIT_NOFILE cannot rise after init. */
      for(idx = 0; idx < fd_cnt; idx++) {
        struct kevent *ke;
        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) continue;
        masks[ke->ident] = 0;
      }
      /* Loop again to aggregate: kqueue reports read and write as separate
       * kevents; OR them into one eventer mask per fd. */
      for(idx = 0; idx < fd_cnt; idx++) {
        struct kevent *ke;
        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) continue;
        if(ke->filter == EVFILT_READ) masks[ke->ident] |= EVENTER_READ;
        if(ke->filter == EVFILT_WRITE) masks[ke->ident] |= EVENTER_WRITE;
      }
      /* Loop a last time to process each fd exactly once (masks[fd] is
       * zeroed after dispatch so the duplicate kevent entries are skipped). */
      for(idx = 0; idx < fd_cnt; idx++) {
        const char *cbname;
        ev_lock_state_t lockstate;
        struct kevent *ke;
        eventer_t e;
        int fd, oldmask;

        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) {
          /* EBADF is expected when an fd was closed behind our back. */
          if(ke->data != EBADF)
            noit_log(noit_error, &__now, "error: %s\n", strerror(ke->data));
          continue;
        }
        e = (eventer_t)ke->udata;
        fd = ke->ident;
        if(!masks[fd]) continue;
        assert(e == master_fds[fd].e);
        lockstate = acquire_master_fd(fd);
        assert(lockstate == EV_OWNED);

        gettimeofday(&__now, NULL);
        oldmask = e->mask;
        cbname = eventer_name_for_callback(e->callback);
        noit_log(noit_debug, &__now, "kqueue: fire on %d/%x to %s(%p)\n",
                 fd, masks[fd], cbname?cbname:"???", e->callback);
        newmask = e->callback(e, masks[fd], e->closure, &__now);
        masks[fd] = 0; /* indicates we've processed this fd */

        if(newmask) {
          /* toggle the read bits if needed -- only stage a kernel change
           * when interest actually transitioned, to avoid churn */
          if(newmask & (EVENTER_READ | EVENTER_EXCEPTION)) {
            if(!(oldmask & (EVENTER_READ | EVENTER_EXCEPTION)))
              ke_change(fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
          }
          else if(oldmask & (EVENTER_READ | EVENTER_EXCEPTION))
            ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
 
          /* toggle the write bits if needed */
          if(newmask & EVENTER_WRITE) {
            if(!(oldmask & EVENTER_WRITE))
              ke_change(fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
          }
          else if(oldmask & EVENTER_WRITE)
              ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
 
          /* Set our mask */
          e->mask = newmask;
        }
        else {
          /* Zero return means the event retired itself. */
          eventer_free(e);
        }
        release_master_fd(fd, lockstate);
      }
    }
  }
}
365
/* Backend vtable exported to the eventer core; slot order must match
 * struct _eventer_impl (declared in eventer/eventer.h). */
struct _eventer_impl eventer_kqueue_impl = {
  "kqueue",                       /* backend name */
  eventer_kqueue_impl_init,       /* one-time initialization */
  eventer_kqueue_impl_propset,    /* property setter (none supported) */
  eventer_kqueue_impl_add,        /* register event */
  eventer_kqueue_impl_remove,     /* deregister event */
  eventer_kqueue_impl_update,     /* refresh registration after mask change */
  eventer_kqueue_impl_remove_fd,  /* detach whatever is on an fd */
  eventer_kqueue_impl_find_fd,    /* look up event by fd */
  eventer_kqueue_impl_loop        /* per-thread event loop */
};
Note: See TracBrowser for help on using the browser.