root/src/eventer/eventer_epoll_impl.c

Revision df2e1eb8a0d6b779a1327d8df4d867b961ce4b07, 9.6 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

fixed #66

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_atomic.h"
9 #include "utils/noit_skiplist.h"
10 #include "utils/noit_log.h"
11
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/resource.h>
#include <sys/time.h>
19
static struct timeval __max_sleeptime = { 0, 200000 }; /* 200ms */
static int maxfds;   /* table size; set from RLIMIT_NOFILE in init */
static struct {
  eventer_t e;         /* event registered on this fd, or NULL */
  pthread_t executor;  /* thread currently holding `lock` (zeroed when free) */
  noit_spinlock_t lock; /* serializes access/callback execution per fd */
} *master_fds = NULL;
static int *masks;

/* Result of acquire_master_fd(): EV_OWNED means this call took the lock
 * (and must release it); EV_ALREADY_OWNED means the calling thread held
 * it already (re-entrant; release is a no-op). */
typedef enum { EV_OWNED, EV_ALREADY_OWNED } ev_lock_state_t;
30
/* Acquire the per-fd spinlock for `fd`, tolerating re-entrant calls from
 * the same thread.  Returns EV_OWNED when this call actually took the
 * lock, EV_ALREADY_OWNED when the calling thread already held it.
 * NOTE(review): `.executor` is read here without holding the lock; this
 * presumably relies on a matching value only ever being our own earlier
 * write — confirm safety on weakly-ordered architectures. */
static ev_lock_state_t
acquire_master_fd(int fd) {
  /* Fast path: the lock was free and we took it. */
  if(noit_spinlock_trylock(&master_fds[fd].lock)) {
    master_fds[fd].executor = pthread_self();
    return EV_OWNED;
  }
  /* Lock is held; if the holder is us, this is a re-entrant acquire. */
  if(pthread_equal(master_fds[fd].executor, pthread_self())) {
    return EV_ALREADY_OWNED;
  }
  /* Held by another thread: spin until we own it. */
  noit_spinlock_lock(&master_fds[fd].lock);
  master_fds[fd].executor = pthread_self();
  return EV_OWNED;
}
44 static void
45 release_master_fd(int fd, ev_lock_state_t as) {
46   if(as == EV_OWNED) {
47     memset(&master_fds[fd].executor, 0, sizeof(master_fds[fd].executor));
48     noit_spinlock_unlock(&master_fds[fd].lock);
49   }
50 }
51
static pthread_t master_thread;          /* thread that ran init() */
static int epoll_fd = -1;                /* shared epoll instance */
static pthread_mutex_t te_lock;          /* guards timed_events */
static noit_skiplist *timed_events = NULL; /* timers ordered by whence */
56
57 static int eventer_epoll_impl_init() {
58   struct rlimit rlim;
59   int rv;
60
61   /* super init */
62   if((rv = eventer_impl_init()) != 0) return rv;
63
64   master_thread = pthread_self();
65   signal(SIGPIPE, SIG_IGN);
66   epoll_fd = epoll_create(1024);
67   if(epoll_fd == -1) {
68     return -1;
69   }
70   pthread_mutex_init(&te_lock, NULL);
71   getrlimit(RLIMIT_NOFILE, &rlim);
72   maxfds = rlim.rlim_cur;
73   master_fds = calloc(maxfds, sizeof(*master_fds));
74   masks = calloc(maxfds, sizeof(*masks));
75   timed_events = calloc(1, sizeof(*timed_events));
76   noit_skiplist_init(timed_events);
77   noit_skiplist_set_compare(timed_events,
78                             eventer_timecompare, eventer_timecompare);
79   noit_skiplist_add_index(timed_events,
80                           noit_compare_voidptr, noit_compare_voidptr);
81   return 0;
82 }
/* Set an eventer configuration property.  Generic keys are consumed by
 * eventer_impl_propset(); epoll-specific keys would be handled below.
 * Returns 0 on success, -1 when the property was not accepted. */
