root/src/eventer/eventer_epoll_impl.c

Revision 88a71780101cbf23034aa0cb840f9f0368fda2dd, 11.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

fixes #126

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_atomic.h"
36 #include "utils/noit_skiplist.h"
37 #include "utils/noit_log.h"
38
39 #include <errno.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <sys/epoll.h>
43 #include <signal.h>
44 #include <pthread.h>
45 #include <assert.h>
46
/* Upper bound on a single pass's poll sleep. */
static struct timeval __max_sleeptime = { 0, 200000 }; /* 200ms */
static int maxfds;  /* number of slots in master_fds/masks; set from RLIMIT_NOFILE at init */
/* Per-fd registration table: the event registered on each fd, the thread
 * currently executing its callback, and a spinlock serializing execution
 * on that fd. */
static struct {
  eventer_t e;
  pthread_t executor;
  noit_spinlock_t lock;
} *master_fds = NULL;
static int *masks;  /* per-fd mask scratch; allocated at init (not referenced in the code visible here) */

/* Result of acquire_master_fd(): EV_OWNED means we took the per-fd lock
 * and must release it; EV_ALREADY_OWNED means this same thread already
 * held it (re-entrant trigger) and must NOT release it. */
typedef enum { EV_OWNED, EV_ALREADY_OWNED } ev_lock_state_t;
57
58 static ev_lock_state_t
59 acquire_master_fd(int fd) {
60   if(noit_spinlock_trylock(&master_fds[fd].lock)) {
61     master_fds[fd].executor = pthread_self();
62     return EV_OWNED;
63   }
64   if(pthread_equal(master_fds[fd].executor, pthread_self())) {
65     return EV_ALREADY_OWNED;
66   }
67   noit_spinlock_lock(&master_fds[fd].lock);
68   master_fds[fd].executor = pthread_self();
69   return EV_OWNED;
70 }
71 static void
72 release_master_fd(int fd, ev_lock_state_t as) {
73   if(as == EV_OWNED) {
74     memset(&master_fds[fd].executor, 0, sizeof(master_fds[fd].executor));
75     noit_spinlock_unlock(&master_fds[fd].lock);
76   }
77 }
78
static pthread_t master_thread;            /* the thread that ran eventer_epoll_impl_init() */
static int epoll_fd = -1;                  /* the shared epoll instance for all fd events */
static pthread_mutex_t te_lock;            /* guards every access to timed_events */
static noit_skiplist *timed_events = NULL; /* pending timers, ordered by whence (eventer_timecompare) */
83
84 static int eventer_epoll_impl_init() {
85   struct rlimit rlim;
86   int rv;
87
88   /* super init */
89   if((rv = eventer_impl_init()) != 0) return rv;
90
91   master_thread = pthread_self();
92   signal(SIGPIPE, SIG_IGN);
93   epoll_fd = epoll_create(1024);
94   if(epoll_fd == -1) {
95     return -1;
96   }
97   pthread_mutex_init(&te_lock, NULL);
98   getrlimit(RLIMIT_NOFILE, &rlim);
99   maxfds = rlim.rlim_cur;
100   master_fds = calloc(maxfds, sizeof(*master_fds));
101   masks = calloc(maxfds, sizeof(*masks));
102   timed_events = calloc(1, sizeof(*timed_events));
103   noit_skiplist_init(timed_events);
104   noit_skiplist_set_compare(timed_events,
105                             eventer_timecompare, eventer_timecompare);
106   noit_skiplist_add_index(timed_events,
107                           noit_compare_voidptr, noit_compare_voidptr);
108   return 0;
109 }
/* Accept a configuration property.  Delegates to the shared handler;
 * a non-zero return there means the key was rejected.  No epoll-specific
 * properties are defined yet. */
