root/src/eventer/eventer_epoll_impl.c

Revision cd97caaaf4a03ed9c491c6c973b956ec045fc20e, 8.4 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 9 years ago)

We don't need to know which is the 'master thread' refs #283

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_atomic.h"
36 #include "utils/noit_skiplist.h"
37 #include "utils/noit_log.h"
38
39 #include <errno.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <sys/epoll.h>
43 #include <signal.h>
44 #include <pthread.h>
45 #include <assert.h>
46
/*
 * Forward (tentative) definition of this implementation's dispatch table;
 * the full initializer appears at the bottom of the file.  The macros below
 * bind the generic names to this table.  NOTE(review): they are presumably
 * consumed by eventer_impl_private.h, which is included right after them --
 * confirm against that header.
 */
47 struct _eventer_impl eventer_epoll_impl;
48 #define LOCAL_EVENTER eventer_epoll_impl
49 #define LOCAL_EVENTER_foreach_fdevent eventer_epoll_impl_foreach_fdevent
/* maxfds and master_fds are shorthands for fields of the table above. */
50 #define maxfds LOCAL_EVENTER.maxfds
51 #define master_fds LOCAL_EVENTER.master_fds
52
53 #include "eventer/eventer_impl_private.h"
54
/* Per-fd int array allocated (maxfds entries) by eventer_epoll_impl_init();
 * nothing else in this file reads or writes it. */
55 static int *masks;
/* The epoll descriptor; stays -1 until eventer_epoll_impl_init() creates it. */
56 static int epoll_fd = -1;
57
58 static int eventer_epoll_impl_init() {
59   struct rlimit rlim;
60   int rv;
61
62   /* super init */
63   if((rv = eventer_impl_init()) != 0) return rv;
64
65   signal(SIGPIPE, SIG_IGN);
66   epoll_fd = epoll_create(1024);
67   if(epoll_fd == -1) {
68     return -1;
69   }
70   getrlimit(RLIMIT_NOFILE, &rlim);
71   maxfds = rlim.rlim_cur;
72   master_fds = calloc(maxfds, sizeof(*master_fds));
73   masks = calloc(maxfds, sizeof(*masks));
74   return 0;
75 }
/*
 * Set a configuration property on the eventer.
 *
 * The key/value pair is passed to the shared eventer_impl_propset().  No
 * epoll-specific properties exist yet, so a non-zero result from the shared
 * handler is reported as -1; otherwise 0 is returned.
 */