static int eventer_epoll_impl_propset(const char *key, const char *value) {
  if(eventer_impl_propset(key, value) == 0) return 0;
  /* Do our epoll local properties here */
  return -1;
}
90 static void eventer_epoll_impl_add(eventer_t e) {
91   struct epoll_event _ev;
92   ev_lock_state_t lockstate;
93   assert(e->mask);
94
95   if(e->mask & EVENTER_ASYNCH) {
96     eventer_add_asynch(NULL, e);
97     return;
98   }
99
100   /* Recurrent delegation */
101   if(e->mask & EVENTER_RECURRENT) {
102     eventer_add_recurrent(e);
103     return;
104   }
105
106   /* Timed events are simple */
107   if(e->mask & EVENTER_TIMER) {
108     pthread_mutex_lock(&te_lock);
109     noit_skiplist_insert(timed_events, e);
110     pthread_mutex_unlock(&te_lock);
111     return;
112   }
113
114   /* file descriptor event */
115   memset(&_ev, 0, sizeof(_ev));
116   _ev.data.fd = e->fd;
117   if(e->mask & EVENTER_READ) _ev.events |= (EPOLLIN|EPOLLPRI);
118   if(e->mask & EVENTER_WRITE) _ev.events |= (EPOLLOUT);
119   if(e->mask & EVENTER_EXCEPTION) _ev.events |= (EPOLLERR|EPOLLHUP);
120
121   lockstate = acquire_master_fd(e->fd);
122   master_fds[e->fd].e = e;
123
124   epoll_ctl(epoll_fd, EPOLL_CTL_ADD, e->fd, &_ev);
125
126   release_master_fd(e->fd, lockstate);
127 }
/* Remove a previously-added event, dispatching on its mask: fd events
 * are deregistered from epoll, timer events are pulled from the
 * skiplist, recurrent events are delegated.  Returns the removed event,
 * or NULL if it was no longer registered. */