static int eventer_epoll_impl_propset(const char *key, const char *value) {
  if(eventer_impl_propset(key, value) != 0) {
    /* Do our epoll local properties here */
    return -1;
  }
  return 0;
}
117 static void eventer_epoll_impl_add(eventer_t e) {
118   struct epoll_event _ev;
119   ev_lock_state_t lockstate;
120   assert(e->mask);
121
122   if(e->mask & EVENTER_ASYNCH) {
123     eventer_add_asynch(NULL, e);
124     return;
125   }
126
127   /* Recurrent delegation */
128   if(e->mask & EVENTER_RECURRENT) {
129     eventer_add_recurrent(e);
130     return;
131   }
132
133   /* Timed events are simple */
134   if(e->mask & EVENTER_TIMER) {
135     pthread_mutex_lock(&te_lock);
136     noit_skiplist_insert(timed_events, e);
137     pthread_mutex_unlock(&te_lock);
138     return;
139   }
140
141   /* file descriptor event */
142   memset(&_ev, 0, sizeof(_ev));
143   _ev.data.fd = e->fd;
144   if(e->mask & EVENTER_READ) _ev.events |= (EPOLLIN|EPOLLPRI);
145   if(e->mask & EVENTER_WRITE) _ev.events |= (EPOLLOUT);
146   if(e->mask & EVENTER_EXCEPTION) _ev.events |= (EPOLLERR|EPOLLHUP);
147
148   lockstate = acquire_master_fd(e->fd);
149   master_fds[e->fd].e = e;
150
151   epoll_ctl(epoll_fd, EPOLL_CTL_ADD, e->fd, &_ev);
152
153   release_master_fd(e->fd, lockstate);
154 }
155 static eventer_t eventer_epoll_impl_remove(eventer_t e) {
156   eventer_t removed = NULL;
157   if(e->mask & EVENTER_ASYNCH) {
158     abort();
159   }
160   if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
161     ev_lock_state_t lockstate;
162     struct epoll_event _ev;
163     memset(&_ev, 0, sizeof(_ev));
164     _ev.data.fd = e->fd;
165     lockstate = acquire_master_fd(e->fd);
166     if(e == master_fds[e->fd].e) {
167       removed = e;
168       master_fds[e->fd].e = NULL;
169       epoll_ctl(epoll_fd, EPOLL_CTL_DEL, e->fd, &_ev);
170     }
171     release_master_fd(e->fd, lockstate);
172   }
173   else if(e->mask & EVENTER_TIMER) {
174     pthread_mutex_lock(&te_lock);
175     if(noit_skiplist_remove_compare(timed_events, e, NULL,
176                                     noit_compare_voidptr))
177       removed = e;
178     pthread_mutex_unlock(&te_lock);
179   }
180   else if(e->mask & EVENTER_RECURRENT) {
181     removed = eventer_remove_recurrent(e);
182   }
183   else {
184     abort();
185   }
186   return removed;
187 }
188 static void eventer_epoll_impl_update(eventer_t e, int mask) {
189   struct epoll_event _ev;
190   if(e->mask & EVENTER_TIMER) {
191     assert(e->mask & EVENTER_TIMER);
192     pthread_mutex_lock(&te_lock);
193     noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
194     noit_skiplist_insert(timed_events, e);
195     pthread_mutex_unlock(&te_lock);
196     return;
197   }
198   memset(&_ev, 0, sizeof(_ev));
199   _ev.data.fd = e->fd;
200   e->mask = mask;
201   if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
202     if(e->mask & EVENTER_READ) _ev.events |= (EPOLLIN|EPOLLPRI);
203     if(e->mask & EVENTER_WRITE) _ev.events |= (EPOLLOUT);
204     if(e->mask & EVENTER_EXCEPTION) _ev.events |= (EPOLLERR|EPOLLHUP);
205     epoll_ctl(epoll_fd, EPOLL_CTL_MOD, e->fd, &_ev);
206   }
207 }
208 static eventer_t eventer_epoll_impl_remove_fd(int fd) {
209   eventer_t eiq = NULL;
210   ev_lock_state_t lockstate;
211   if(master_fds[fd].e) {
212     struct epoll_event _ev;
213     memset(&_ev, 0, sizeof(_ev));
214     _ev.data.fd = fd;
215     lockstate = acquire_master_fd(fd);
216     eiq = master_fds[fd].e;
217     master_fds[fd].e = NULL;
218     epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &_ev);
219     release_master_fd(fd, lockstate);
220   }
221   return eiq;
222 }
223 static eventer_t eventer_epoll_impl_find_fd(int fd) {
224   return master_fds[fd].e;
225 }
226
/* Fire the callback for event e with the given mask, then either re-arm
 * the fd in epoll with the mask the callback returned, or (on zero)
 * deregister and free the event.  Re-entrant triggers on the same fd
 * from the same thread are silently dropped. */