static int eventer_epoll_impl_propset(const char *key, const char *value) {
  /* epoll-local properties would be handled here before giving up */
  return eventer_impl_propset(key, value) ? -1 : 0;
}
83 static void eventer_epoll_impl_add(eventer_t e) {
84   struct epoll_event _ev;
85   ev_lock_state_t lockstate;
86   assert(e->mask);
87
88   if(e->mask & EVENTER_ASYNCH) {
89     eventer_add_asynch(NULL, e);
90     return;
91   }
92
93   /* Recurrent delegation */
94   if(e->mask & EVENTER_RECURRENT) {
95     eventer_add_recurrent(e);
96     return;
97   }
98
99   /* Timed events are simple */
100   if(e->mask & EVENTER_TIMER) {
101     eventer_add_timed(e);
102     return;
103   }
104
105   /* file descriptor event */
106   assert(e->whence.tv_sec == 0 && e->whence.tv_usec == 0);
107   memset(&_ev, 0, sizeof(_ev));
108   _ev.data.fd = e->fd;
109   if(e->mask & EVENTER_READ) _ev.events |= (EPOLLIN|EPOLLPRI);
110   if(e->mask & EVENTER_WRITE) _ev.events |= (EPOLLOUT);
111   if(e->mask & EVENTER_EXCEPTION) _ev.events |= (EPOLLERR|EPOLLHUP);
112
113   lockstate = acquire_master_fd(e->fd);
114   master_fds[e->fd].e = e;
115
116   epoll_ctl(epoll_fd, EPOLL_CTL_ADD, e->fd, &_ev);
117
118   release_master_fd(e->fd, lockstate);
119 }
120 static eventer_t eventer_epoll_impl_remove(eventer_t e) {
121   eventer_t removed = NULL;
122   if(e->mask & EVENTER_ASYNCH) {
123     abort();
124   }
125   if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
126     ev_lock_state_t lockstate;
127     struct epoll_event _ev;
128     memset(&_ev, 0, sizeof(_ev));
129     _ev.data.fd = e->fd;
130     lockstate = acquire_master_fd(e->fd);
131     if(e == master_fds[e->fd].e) {
132       removed = e;
133       master_fds[e->fd].e = NULL;
134       epoll_ctl(epoll_fd, EPOLL_CTL_DEL, e->fd, &_ev);
135     }
136     release_master_fd(e->fd, lockstate);
137   }
138   else if(e->mask & EVENTER_TIMER) {
139     removed = eventer_remove_timed(e);
140   }
141   else if(e->mask & EVENTER_RECURRENT) {
142     removed = eventer_remove_recurrent(e);
143   }
144   else {
145     abort();
146   }
147   return removed;
148 }
149 static void eventer_epoll_impl_update(eventer_t e, int mask) {
150   struct epoll_event _ev;
151   if(e->mask & EVENTER_TIMER) {
152     eventer_update_timed(e,mask);
153     return;
154   }
155   memset(&_ev, 0, sizeof(_ev));
156   _ev.data.fd = e->fd;
157   e->mask = mask;
158   if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
159     if(e->mask & EVENTER_READ) _ev.events |= (EPOLLIN|EPOLLPRI);
160     if(e->mask & EVENTER_WRITE) _ev.events |= (EPOLLOUT);
161     if(e->mask & EVENTER_EXCEPTION) _ev.events |= (EPOLLERR|EPOLLHUP);
162     epoll_ctl(epoll_fd, EPOLL_CTL_MOD, e->fd, &_ev);
163   }
164 }
165 static eventer_t eventer_epoll_impl_remove_fd(int fd) {
166   eventer_t eiq = NULL;
167   ev_lock_state_t lockstate;
168   if(master_fds[fd].e) {
169     struct epoll_event _ev;
170     memset(&_ev, 0, sizeof(_ev));
171     _ev.data.fd = fd;
172     lockstate = acquire_master_fd(fd);
173     eiq = master_fds[fd].e;
174     master_fds[fd].e = NULL;
175     epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &_ev);
176     release_master_fd(fd, lockstate);
177   }
178   return eiq;
179 }
180 static eventer_t eventer_epoll_impl_find_fd(int fd) {
181   return master_fds[fd].e;
182 }
183
184 static void eventer_epoll_impl_trigger(eventer_t e, int mask) {
185   struct timeval __now;
186   int fd, oldmask, newmask;
187   const char *cbname;
188   ev_lock_state_t lockstate;
189
190   fd = e->fd;
191   if(e != master_fds[fd].e) return;
192   lockstate = acquire_master_fd(fd);
193   if(lockstate == EV_ALREADY_OWNED) return;
194   assert(lockstate == EV_OWNED);
195
196   gettimeofday(&__now, NULL);
197   oldmask = e->mask;
198   cbname = eventer_name_for_callback(e->callback);
199   noitLT(eventer_deb, &__now, "epoll: fire on %d/%x to %s(%p)\n",
200          fd, mask, cbname?cbname:"???", e->callback);
201   newmask = e->callback(e, mask, e->closure, &__now);
202
203   if(newmask) {
204     struct epoll_event _ev;
205     memset(&_ev, 0, sizeof(_ev));
206     _ev.data.fd = fd;
207     if(newmask & EVENTER_READ) _ev.events |= (EPOLLIN|EPOLLPRI);
208     if(newmask & EVENTER_WRITE) _ev.events |= (EPOLLOUT);
209     if(newmask & EVENTER_EXCEPTION) _ev.events |= (EPOLLERR|EPOLLHUP);
210     epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &_ev);
211     /* Set our mask */
212     e->mask = newmask;
213   }
214   else {
215     /* see kqueue implementation for details on the next line */
216     if(master_fds[fd].e == e) master_fds[fd].e = NULL;
217     eventer_free(e);
218   }
219   release_master_fd(fd, lockstate);
220 }
221 static int eventer_epoll_impl_loop() {
222   struct epoll_event *epev;
223
224   epev = malloc(sizeof(*epev) * maxfds);
225
226   while(1) {
227     struct timeval __now, __sleeptime;
228     int fd_cnt = 0;
229
230     __sleeptime = eventer_max_sleeptime;
231
232     eventer_dispatch_timed(&__now, &__sleeptime);
233
234     /* Handle recurrent events */
235     eventer_dispatch_recurrent(&__now);
236
237     /* Now we move on to our fd-based events */
238     fd_cnt = epoll_wait(epoll_fd, epev, maxfds,
239                         __sleeptime.tv_sec * 1000 + __sleeptime.tv_usec / 1000);
240     noitLT(eventer_deb, &__now, "debug: epoll_wait(%d, [], %d) => %d\n", epoll_fd, maxfds, fd_cnt);
241     if(fd_cnt < 0) {
242       noitLT(eventer_err, &__now, "epoll_wait: %s\n", strerror(errno));
243     }
244     else {
245       int idx;
246       /* loop once to clear */
247       for(idx = 0; idx < fd_cnt; idx++) {
248         struct epoll_event *ev;
249         eventer_t e;
250         int fd, mask = 0;
251
252         ev = &epev[idx];
253
254         if(ev->events & (EPOLLIN | EPOLLPRI)) mask |= EVENTER_READ;
255         if(ev->events & (EPOLLOUT)) mask |= EVENTER_WRITE;
256         if(ev->events & (EPOLLERR|EPOLLHUP)) mask |= EVENTER_EXCEPTION;
257
258         fd = ev->data.fd;
259
260         e = master_fds[fd].e;
261         /* It's possible that someone removed the event and freed it
262          * before we got here.
263          */
264         if(!e) continue;
265
266         eventer_epoll_impl_trigger(e, mask);
267       }
268     }
269   }
270   /* NOTREACHED */
271   return 0;
272 }
273
/*
 * Dispatch table for the epoll eventer.  The initializer is positional, so
 * its order has to follow the field order of struct _eventer_impl (declared
 * outside this file).
 */
274 struct _eventer_impl eventer_epoll_impl = {
275   "epoll", /* implementation name */
276   eventer_epoll_impl_init,
277   eventer_epoll_impl_propset,
278   eventer_epoll_impl_add,
279   eventer_epoll_impl_remove,
280   eventer_epoll_impl_update,
281   eventer_epoll_impl_remove_fd,
282   eventer_epoll_impl_find_fd,
283   eventer_epoll_impl_trigger,
284   eventer_epoll_impl_loop,
285   eventer_epoll_impl_foreach_fdevent,
286   { 0, 200000 }, /* NOTE(review): presumably the maximum sleep time as a timeval (0s, 200000us) -- confirm against struct _eventer_impl */
287   0, /* NOTE(review): presumably maxfds; eventer_epoll_impl_init() assigns it -- confirm field order */
288   NULL /* NOTE(review): presumably master_fds; eventer_epoll_impl_init() allocates it -- confirm field order */
289 };
Note: See TracBrowser for help on using the browser.