root/src/eventer/eventer_epoll_impl.c

Revision a3118173f47a3680d2178910858e8c703879a53e, 11.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 days ago)

Move the rlim setting before eventer_impl_init so that maxfds
is set and we can allocate our arrays. This is important because
we spin up loops for concurrent eventer threads *in* impl_init
and maxfds will be 0 for a while. This was causing spurious
'epoll_wait: Invalid argument' on Linux.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * Copyright (c) 2015, Circonus, Inc.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include "noit_defines.h"
35 #include "eventer/eventer.h"
36 #include "utils/noit_atomic.h"
37 #include "utils/noit_skiplist.h"
38 #include "utils/noit_memory.h"
39 #include "utils/noit_log.h"
40 #include "dtrace_probes.h"
41
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <sys/time.h>
49
50 struct _eventer_impl eventer_epoll_impl;
51 #define LOCAL_EVENTER eventer_epoll_impl
52 #define LOCAL_EVENTER_foreach_fdevent eventer_epoll_impl_foreach_fdevent
53 #define maxfds LOCAL_EVENTER.maxfds
54 #define master_fds LOCAL_EVENTER.master_fds
55
56 #include "eventer/eventer_impl_private.h"
57
58 static int *masks;
59 struct epoll_spec {
60   int epoll_fd;
61 };
62
63 static void *eventer_epoll_spec_alloc() {
64   struct epoll_spec *spec;
65   spec = calloc(1, sizeof(*spec));
66   spec->epoll_fd = epoll_create(1024);
67   if(spec->epoll_fd < 0) abort();
68   return spec;
69 }
70
71 static int eventer_epoll_impl_init() {
72   int rv;
73
74   maxfds = eventer_impl_setrlimit();
75   master_fds = calloc(maxfds, sizeof(*master_fds));
76   masks = calloc(maxfds, sizeof(*masks));
77
78   /* super init */
79   if((rv = eventer_impl_init()) != 0) return rv;
80
81   signal(SIGPIPE, SIG_IGN);
82   return 0;
83 }
84 static int eventer_epoll_impl_propset(const char *key, const char *value) {
85   if(eventer_impl_propset(key, value)) {
86     /* Do our epoll local properties here */
87     return -1;
88   }
89   return 0;
90 }
91 static void eventer_epoll_impl_add(eventer_t e) {
92   int rv;
93   struct epoll_spec *spec;
94   struct epoll_event _ev;
95   ev_lock_state_t lockstate;
96   assert(e->mask);
97
98   if(e->mask & EVENTER_ASYNCH) {
99     eventer_add_asynch(NULL, e);
100     return;
101   }
102
103   /* Recurrent delegation */
104   if(e->mask & EVENTER_RECURRENT) {
105     eventer_add_recurrent(e);
106     return;
107   }
108
109   /* Timed events are simple */
110   if(e->mask & EVENTER_TIMER) {
111     eventer_add_timed(e);
112     return;
113   }
114
115   spec = eventer_get_spec_for_event(e);
116   /* file descriptor event */
117   assert(e->whence.tv_sec == 0 && e->whence.tv_usec == 0);
118   memset(&_ev, 0, sizeof(_ev));
119   _ev.data.fd = e->fd;
120   if(e->mask & EVENTER_READ) _ev.events |= (EPOLLIN|EPOLLPRI);
121   if(e->mask & EVENTER_WRITE) _ev.events |= (EPOLLOUT);
122   if(e->mask & EVENTER_EXCEPTION) _ev.events |= (EPOLLERR|EPOLLHUP);
123
124   lockstate = acquire_master_fd(e->fd);
125   master_fds[e->fd].e = e;
126
127   rv = epoll_ctl(spec->epoll_fd, EPOLL_CTL_ADD, e->fd, &_ev);
128   if(rv != 0) {
129     noitL(eventer_err, "epoll_ctl(%d,add,%d,%x) -> %d (%d: %s)\n",
130           spec->epoll_fd, e->fd, e->mask, rv, errno, strerror(errno));
131     abort();
132   }
133
134   release_master_fd(e->fd, lockstate);
135 }
136 static eventer_t eventer_epoll_impl_remove(eventer_t e) {
137   struct epoll_spec *spec;
138   eventer_t removed = NULL;
139   if(e->mask & EVENTER_ASYNCH) {
140     abort();
141   }
142   if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
143     ev_lock_state_t lockstate;
144     struct epoll_event _ev;
145     spec = eventer_get_spec_for_event(e);
146     memset(&_ev, 0, sizeof(_ev));
147     _ev.data.fd = e->fd;
148     lockstate = acquire_master_fd(e->fd);
149     if(e == master_fds[e->fd].e) {
150       removed = e;
151       master_fds[e->fd].e = NULL;
152       if(epoll_ctl(spec->epoll_fd, EPOLL_CTL_DEL, e->fd, &_ev) != 0) {
153         noitL(noit_error, "epoll_ctl(%d, EPOLL_CTL_DEL, %d) -> %s\n",
154               spec->epoll_fd, e->fd, strerror(errno));
155         if(errno != ENOENT) abort();
156       }
157     }
158     release_master_fd(e->fd, lockstate);
159   }
160   else if(e->mask & EVENTER_TIMER) {
161     removed = eventer_remove_timed(e);
162   }
163   else if(e->mask & EVENTER_RECURRENT) {
164     removed = eventer_remove_recurrent(e);
165   }
166   else {
167     abort();
168   }
169   return removed;
170 }
171 static void eventer_epoll_impl_update(eventer_t e, int mask) {
172   struct epoll_event _ev;
173   if(e->mask & EVENTER_TIMER) {
174     eventer_update_timed(e,mask);
175     return;
176   }
177   memset(&_ev, 0, sizeof(_ev));
178   _ev.data.fd = e->fd;
179   e->mask = mask;
180   if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
181     struct epoll_spec *spec;
182     spec = eventer_get_spec_for_event(e);
183     if(e->mask & EVENTER_READ) _ev.events |= (EPOLLIN|EPOLLPRI);
184     if(e->mask & EVENTER_WRITE) _ev.events |= (EPOLLOUT);
185     if(e->mask & EVENTER_EXCEPTION) _ev.events |= (EPOLLERR|EPOLLHUP);
186     if(epoll_ctl(spec->epoll_fd, EPOLL_CTL_MOD, e->fd, &_ev) != 0) {
187       noitL(noit_error, "epoll_ctl(%d, EPOLL_CTL_MOD, %d) -> %s\n",
188             spec->epoll_fd, e->fd, strerror(errno));
189       abort();
190     }
191   }
192 }
193 static eventer_t eventer_epoll_impl_remove_fd(int fd) {
194   eventer_t eiq = NULL;
195   ev_lock_state_t lockstate;
196   if(master_fds[fd].e) {
197     struct epoll_spec *spec;
198     struct epoll_event _ev;
199     memset(&_ev, 0, sizeof(_ev));
200     _ev.data.fd = fd;
201     lockstate = acquire_master_fd(fd);
202     eiq = master_fds[fd].e;
203     spec = eventer_get_spec_for_event(eiq);
204     master_fds[fd].e = NULL;
205     if(epoll_ctl(spec->epoll_fd, EPOLL_CTL_DEL, fd, &_ev) != 0) {
206       noitL(noit_error, "epoll_ctl(%d, EPOLL_CTL_DEL, %d) -> %s\n",
207             spec->epoll_fd, fd, strerror(errno));
208       if(errno != ENOENT) abort();
209     }
210     release_master_fd(fd, lockstate);
211   }
212   return eiq;
213 }
214 static eventer_t eventer_epoll_impl_find_fd(int fd) {
215   return master_fds[fd].e;
216 }
217
218 static void eventer_epoll_impl_trigger(eventer_t e, int mask) {
219   struct epoll_spec *spec;
220   struct timeval __now;
221   int fd, newmask;
222   const char *cbname;
223   ev_lock_state_t lockstate;
224
225   fd = e->fd;
226   if(e != master_fds[fd].e) return;
227   if(!pthread_equal(pthread_self(), e->thr_owner)) {
228     eventer_cross_thread_trigger(e,mask);
229     return;
230   }
231   lockstate = acquire_master_fd(fd);
232   if(lockstate == EV_ALREADY_OWNED) return;
233   assert(lockstate == EV_OWNED);
234
235   gettimeofday(&__now, NULL);
236   cbname = eventer_name_for_callback_e(e->callback, e);
237   noitLT(eventer_deb, &__now, "epoll: fire on %d/%x to %s(%p)\n",
238          fd, mask, cbname?cbname:"???", e->callback);
239   noit_memory_begin();
240   EVENTER_CALLBACK_ENTRY((void *)e, (void *)e->callback, (char *)cbname, fd, e->mask, mask);
241   newmask = e->callback(e, mask, e->closure, &__now);
242   EVENTER_CALLBACK_RETURN((void *)e, (void *)e->callback, (char *)cbname, newmask);
243   noit_memory_end();
244
245   if(newmask) {
246     struct epoll_event _ev;
247     memset(&_ev, 0, sizeof(_ev));
248     _ev.data.fd = fd;
249     if(newmask & EVENTER_READ) _ev.events |= (EPOLLIN|EPOLLPRI);
250     if(newmask & EVENTER_WRITE) _ev.events |= (EPOLLOUT);
251     if(newmask & EVENTER_EXCEPTION) _ev.events |= (EPOLLERR|EPOLLHUP);
252     if(master_fds[fd].e == NULL) {
253       noitL(noit_debug, "eventer %s(%p) epoll asked to modify descheduled fd: %d\n",
254             cbname?cbname:"???", e->callback, fd);
255     } else {
256       if(!pthread_equal(pthread_self(), e->thr_owner)) {
257         pthread_t tgt = e->thr_owner;
258         e->thr_owner = pthread_self();
259         spec = eventer_get_spec_for_event(e);
260         assert(epoll_ctl(spec->epoll_fd, EPOLL_CTL_DEL, fd, &_ev) == 0);
261         e->thr_owner = tgt;
262         spec = eventer_get_spec_for_event(e);
263         assert(epoll_ctl(spec->epoll_fd, EPOLL_CTL_ADD, fd, &_ev) == 0);
264         noitL(eventer_deb, "moved event[%p] from t@%d to t@%d\n", e, (int)pthread_self(), (int)tgt);
265       }
266       else {
267         spec = eventer_get_spec_for_event(e);
268         assert(epoll_ctl(spec->epoll_fd, EPOLL_CTL_MOD, fd, &_ev) == 0);
269       }
270     }
271     /* Set our mask */
272     e->mask = newmask;
273   }
274   else {
275     /* see kqueue implementation for details on the next line */
276     if(master_fds[fd].e == e) master_fds[fd].e = NULL;
277     eventer_free(e);
278   }
279   release_master_fd(fd, lockstate);
280 }
281 static int eventer_epoll_impl_loop() {
282   struct epoll_event *epev;
283   struct epoll_spec *spec;
284
285   spec = eventer_get_spec_for_event(NULL);
286   epev = malloc(sizeof(*epev) * maxfds);
287
288   while(1) {
289     struct timeval __now, __sleeptime;
290     int fd_cnt = 0;
291
292     __sleeptime = eventer_max_sleeptime;
293
294     gettimeofday(&__now, NULL);
295     eventer_dispatch_timed(&__now, &__sleeptime);
296
297     /* Handle cross_thread dispatches */
298     eventer_cross_thread_process();
299
300     /* Handle recurrent events */
301     eventer_dispatch_recurrent(&__now);
302
303     /* Now we move on to our fd-based events */
304     do {
305       fd_cnt = epoll_wait(spec->epoll_fd, epev, maxfds,
306                           __sleeptime.tv_sec * 1000 + __sleeptime.tv_usec / 1000);
307     } while(fd_cnt < 0 && errno == EINTR);
308     noitLT(eventer_deb, &__now, "debug: epoll_wait(%d, [], %d) => %d\n",
309            spec->epoll_fd, maxfds, fd_cnt);
310     if(fd_cnt < 0) {
311       noitLT(eventer_err, &__now, "epoll_wait: %s\n", strerror(errno));
312     }
313     else {
314       int idx;
315       /* loop once to clear */
316       for(idx = 0; idx < fd_cnt; idx++) {
317         struct epoll_event *ev;
318         eventer_t e;
319         int fd, mask = 0;
320
321         ev = &epev[idx];
322
323         if(ev->events & (EPOLLIN | EPOLLPRI)) mask |= EVENTER_READ;
324         if(ev->events & (EPOLLOUT)) mask |= EVENTER_WRITE;
325         if(ev->events & (EPOLLERR|EPOLLHUP)) mask |= EVENTER_EXCEPTION;
326
327         fd = ev->data.fd;
328
329         e = master_fds[fd].e;
330         /* It's possible that someone removed the event and freed it
331          * before we got here.
332          */
333         if(!e) continue;
334
335         eventer_epoll_impl_trigger(e, mask);
336       }
337     }
338   }
339   /* NOTREACHED */
340   return 0;
341 }
342
343 struct _eventer_impl eventer_epoll_impl = {
344   "epoll",
345   eventer_epoll_impl_init,
346   eventer_epoll_impl_propset,
347   eventer_epoll_impl_add,
348   eventer_epoll_impl_remove,
349   eventer_epoll_impl_update,
350   eventer_epoll_impl_remove_fd,
351   eventer_epoll_impl_find_fd,
352   eventer_epoll_impl_trigger,
353   eventer_epoll_impl_loop,
354   eventer_epoll_impl_foreach_fdevent,
355   eventer_wakeup_noop,
356   eventer_epoll_spec_alloc,
357   { 0, 200000 },
358   0,
359   NULL
360 };
Note: See TracBrowser for help on using the browser.