static eventer_t eventer_epoll_impl_remove(eventer_t e) {
  eventer_t removed = NULL;
  if(e->mask & EVENTER_ASYNCH) {
    /* Asynch jobs cannot be cancelled through this path. */
    abort();
  }
  if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
    ev_lock_state_t lockstate;
    struct epoll_event _ev;
    memset(&_ev, 0, sizeof(_ev));
    _ev.data.fd = e->fd;
    lockstate = acquire_master_fd(e->fd);
    /* Only deregister if `e` is still the event registered on this fd;
     * another thread may have replaced or removed it already. */
    if(e == master_fds[e->fd].e) {
      removed = e;
      master_fds[e->fd].e = NULL;
      epoll_ctl(epoll_fd, EPOLL_CTL_DEL, e->fd, &_ev);
    }
    release_master_fd(e->fd, lockstate);
  }
  else if(e->mask & EVENTER_TIMER) {
    pthread_mutex_lock(&te_lock);
    /* Match by pointer identity via the void* secondary index. */
    if(noit_skiplist_remove_compare(timed_events, e, NULL,
                                    noit_compare_voidptr))
      removed = e;
    pthread_mutex_unlock(&te_lock);
  }
  else if(e->mask & EVENTER_RECURRENT) {
    removed = eventer_remove_recurrent(e);
  }
  else {
    /* Unknown mask: programming error. */
    abort();
  }
  return removed;
}
161 static void eventer_epoll_impl_update(eventer_t e, int mask) {
162   struct epoll_event _ev;
163   if(e->mask & EVENTER_TIMER) {
164     assert(e->mask & EVENTER_TIMER);
165     pthread_mutex_lock(&te_lock);
166     noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
167     noit_skiplist_insert(timed_events, e);
168     pthread_mutex_unlock(&te_lock);
169     return;
170   }
171   memset(&_ev, 0, sizeof(_ev));
172   _ev.data.fd = e->fd;
173   e->mask = mask;
174   if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
175     if(e->mask & EVENTER_READ) _ev.events |= (EPOLLIN|EPOLLPRI);
176     if(e->mask & EVENTER_WRITE) _ev.events |= (EPOLLOUT);
177     if(e->mask & EVENTER_EXCEPTION) _ev.events |= (EPOLLERR|EPOLLHUP);
178     epoll_ctl(epoll_fd, EPOLL_CTL_MOD, e->fd, &_ev);
179   }
180 }
181 static eventer_t eventer_epoll_impl_remove_fd(int fd) {
182   eventer_t eiq = NULL;
183   ev_lock_state_t lockstate;
184   if(master_fds[fd].e) {
185     struct epoll_event _ev;
186     memset(&_ev, 0, sizeof(_ev));
187     _ev.data.fd = fd;
188     lockstate = acquire_master_fd(fd);
189     eiq = master_fds[fd].e;
190     master_fds[fd].e = NULL;
191     epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &_ev);
192     release_master_fd(fd, lockstate);
193   }
194   return eiq;
195 }
196 static eventer_t eventer_epoll_impl_find_fd(int fd) {
197   return master_fds[fd].e;
198 }
199 static int eventer_epoll_impl_loop() {
200   int is_master_thread = 0;
201   struct epoll_event *epev;
202   pthread_t self;
203
204   self = pthread_self();
205   if(pthread_equal(self, master_thread)) is_master_thread = 1;
206
207   epev = malloc(sizeof(*epev) * maxfds);
208
209   while(1) {
210     struct timeval __now, __sleeptime;
211     int fd_cnt = 0;
212     int max_timed_events_to_process;
213     int newmask;
214
215     __sleeptime = __max_sleeptime;
216
217     /* Handle timed events...
218      * we could be multithreaded, so if we pop forever we could starve
219      * ourselves. */
220     max_timed_events_to_process = timed_events->size;
221     while(max_timed_events_to_process-- > 0) {
222       eventer_t timed_event;
223
224       gettimeofday(&__now, NULL);
225
226       pthread_mutex_lock(&te_lock);
227       /* Peek at our next timed event, if should fire, pop it.
228        * otherwise we noop and NULL it out to break the loop. */
229       timed_event = noit_skiplist_peek(timed_events);
230       if(timed_event) {
231         if(compare_timeval(timed_event->whence, __now) < 0) {
232           timed_event = noit_skiplist_pop(timed_events, NULL);
233         }
234         else {
235           sub_timeval(timed_event->whence, __now, &__sleeptime);
236           timed_event = NULL;
237         }
238       }
239       pthread_mutex_unlock(&te_lock);
240       if(timed_event == NULL) break;
241
242       /* Make our call */
243       newmask = timed_event->callback(timed_event, EVENTER_TIMER,
244                                       timed_event->closure, &__now);
245       if(newmask)
246         eventer_add(timed_event);
247       else
248         eventer_free(timed_event);
249     }
250
251     if(compare_timeval(__max_sleeptime, __sleeptime) < 0) {
252       /* we exceed our configured maximum, set it down */
253       memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime));
254     }
255
256     /* Handle recurrent events */
257     eventer_dispatch_recurrent(&__now);
258
259     /* Now we move on to our fd-based events */
260     fd_cnt = epoll_wait(epoll_fd, epev, maxfds,
261                         __sleeptime.tv_sec * 1000 + __sleeptime.tv_usec / 1000);
262     noitLT(eventer_deb, &__now, "debug: epoll_wait(%d, [], %d) => %d\n", epoll_fd, maxfds, fd_cnt);
263     if(fd_cnt < 0) {
264       noitLT(eventer_err, &__now, "epoll_wait: %s\n", strerror(errno));
265     }
266     else {
267       int idx;
268       /* loop once to clear */
269       for(idx = 0; idx < fd_cnt; idx++) {
270         struct epoll_event *ev;
271         const char *cbname;
272         ev_lock_state_t lockstate;
273         eventer_t e;
274         int fd, oldmask, mask = 0;
275
276         ev = &epev[idx];
277
278         if(ev->events & (EPOLLIN | EPOLLPRI)) mask |= EVENTER_READ;
279         if(ev->events & (EPOLLOUT)) mask |= EVENTER_WRITE;
280         if(ev->events & (EPOLLERR|EPOLLHUP)) mask |= EVENTER_EXCEPTION;
281
282         fd = ev->data.fd;
283         e = master_fds[fd].e;
284         /* It's possible that someone removed the event and freed it
285          * before we got here.
286          */
287         if(!e) continue;
288
289
290         lockstate = acquire_master_fd(fd);
291         assert(lockstate == EV_OWNED);
292
293         gettimeofday(&__now, NULL);
294         oldmask = e->mask;
295         cbname = eventer_name_for_callback(e->callback);
296         noitLT(eventer_deb, &__now, "epoll: fire on %d/%x to %s(%p)\n",
297                fd, mask, cbname?cbname:"???", e->callback);
298         newmask = e->callback(e, mask, e->closure, &__now);
299
300         if(newmask) {
301           struct epoll_event _ev;
302           memset(&_ev, 0, sizeof(_ev));
303           _ev.data.fd = fd;
304           if(newmask & EVENTER_READ) _ev.events |= (EPOLLIN|EPOLLPRI);
305           if(newmask & EVENTER_WRITE) _ev.events |= (EPOLLOUT);
306           if(newmask & EVENTER_EXCEPTION) _ev.events |= (EPOLLERR|EPOLLHUP);
307           epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &_ev);
308           /* Set our mask */
309           e->mask = newmask;
310         }
311         else {
312           /* see kqueue implementation for details on the next line */
313           if(master_fds[fd].e == e) master_fds[fd].e = NULL;
314           eventer_free(e);
315         }
316         release_master_fd(fd, lockstate);
317       }
318     }
319   }
320   /* NOTREACHED */
321   return 0;
322 }
323
/* vtable wiring the epoll backend into the eventer core */
struct _eventer_impl eventer_epoll_impl = {
  "epoll",                       /* name */
  eventer_epoll_impl_init,       /* init */
  eventer_epoll_impl_propset,    /* propset */
  eventer_epoll_impl_add,        /* add */
  eventer_epoll_impl_remove,     /* remove */
  eventer_epoll_impl_update,     /* update */
  eventer_epoll_impl_remove_fd,  /* remove_fd */
  eventer_epoll_impl_find_fd,    /* find_fd */
  eventer_epoll_impl_loop        /* loop */
};
Note: See TracBrowser for help on using the browser.