root/src/eventer/eventer_epoll_impl.c

Revision 4c05448ea043aaf96b94270f68092553c96ccd9d, 9.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 11 years ago)

little bits of valgrind ecstasy

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_atomic.h"
9 #include "utils/noit_skiplist.h"
10 #include "utils/noit_log.h"
11
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/resource.h>
#include <sys/time.h>
19
/* Cap on one pass through the event loop (200ms), so timed events are
 * re-examined at least that often even when no fds are active. */
static struct timeval __max_sleeptime = { 0, 200000 }; /* 200ms */
/* Sized from RLIMIT_NOFILE at init; bounds every per-fd array below. */
static int maxfds;
/* Per-file-descriptor registration table, indexed directly by fd. */
static struct {
  eventer_t e;          /* event registered on this fd, NULL if none */
  pthread_t executor;   /* thread currently holding `lock` for this fd */
  noit_spinlock_t lock; /* serializes registration/dispatch on this fd */
} *master_fds = NULL;
/* Per-fd scratch masks -- NOTE(review): allocated in init but not
 * referenced in this view; confirm it is used elsewhere before removal. */
static int *masks;
28
29 typedef enum { EV_OWNED, EV_ALREADY_OWNED } ev_lock_state_t;
30
31 static ev_lock_state_t
32 acquire_master_fd(int fd) {
33   if(noit_spinlock_trylock(&master_fds[fd].lock)) {
34     master_fds[fd].executor = pthread_self();
35     return EV_OWNED;
36   }
37   if(pthread_equal(master_fds[fd].executor, pthread_self())) {
38     return EV_ALREADY_OWNED;
39   }
40   noit_spinlock_lock(&master_fds[fd].lock);
41   master_fds[fd].executor = pthread_self();
42   return EV_OWNED;
43 }
44 static void
45 release_master_fd(int fd, ev_lock_state_t as) {
46   if(as == EV_OWNED) {
47     memset(&master_fds[fd].executor, 0, sizeof(master_fds[fd].executor));
48     noit_spinlock_unlock(&master_fds[fd].lock);
49   }
50 }
51
/* Thread that ran init; recorded so loops can tell master from workers. */
static pthread_t master_thread;
static int epoll_fd = -1;                  /* the shared epoll instance */
static pthread_mutex_t te_lock;            /* guards timed_events */
static noit_skiplist *timed_events = NULL; /* timers ordered by due time */
56
57 static int eventer_epoll_impl_init() {
58   struct rlimit rlim;
59   int rv;
60
61   /* super init */
62   if((rv = eventer_impl_init()) != 0) return rv;
63
64   master_thread = pthread_self();
65   signal(SIGPIPE, SIG_IGN);
66   epoll_fd = epoll_create(1024);
67   if(epoll_fd == -1) {
68     return -1;
69   }
70   pthread_mutex_init(&te_lock, NULL);
71   getrlimit(RLIMIT_NOFILE, &rlim);
72   maxfds = rlim.rlim_cur;
73   master_fds = calloc(maxfds, sizeof(*master_fds));
74   masks = calloc(maxfds, sizeof(*masks));
75   timed_events = calloc(1, sizeof(*timed_events));
76   noit_skiplist_init(timed_events);
77   noit_skiplist_set_compare(timed_events,
78                             eventer_timecompare, eventer_timecompare);
79   noit_skiplist_add_index(timed_events,
80                           noit_compare_voidptr, noit_compare_voidptr);
81   return 0;
82 }
/* Apply a named eventer property.  Everything is currently delegated to
 * the generic implementation (no epoll-local properties exist yet).
 * Returns 0 on success, -1 if the generic layer rejects the property. */