static void eventer_epoll_impl_trigger(eventer_t e, int mask) {
  struct timeval __now;
  int fd, oldmask, newmask;
  const char *cbname;
  ev_lock_state_t lockstate;

  fd = e->fd;
  /* If e is no longer the registered event for this fd, it was swapped
   * out or removed by another thread; nothing to fire. */
  if(e != master_fds[fd].e) return;
  lockstate = acquire_master_fd(fd);
  /* EV_ALREADY_OWNED: this thread is already inside this fd's callback
   * further up the stack -- do not re-enter it. */
  if(lockstate == EV_ALREADY_OWNED) return;
  assert(lockstate == EV_OWNED);

  gettimeofday(&__now, NULL);
  oldmask = e->mask;  /* NOTE(review): captured but never used below */
  cbname = eventer_name_for_callback(e->callback);
  noitLT(eventer_deb, &__now, "epoll: fire on %d/%x to %s(%p)\n",
         fd, mask, cbname?cbname:"???", e->callback);
  /* The callback returns the interest mask it wants next; 0 means done. */
  newmask = e->callback(e, mask, e->closure, &__now);

  if(newmask) {
    /* Re-arm: translate the returned mask to epoll bits and MOD the fd. */
    struct epoll_event _ev;
    memset(&_ev, 0, sizeof(_ev));
    _ev.data.fd = fd;
    if(newmask & EVENTER_READ) _ev.events |= (EPOLLIN|EPOLLPRI);
    if(newmask & EVENTER_WRITE) _ev.events |= (EPOLLOUT);
    if(newmask & EVENTER_EXCEPTION) _ev.events |= (EPOLLERR|EPOLLHUP);
    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &_ev);
    /* Set our mask */
    e->mask = newmask;
  }
  else {
    /* see kqueue implementation for details on the next line */
    if(master_fds[fd].e == e) master_fds[fd].e = NULL;
    eventer_free(e);
  }
  release_master_fd(fd, lockstate);
}
264 static int eventer_epoll_impl_loop() {
265   int is_master_thread = 0;
266   struct epoll_event *epev;
267   pthread_t self;
268
269   self = pthread_self();
270   if(pthread_equal(self, master_thread)) is_master_thread = 1;
271
272   epev = malloc(sizeof(*epev) * maxfds);
273
274   while(1) {
275     struct timeval __now, __sleeptime;
276     int fd_cnt = 0;
277     int max_timed_events_to_process;
278     int newmask;
279
280     __sleeptime = __max_sleeptime;
281
282     /* Handle timed events...
283      * we could be multithreaded, so if we pop forever we could starve
284      * ourselves. */
285     max_timed_events_to_process = timed_events->size;
286     while(max_timed_events_to_process-- > 0) {
287       eventer_t timed_event;
288
289       gettimeofday(&__now, NULL);
290
291       pthread_mutex_lock(&te_lock);
292       /* Peek at our next timed event, if should fire, pop it.
293        * otherwise we noop and NULL it out to break the loop. */
294       timed_event = noit_skiplist_peek(timed_events);
295       if(timed_event) {
296         if(compare_timeval(timed_event->whence, __now) < 0) {
297           timed_event = noit_skiplist_pop(timed_events, NULL);
298         }
299         else {
300           sub_timeval(timed_event->whence, __now, &__sleeptime);
301           timed_event = NULL;
302         }
303       }
304       pthread_mutex_unlock(&te_lock);
305       if(timed_event == NULL) break;
306
307       /* Make our call */
308       newmask = timed_event->callback(timed_event, EVENTER_TIMER,
309                                       timed_event->closure, &__now);
310       if(newmask)
311         eventer_add(timed_event);
312       else
313         eventer_free(timed_event);
314     }
315
316     if(compare_timeval(__max_sleeptime, __sleeptime) < 0) {
317       /* we exceed our configured maximum, set it down */
318       memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime));
319     }
320
321     /* Handle recurrent events */
322     eventer_dispatch_recurrent(&__now);
323
324     /* Now we move on to our fd-based events */
325     fd_cnt = epoll_wait(epoll_fd, epev, maxfds,
326                         __sleeptime.tv_sec * 1000 + __sleeptime.tv_usec / 1000);
327     noitLT(eventer_deb, &__now, "debug: epoll_wait(%d, [], %d) => %d\n", epoll_fd, maxfds, fd_cnt);
328     if(fd_cnt < 0) {
329       noitLT(eventer_err, &__now, "epoll_wait: %s\n", strerror(errno));
330     }
331     else {
332       int idx;
333       /* loop once to clear */
334       for(idx = 0; idx < fd_cnt; idx++) {
335         struct epoll_event *ev;
336         eventer_t e;
337         int fd, mask = 0;
338
339         ev = &epev[idx];
340
341         if(ev->events & (EPOLLIN | EPOLLPRI)) mask |= EVENTER_READ;
342         if(ev->events & (EPOLLOUT)) mask |= EVENTER_WRITE;
343         if(ev->events & (EPOLLERR|EPOLLHUP)) mask |= EVENTER_EXCEPTION;
344
345         fd = ev->data.fd;
346
347         e = master_fds[fd].e;
348         /* It's possible that someone removed the event and freed it
349          * before we got here.
350          */
351         if(!e) continue;
352
353         eventer_epoll_impl_trigger(e, mask);
354       }
355     }
356   }
357   /* NOTREACHED */
358   return 0;
359 }
360
/* The epoll-backed eventer implementation vtable.  Initializer order
 * follows struct _eventer_impl: name, then init, propset, add, remove,
 * update, remove_fd, find_fd, trigger, loop.
 * NOTE(review): field names inferred from the functions supplied here;
 * confirm against the declaration in eventer/eventer.h. */
struct _eventer_impl eventer_epoll_impl = {
  "epoll",
  eventer_epoll_impl_init,
  eventer_epoll_impl_propset,
  eventer_epoll_impl_add,
  eventer_epoll_impl_remove,
  eventer_epoll_impl_update,
  eventer_epoll_impl_remove_fd,
  eventer_epoll_impl_find_fd,
  eventer_epoll_impl_trigger,
  eventer_epoll_impl_loop
};
Note: See TracBrowser for help on using the browser.