root/src/eventer/eventer_kqueue_impl.c

Revision 31ecb27a3a88da161238c56128d25ddc8e264595, 11.8 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

trap sig pipe

/*
 * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
 * All rights reserved.
 */

#include "noit_defines.h"
#include "eventer/eventer.h"
#include "utils/noit_atomic.h"
#include "utils/noit_skiplist.h"
#include "utils/noit_log.h"

#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/event.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <pthread.h>
#include <assert.h>

static struct timeval __max_sleeptime = { 0, 200000 }; /* 200ms */
static int maxfds;
static struct {
  eventer_t e;
  pthread_t executor;
  noit_spinlock_t lock;
} *master_fds = NULL;
static int *masks;

typedef enum { EV_OWNED, EV_ALREADY_OWNED } ev_lock_state_t;

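/*
 * Per-fd ownership lock: the thread that wins the spinlock becomes the
 * fd's executor.  If the owning thread re-enters (e.g. a callback acts
 * on its own fd), EV_ALREADY_OWNED tells release_master_fd() to leave
 * the lock held by the outer frame.
 */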
static ev_lock_state_t
acquire_master_fd(int fd) {
  if(noit_spinlock_trylock(&master_fds[fd].lock)) {
    master_fds[fd].executor = pthread_self();
    return EV_OWNED;
  }
  if(pthread_equal(master_fds[fd].executor, pthread_self())) {
    return EV_ALREADY_OWNED;
  }
  noit_spinlock_lock(&master_fds[fd].lock);
  master_fds[fd].executor = pthread_self();
  return EV_OWNED;
}
static void
release_master_fd(int fd, ev_lock_state_t as) {
  if(as == EV_OWNED) {
    memset(&master_fds[fd].executor, 0, sizeof(master_fds[fd].executor));
    noit_spinlock_unlock(&master_fds[fd].lock);
  }
}

static pthread_t master_thread;
static int kqueue_fd = -1;
typedef struct kqueue_setup {
  struct kevent *__ke_vec;
  unsigned int __ke_vec_a;
  unsigned int __ke_vec_used;
} *kqs_t;

static pthread_mutex_t kqs_lock;
static pthread_mutex_t te_lock;
static kqs_t master_kqs = NULL;
static pthread_key_t kqueue_setup_key;
static noit_skiplist *timed_events = NULL;
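/*
 * Each event-loop thread keeps its own kevent change vector in
 * thread-specific data; these macros fetch it and alias its fields so
 * the code below reads as if the vector were a global.
 */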
#define KQUEUE_DECL kqs_t kqs
#define KQUEUE_SETUP kqs = (kqs_t) pthread_getspecific(kqueue_setup_key)
#define ke_vec kqs->__ke_vec
#define ke_vec_a kqs->__ke_vec_a
#define ke_vec_used kqs->__ke_vec_used

static void kqs_init(kqs_t kqs) {
  enum { initial_alloc = 64 };
  ke_vec_a = initial_alloc;
  ke_vec = (struct kevent *) malloc(ke_vec_a * sizeof (struct kevent));
}
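/*
 * Queue a change to the kernel's interest list.  Changes accumulate in
 * the calling thread's vector (or, lacking one, the locked master
 * vector) and are flushed by the next kevent() call in the loop.
 */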
static void
ke_change (register int const ident,
           register int const filter,
           register int const flags,
           register void *const udata) {
  register struct kevent *kep;
  KQUEUE_DECL;
  KQUEUE_SETUP;
  if(!kqs) kqs = master_kqs;

  if(kqs == master_kqs) pthread_mutex_lock(&kqs_lock);
  if (!ke_vec_a) {
    kqs_init(kqs);
  }
  else if (ke_vec_used == ke_vec_a) {
    ke_vec_a <<= 1;
    ke_vec = (struct kevent *) realloc(ke_vec,
                                       ke_vec_a * sizeof (struct kevent));
  }
  kep = &ke_vec[ke_vec_used++];

  EV_SET(kep, ident, filter, flags, 0, 0, udata);
  if(kqs == master_kqs) pthread_mutex_unlock(&kqs_lock);
}

static int eventer_kqueue_impl_init() {
  struct rlimit rlim;
  master_thread = pthread_self();
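  /* Writes to closed sockets should surface as EPIPE, not kill us. */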
  signal(SIGPIPE, SIG_IGN);
  kqueue_fd = kqueue();
  if(kqueue_fd == -1) {
    return -1;
  }
  pthread_mutex_init(&kqs_lock, NULL);
  pthread_mutex_init(&te_lock, NULL);
  pthread_key_create(&kqueue_setup_key, NULL);
  master_kqs = calloc(1, sizeof(*master_kqs));
  kqs_init(master_kqs);
  getrlimit(RLIMIT_NOFILE, &rlim);
  maxfds = rlim.rlim_cur;
  master_fds = calloc(maxfds, sizeof(*master_fds));
  masks = calloc(maxfds, sizeof(*masks));
  timed_events = calloc(1, sizeof(*timed_events));
  noit_skiplist_init(timed_events);
  noit_skiplist_set_compare(timed_events,
                            eventer_timecompare, eventer_timecompare);
  noit_skiplist_add_index(timed_events,
                          noit_compare_voidptr, noit_compare_voidptr);
  return 0;
}
static int eventer_kqueue_impl_propset(const char *key, const char *value) {
  return -1;
}
static void eventer_kqueue_impl_add(eventer_t e) {
  ev_lock_state_t lockstate;
  assert(e->mask);
  /* Timed events are simple */
  if(e->mask == EVENTER_TIMER) {
    pthread_mutex_lock(&te_lock);
    noit_skiplist_insert(timed_events, e);
    pthread_mutex_unlock(&te_lock);
    return;
  }

  /* file descriptor event */
  lockstate = acquire_master_fd(e->fd);
  master_fds[e->fd].e = e;
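  /* Exception interest is registered alongside reads via EVFILT_READ. */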
  if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
    ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
  if(e->mask & (EVENTER_WRITE))
    ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
  release_master_fd(e->fd, lockstate);
}
static void eventer_kqueue_impl_remove(eventer_t e) {
  if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
    ev_lock_state_t lockstate;
    lockstate = acquire_master_fd(e->fd);
    if(e == master_fds[e->fd].e) {
      master_fds[e->fd].e = NULL;
      if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
        ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
      if(e->mask & (EVENTER_WRITE))
        ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
    }
    release_master_fd(e->fd, lockstate);
  }
  else if(e->mask & EVENTER_TIMER) {
    pthread_mutex_lock(&te_lock);
    noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
    pthread_mutex_unlock(&te_lock);
  }
  else {
    abort();
  }
}
static void eventer_kqueue_impl_update(eventer_t e) {
  if(e->mask & EVENTER_TIMER) {
    pthread_mutex_lock(&te_lock);
    noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
    noit_skiplist_insert(timed_events, e);
    pthread_mutex_unlock(&te_lock);
    return;
  }
  /* Clear any existing interest, then re-arm per the current mask. */
  ke_change(e->fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);
  ke_change(e->fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);
  if(e->mask & (EVENTER_READ | EVENTER_EXCEPTION))
    ke_change(e->fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
  if(e->mask & (EVENTER_WRITE))
    ke_change(e->fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
}
static eventer_t eventer_kqueue_impl_remove_fd(int fd) {
  eventer_t eiq = NULL;
  ev_lock_state_t lockstate;
  if(master_fds[fd].e) {
    lockstate = acquire_master_fd(fd);
    eiq = master_fds[fd].e;
    master_fds[fd].e = NULL;
    if(eiq->mask & (EVENTER_READ | EVENTER_EXCEPTION))
      ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, eiq);
    if(eiq->mask & (EVENTER_WRITE))
      ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, eiq);
    release_master_fd(fd, lockstate);
  }
  return eiq;
}
static eventer_t eventer_kqueue_impl_find_fd(int fd) {
  return master_fds[fd].e;
}
static int eventer_kqueue_impl_loop() {
  int is_master_thread = 0;
  pthread_t self;
  KQUEUE_DECL;
  KQUEUE_SETUP;

  self = pthread_self();
  if(pthread_equal(self, master_thread)) is_master_thread = 1;

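  /* First entry from a new thread: give it a private change vector. */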
  if(!kqs) {
    kqs = calloc(1, sizeof(*kqs));
    kqs_init(kqs);
  }
  pthread_setspecific(kqueue_setup_key, kqs);
  while(1) {
    struct timeval __now, __sleeptime;
    struct timespec __kqueue_sleeptime;
    int fd_cnt = 0;
    int max_timed_events_to_process;
    int newmask;

    __sleeptime = __max_sleeptime;

    /* Handle timed events...
     * we could be multithreaded, so if we pop forever we could starve
     * ourselves. */
    max_timed_events_to_process = timed_events->size;
    while(max_timed_events_to_process-- > 0) {
      eventer_t timed_event;

      gettimeofday(&__now, NULL);

      pthread_mutex_lock(&te_lock);
      /* Peek at our next timed event; if it should fire, pop it.
       * Otherwise leave it and NULL our local pointer to break the loop. */
      timed_event = noit_skiplist_peek(timed_events);
      if(timed_event) {
        if(compare_timeval(timed_event->whence, __now) < 0) {
          timed_event = noit_skiplist_pop(timed_events, NULL);
        }
        else {
          sub_timeval(timed_event->whence, __now, &__sleeptime);
          timed_event = NULL;
        }
      }
      pthread_mutex_unlock(&te_lock);
      if(timed_event == NULL) break;

      /* Make our call */
      newmask = timed_event->callback(timed_event, EVENTER_TIMER,
                                      timed_event->closure, &__now);
      if(newmask)
        eventer_add(timed_event);
      else
        eventer_free(timed_event);
    }

    if(compare_timeval(__max_sleeptime, __sleeptime) < 0) {
      /* we exceed our configured maximum, set it down */
      memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime));
    }

    /* If we're the master, we need to lock the master_kqs and make mods */
    if(master_kqs->__ke_vec_used) {
      struct timespec __zerotime = { 0, 0 };
      pthread_mutex_lock(&kqs_lock);
      fd_cnt = kevent(kqueue_fd,
                      master_kqs->__ke_vec, master_kqs->__ke_vec_used,
                      NULL, 0,
                      &__zerotime);
      noitLT(noit_debug, &__now, "debug: kevent(%d, [], %d) => %d\n",
             kqueue_fd, master_kqs->__ke_vec_used, fd_cnt);
      if(fd_cnt < 0) {
        noitLT(noit_error, &__now, "kevent: %s\n", strerror(errno));
      }
      master_kqs->__ke_vec_used = 0;
      pthread_mutex_unlock(&kqs_lock);
    }

    /* Now we move on to our fd-based events */
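    /* kevent() takes a timespec, so convert our timeval sleep. */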
    __kqueue_sleeptime.tv_sec = __sleeptime.tv_sec;
    __kqueue_sleeptime.tv_nsec = __sleeptime.tv_usec * 1000;
    fd_cnt = kevent(kqueue_fd, ke_vec, ke_vec_used,
                    ke_vec, ke_vec_a,
                    &__kqueue_sleeptime);
    noitLT(noit_debug, &__now, "debug: kevent(%d, [], %d) => %d\n",
           kqueue_fd, ke_vec_used, fd_cnt);
    ke_vec_used = 0;
    if(fd_cnt < 0) {
      noitLT(noit_error, &__now, "kevent: %s\n", strerror(errno));
    }
    else {
      int idx;
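      /*
       * kqueue reports read and write readiness as separate kevents,
       * so clear and then OR together each fd's filters; each fd's
       * callback then fires once with the combined mask.
       */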
      /* loop once to clear */
      for(idx = 0; idx < fd_cnt; idx++) {
        struct kevent *ke;
        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) continue;
        masks[ke->ident] = 0;
      }
      /* Loop again to aggregate */
      for(idx = 0; idx < fd_cnt; idx++) {
        struct kevent *ke;
        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) continue;
        if(ke->filter == EVFILT_READ) masks[ke->ident] |= EVENTER_READ;
        if(ke->filter == EVFILT_WRITE) masks[ke->ident] |= EVENTER_WRITE;
      }
      /* Loop a last time to process */
      for(idx = 0; idx < fd_cnt; idx++) {
        const char *cbname;
        ev_lock_state_t lockstate;
        struct kevent *ke;
        eventer_t e;
        int fd, oldmask;

        ke = &ke_vec[idx];
        if(ke->flags & EV_ERROR) {
          if(ke->data != EBADF)
            noitLT(noit_error, &__now, "error: %s\n", strerror(ke->data));
          continue;
        }
        e = (eventer_t)ke->udata;
        fd = ke->ident;
        if(!masks[fd]) continue;
        assert(e == master_fds[fd].e);
        lockstate = acquire_master_fd(fd);
        assert(lockstate == EV_OWNED);

        gettimeofday(&__now, NULL);
        oldmask = e->mask;
        cbname = eventer_name_for_callback(e->callback);
        noitLT(noit_debug, &__now, "kqueue: fire on %d/%x to %s(%p)\n",
               fd, masks[fd], cbname?cbname:"???", e->callback);
        newmask = e->callback(e, masks[fd], e->closure, &__now);
        masks[fd] = 0; /* indicates we've processed this fd */

        if(newmask) {
          /* toggle the read bits if needed */
          if(newmask & (EVENTER_READ | EVENTER_EXCEPTION)) {
            if(!(oldmask & (EVENTER_READ | EVENTER_EXCEPTION)))
              ke_change(fd, EVFILT_READ, EV_ADD | EV_ENABLE, e);
          }
          else if(oldmask & (EVENTER_READ | EVENTER_EXCEPTION))
            ke_change(fd, EVFILT_READ, EV_DELETE | EV_DISABLE, e);

          /* toggle the write bits if needed */
          if(newmask & EVENTER_WRITE) {
            if(!(oldmask & EVENTER_WRITE))
              ke_change(fd, EVFILT_WRITE, EV_ADD | EV_ENABLE, e);
          }
          else if(oldmask & EVENTER_WRITE)
            ke_change(fd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, e);

          /* Set our mask */
          e->mask = newmask;
        }
        else {
          eventer_free(e);
        }
        release_master_fd(fd, lockstate);
      }
    }
  }
  /* NOTREACHED */
  return 0;
}

struct _eventer_impl eventer_kqueue_impl = {
  "kqueue",
  eventer_kqueue_impl_init,
  eventer_kqueue_impl_propset,
  eventer_kqueue_impl_add,
  eventer_kqueue_impl_remove,
  eventer_kqueue_impl_update,
  eventer_kqueue_impl_remove_fd,
  eventer_kqueue_impl_find_fd,
  eventer_kqueue_impl_loop
};