static int eventer_epoll_impl_propset(const char *key, const char *value) {
  /* Do our epoll local properties here (none at present). */
  return eventer_impl_propset(key, value) ? -1 : 0;
}
90 static void eventer_epoll_impl_add(eventer_t e) {
91   struct epoll_event _ev;
92   ev_lock_state_t lockstate;
93   assert(e->mask);
94
95   if(e->mask & EVENTER_ASYNCH) {
96     eventer_add_asynch(NULL, e);
97     return;
98   }
99
100   /* Recurrent delegation */
101   if(e->mask & EVENTER_RECURRENT) {
102     eventer_add_recurrent(e);
103     return;
104   }
105
106   /* Timed events are simple */
107   if(e->mask & EVENTER_TIMER) {
108     pthread_mutex_lock(&te_lock);
109     noit_skiplist_insert(timed_events, e);
110     pthread_mutex_unlock(&te_lock);
111     return;
112   }
113
114   /* file descriptor event */
115   memset(&_ev, 0, sizeof(_ev));
116   _ev.data.fd = e->fd;
117   if(e->mask & EVENTER_READ) _ev.events |= (EPOLLIN|EPOLLPRI);
118   if(e->mask & EVENTER_WRITE) _ev.events |= (EPOLLOUT);
119   if(e->mask & EVENTER_EXCEPTION) _ev.events |= (EPOLLERR|EPOLLHUP);
120
121   lockstate = acquire_master_fd(e->fd);
122   master_fds[e->fd].e = e;
123
124   epoll_ctl(epoll_fd, EPOLL_CTL_ADD, e->fd, &_ev);
125
126   release_master_fd(e->fd, lockstate);
127 }
128 static eventer_t eventer_epoll_impl_remove(eventer_t e) {
129   eventer_t removed = NULL;
130   if(e->mask & EVENTER_ASYNCH) {
131     abort();
132   }
133   if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
134     ev_lock_state_t lockstate;
135     struct epoll_event _ev;
136     memset(&_ev, 0, sizeof(_ev));
137     _ev.data.fd = e->fd;
138     lockstate = acquire_master_fd(e->fd);
139     if(e == master_fds[e->fd].e) {
140       removed = e;
141       master_fds[e->fd].e = NULL;
142       epoll_ctl(epoll_fd, EPOLL_CTL_DEL, e->fd, &_ev);
143     }
144     release_master_fd(e->fd, lockstate);
145   }
146   else if(e->mask & EVENTER_TIMER) {
147     pthread_mutex_lock(&te_lock);
148     if(noit_skiplist_remove_compare(timed_events, e, NULL,
149                                     noit_compare_voidptr))
150       removed = e;
151     pthread_mutex_unlock(&te_lock);
152   }
153   else if(e->mask & EVENTER_RECURRENT) {
154     removed = eventer_remove_recurrent(e);
155   }
156   else {
157     abort();
158   }
159   return removed;
160 }
161 static void eventer_epoll_impl_update(eventer_t e) {
162   struct epoll_event _ev;
163   if(e->mask & EVENTER_TIMER) {
164     pthread_mutex_lock(&te_lock);
165     noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
166     noit_skiplist_insert(timed_events, e);
167     pthread_mutex_unlock(&te_lock);
168     return;
169   }
170   memset(&_ev, 0, sizeof(_ev));
171   _ev.data.fd = e->fd;
172   if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
173     if(e->mask & EVENTER_READ) _ev.events |= (EPOLLIN|EPOLLPRI);
174     if(e->mask & EVENTER_WRITE) _ev.events |= (EPOLLOUT);
175     if(e->mask & EVENTER_EXCEPTION) _ev.events |= (EPOLLERR|EPOLLHUP);
176     epoll_ctl(epoll_fd, EPOLL_CTL_MOD, e->fd, &_ev);
177   }
178 }
179 static eventer_t eventer_epoll_impl_remove_fd(int fd) {
180   eventer_t eiq = NULL;
181   ev_lock_state_t lockstate;
182   if(master_fds[fd].e) {
183     struct epoll_event _ev;
184     memset(&_ev, 0, sizeof(_ev));
185     _ev.data.fd = fd;
186     lockstate = acquire_master_fd(fd);
187     eiq = master_fds[fd].e;
188     master_fds[fd].e = NULL;
189     epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &_ev);
190     release_master_fd(fd, lockstate);
191   }
192   return eiq;
193 }
194 static eventer_t eventer_epoll_impl_find_fd(int fd) {
195   return master_fds[fd].e;
196 }
197 static int eventer_epoll_impl_loop() {
198   int is_master_thread = 0;
199   struct epoll_event *epev;
200   pthread_t self;
201
202   self = pthread_self();
203   if(pthread_equal(self, master_thread)) is_master_thread = 1;
204
205   epev = malloc(sizeof(*epev) * maxfds);
206
207   while(1) {
208     struct timeval __now, __sleeptime;
209     int fd_cnt = 0;
210     int max_timed_events_to_process;
211     int newmask;
212
213     __sleeptime = __max_sleeptime;
214
215     /* Handle timed events...
216      * we could be multithreaded, so if we pop forever we could starve
217      * ourselves. */
218     max_timed_events_to_process = timed_events->size;
219     while(max_timed_events_to_process-- > 0) {
220       eventer_t timed_event;
221
222       gettimeofday(&__now, NULL);
223
224       pthread_mutex_lock(&te_lock);
225       /* Peek at our next timed event, if should fire, pop it.
226        * otherwise we noop and NULL it out to break the loop. */
227       timed_event = noit_skiplist_peek(timed_events);
228       if(timed_event) {
229         if(compare_timeval(timed_event->whence, __now) < 0) {
230           timed_event = noit_skiplist_pop(timed_events, NULL);
231         }
232         else {
233           sub_timeval(timed_event->whence, __now, &__sleeptime);
234           timed_event = NULL;
235         }
236       }
237       pthread_mutex_unlock(&te_lock);
238       if(timed_event == NULL) break;
239
240       /* Make our call */
241       newmask = timed_event->callback(timed_event, EVENTER_TIMER,
242                                       timed_event->closure, &__now);
243       if(newmask)
244         eventer_add(timed_event);
245       else
246         eventer_free(timed_event);
247     }
248
249     if(compare_timeval(__max_sleeptime, __sleeptime) < 0) {
250       /* we exceed our configured maximum, set it down */
251       memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime));
252     }
253
254     /* Handle recurrent events */
255     eventer_dispatch_recurrent(&__now);
256
257     /* Now we move on to our fd-based events */
258     fd_cnt = epoll_wait(epoll_fd, epev, maxfds,
259                         __sleeptime.tv_sec * 1000 + __sleeptime.tv_usec / 1000);
260     noitLT(eventer_deb, &__now, "debug: epoll_wait(%d, [], %d) => %d\n", epoll_fd, maxfds, fd_cnt);
261     if(fd_cnt < 0) {
262       noitLT(eventer_err, &__now, "epoll_wait: %s\n", strerror(errno));
263     }
264     else {
265       int idx;
266       /* loop once to clear */
267       for(idx = 0; idx < fd_cnt; idx++) {
268         struct epoll_event *ev;
269         const char *cbname;
270         ev_lock_state_t lockstate;
271         eventer_t e;
272         int fd, oldmask, mask = 0;
273
274         ev = &epev[idx];
275
276         if(ev->events & (EPOLLIN | EPOLLPRI)) mask |= EVENTER_READ;
277         if(ev->events & (EPOLLOUT)) mask |= EVENTER_WRITE;
278         if(ev->events & (EPOLLERR|EPOLLHUP)) mask |= EVENTER_EXCEPTION;
279
280         fd = ev->data.fd;
281         e = master_fds[fd].e;
282
283         lockstate = acquire_master_fd(fd);
284         assert(lockstate == EV_OWNED);
285
286         gettimeofday(&__now, NULL);
287         oldmask = e->mask;
288         cbname = eventer_name_for_callback(e->callback);
289         noitLT(eventer_deb, &__now, "epoll: fire on %d/%x to %s(%p)\n",
290                fd, mask, cbname?cbname:"???", e->callback);
291         newmask = e->callback(e, mask, e->closure, &__now);
292
293         if(newmask) {
294           struct epoll_event _ev;
295           memset(&_ev, 0, sizeof(_ev));
296           _ev.data.fd = fd;
297           if(newmask & EVENTER_READ) _ev.events |= (EPOLLIN|EPOLLPRI);
298           if(newmask & EVENTER_WRITE) _ev.events |= (EPOLLOUT);
299           if(newmask & EVENTER_EXCEPTION) _ev.events |= (EPOLLERR|EPOLLHUP);
300           epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &_ev);
301           /* Set our mask */
302           e->mask = newmask;
303         }
304         else {
305           eventer_free(e);
306         }
307         release_master_fd(fd, lockstate);
308       }
309     }
310   }
311   /* NOTREACHED */
312   return 0;
313 }
314
/* vtable wiring this epoll backend into the generic eventer interface:
 * name, init, propset, add, remove, update, remove_fd, find_fd, loop. */
struct _eventer_impl eventer_epoll_impl = {
  "epoll",
  eventer_epoll_impl_init,
  eventer_epoll_impl_propset,
  eventer_epoll_impl_add,
  eventer_epoll_impl_remove,
  eventer_epoll_impl_update,
  eventer_epoll_impl_remove_fd,
  eventer_epoll_impl_find_fd,
  eventer_epoll_impl_loop
};
Note: See TracBrowser for help on using the browser.