root/src/eventer/eventer_epoll_impl.c

Revision 4ad439d9108f1e34cf90ae8ba2709acc7b1f9b08, 9.7 kB (checked in by Mark Harrison <mark@omniti.com>, 5 years ago)

Compile fix on linux refs #65

/*
 * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
 * All rights reserved.
 */

#include "noit_defines.h"
#include "eventer/eventer.h"
#include "utils/noit_atomic.h"
#include "utils/noit_skiplist.h"
#include "utils/noit_log.h"

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>        /* memset, memcpy, strerror */
#include <sys/epoll.h>
#include <sys/time.h>      /* gettimeofday, struct timeval */
#include <sys/resource.h>  /* getrlimit, struct rlimit */
#include <signal.h>
#include <pthread.h>
#include <assert.h>

static struct timeval __max_sleeptime = { 0, 200000 }; /* 200ms */
static int maxfds;
static struct {
  eventer_t e;
  pthread_t executor;
  noit_spinlock_t lock;
} *master_fds = NULL;
static int *masks;

typedef enum { EV_OWNED, EV_ALREADY_OWNED } ev_lock_state_t;

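/* Per-fd ownership: a spinlock serializes access to master_fds[fd], and
 * `executor` records the owning thread so a re-entrant acquisition from
 * the same thread is detected (EV_ALREADY_OWNED) instead of deadlocking. */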
static ev_lock_state_t
acquire_master_fd(int fd) {
  if(noit_spinlock_trylock(&master_fds[fd].lock)) {
    master_fds[fd].executor = pthread_self();
    return EV_OWNED;
  }
  if(pthread_equal(master_fds[fd].executor, pthread_self())) {
    return EV_ALREADY_OWNED;
  }
  noit_spinlock_lock(&master_fds[fd].lock);
  master_fds[fd].executor = pthread_self();
  return EV_OWNED;
}
static void
release_master_fd(int fd, ev_lock_state_t as) {
  if(as == EV_OWNED) {
    memset(&master_fds[fd].executor, 0, sizeof(master_fds[fd].executor));
    noit_spinlock_unlock(&master_fds[fd].lock);
  }
}

static pthread_t master_thread;
static int epoll_fd = -1;
static pthread_mutex_t te_lock;
static noit_skiplist *timed_events = NULL;

static int eventer_epoll_impl_init() {
  struct rlimit rlim;
  int rv;

  /* super init */
  if((rv = eventer_impl_init()) != 0) return rv;

  master_thread = pthread_self();
  signal(SIGPIPE, SIG_IGN);
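  /* The size argument to epoll_create() is only a hint (ignored since
   * Linux 2.6.8); it does not cap the number of watched descriptors. */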
  epoll_fd = epoll_create(1024);
  if(epoll_fd == -1) {
    return -1;
  }
  pthread_mutex_init(&te_lock, NULL);
  getrlimit(RLIMIT_NOFILE, &rlim);
  maxfds = rlim.rlim_cur;
  master_fds = calloc(maxfds, sizeof(*master_fds));
  masks = calloc(maxfds, sizeof(*masks));
  timed_events = calloc(1, sizeof(*timed_events));
  noit_skiplist_init(timed_events);
  noit_skiplist_set_compare(timed_events,
                            eventer_timecompare, eventer_timecompare);
  noit_skiplist_add_index(timed_events,
                          noit_compare_voidptr, noit_compare_voidptr);
  return 0;
}
static int eventer_epoll_impl_propset(const char *key, const char *value) {
  if(eventer_impl_propset(key, value)) {
    /* Do our epoll local properties here */
    return -1;
  }
  return 0;
}
static void eventer_epoll_impl_add(eventer_t e) {
  struct epoll_event _ev;
  ev_lock_state_t lockstate;
  assert(e->mask);

  if(e->mask & EVENTER_ASYNCH) {
    eventer_add_asynch(NULL, e);
    return;
  }

  /* Recurrent delegation */
  if(e->mask & EVENTER_RECURRENT) {
    eventer_add_recurrent(e);
    return;
  }

  /* Timed events are simple */
  if(e->mask & EVENTER_TIMER) {
    pthread_mutex_lock(&te_lock);
    noit_skiplist_insert(timed_events, e);
    pthread_mutex_unlock(&te_lock);
    return;
  }

  /* file descriptor event */
  memset(&_ev, 0, sizeof(_ev));
  _ev.data.fd = e->fd;
  if(e->mask & EVENTER_READ) _ev.events |= (EPOLLIN|EPOLLPRI);
  if(e->mask & EVENTER_WRITE) _ev.events |= (EPOLLOUT);
  if(e->mask & EVENTER_EXCEPTION) _ev.events |= (EPOLLERR|EPOLLHUP);
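  /* epoll delivers EPOLLERR and EPOLLHUP whether or not they are
   * requested; setting them here documents the EVENTER_EXCEPTION mapping
   * rather than enabling anything extra. */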

  lockstate = acquire_master_fd(e->fd);
  master_fds[e->fd].e = e;

  epoll_ctl(epoll_fd, EPOLL_CTL_ADD, e->fd, &_ev);

  release_master_fd(e->fd, lockstate);
}
static eventer_t eventer_epoll_impl_remove(eventer_t e) {
  eventer_t removed = NULL;
  if(e->mask & EVENTER_ASYNCH) {
    abort();
  }
  if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
    ev_lock_state_t lockstate;
    struct epoll_event _ev;
    memset(&_ev, 0, sizeof(_ev));
    _ev.data.fd = e->fd;
    lockstate = acquire_master_fd(e->fd);
    if(e == master_fds[e->fd].e) {
      removed = e;
      master_fds[e->fd].e = NULL;
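      /* A non-NULL event pointer is passed to EPOLL_CTL_DEL for
       * portability: kernels before 2.6.9 require one even though its
       * contents are ignored. */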
      epoll_ctl(epoll_fd, EPOLL_CTL_DEL, e->fd, &_ev);
    }
    release_master_fd(e->fd, lockstate);
  }
  else if(e->mask & EVENTER_TIMER) {
    pthread_mutex_lock(&te_lock);
    if(noit_skiplist_remove_compare(timed_events, e, NULL,
                                    noit_compare_voidptr))
      removed = e;
    pthread_mutex_unlock(&te_lock);
  }
  else if(e->mask & EVENTER_RECURRENT) {
    removed = eventer_remove_recurrent(e);
  }
  else {
    abort();
  }
  return removed;
}
static void eventer_epoll_impl_update(eventer_t e, int mask) {
  struct epoll_event _ev;
  if(e->mask & EVENTER_TIMER) {
    assert(mask & EVENTER_TIMER); /* an update must keep it a timer event */
    pthread_mutex_lock(&te_lock);
    noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
    noit_skiplist_insert(timed_events, e);
    pthread_mutex_unlock(&te_lock);
    return;
  }
  memset(&_ev, 0, sizeof(_ev));
  _ev.data.fd = e->fd;
  e->mask = mask;
  if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
    if(e->mask & EVENTER_READ) _ev.events |= (EPOLLIN|EPOLLPRI);
    if(e->mask & EVENTER_WRITE) _ev.events |= (EPOLLOUT);
    if(e->mask & EVENTER_EXCEPTION) _ev.events |= (EPOLLERR|EPOLLHUP);
    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, e->fd, &_ev);
  }
}
static eventer_t eventer_epoll_impl_remove_fd(int fd) {
  eventer_t eiq = NULL;
  ev_lock_state_t lockstate;
  if(master_fds[fd].e) {
    struct epoll_event _ev;
    memset(&_ev, 0, sizeof(_ev));
    _ev.data.fd = fd;
    lockstate = acquire_master_fd(fd);
    eiq = master_fds[fd].e;
    master_fds[fd].e = NULL;
    epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, &_ev);
    release_master_fd(fd, lockstate);
  }
  return eiq;
}
static eventer_t eventer_epoll_impl_find_fd(int fd) {
  return master_fds[fd].e;
}

static void eventer_epoll_impl_trigger(eventer_t e, int mask) {
  struct timeval __now;
  int fd, oldmask, newmask;
  const char *cbname;
  ev_lock_state_t lockstate;

  fd = e->fd;
  if(e != master_fds[fd].e) return;
  lockstate = acquire_master_fd(fd);
  if(lockstate == EV_ALREADY_OWNED) return;
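  /* EV_ALREADY_OWNED means this thread is already inside this event's
   * callback; returning here avoids re-entering it. */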
  assert(lockstate == EV_OWNED);

  gettimeofday(&__now, NULL);
  oldmask = e->mask;
  cbname = eventer_name_for_callback(e->callback);
  noitLT(eventer_deb, &__now, "epoll: fire on %d/%x to %s(%p)\n",
         fd, mask, cbname?cbname:"???", e->callback);
  newmask = e->callback(e, mask, e->closure, &__now);

  if(newmask) {
    struct epoll_event _ev;
    memset(&_ev, 0, sizeof(_ev));
    _ev.data.fd = fd;
    if(newmask & EVENTER_READ) _ev.events |= (EPOLLIN|EPOLLPRI);
    if(newmask & EVENTER_WRITE) _ev.events |= (EPOLLOUT);
    if(newmask & EVENTER_EXCEPTION) _ev.events |= (EPOLLERR|EPOLLHUP);
    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &_ev);
    /* Set our mask */
    e->mask = newmask;
  }
  else {
    /* see kqueue implementation for details on the next line */
    if(master_fds[fd].e == e) master_fds[fd].e = NULL;
    eventer_free(e);
  }
  release_master_fd(fd, lockstate);
}
static int eventer_epoll_impl_loop() {
  int is_master_thread = 0;
  struct epoll_event *epev;
  pthread_t self;

  self = pthread_self();
  if(pthread_equal(self, master_thread)) is_master_thread = 1;

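  /* One slot per possible descriptor, so a single epoll_wait() call can
   * report every ready fd at once. */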
  epev = malloc(sizeof(*epev) * maxfds);

  while(1) {
    struct timeval __now, __sleeptime;
    int fd_cnt = 0;
    int max_timed_events_to_process;
    int newmask;

    __sleeptime = __max_sleeptime;

    /* Handle timed events...
     * we could be multithreaded, so if we popped forever we could starve
     * ourselves; cap the number processed per pass. */
    max_timed_events_to_process = timed_events->size;
    while(max_timed_events_to_process-- > 0) {
      eventer_t timed_event;

      gettimeofday(&__now, NULL);

      pthread_mutex_lock(&te_lock);
      /* Peek at our next timed event; if it should fire, pop it,
       * otherwise NULL it out to break the loop. */
      timed_event = noit_skiplist_peek(timed_events);
      if(timed_event) {
        if(compare_timeval(timed_event->whence, __now) < 0) {
          timed_event = noit_skiplist_pop(timed_events, NULL);
        }
        else {
          sub_timeval(timed_event->whence, __now, &__sleeptime);
          timed_event = NULL;
        }
      }
      pthread_mutex_unlock(&te_lock);
      if(timed_event == NULL) break;

      /* Make our call */
      newmask = timed_event->callback(timed_event, EVENTER_TIMER,
                                      timed_event->closure, &__now);
      if(newmask)
        eventer_add(timed_event);
      else
        eventer_free(timed_event);
    }

    if(compare_timeval(__max_sleeptime, __sleeptime) < 0) {
      /* we exceeded our configured maximum; clamp it back down */
      memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime));
    }

    /* Handle recurrent events */
    gettimeofday(&__now, NULL); /* __now is otherwise unset when no timed events fired */
    eventer_dispatch_recurrent(&__now);

    /* Now we move on to our fd-based events */
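    /* epoll_wait() takes its timeout in milliseconds, so the timeval is
     * collapsed below; at most maxfds ready descriptors are returned. */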
    fd_cnt = epoll_wait(epoll_fd, epev, maxfds,
                        __sleeptime.tv_sec * 1000 + __sleeptime.tv_usec / 1000);
    noitLT(eventer_deb, &__now, "debug: epoll_wait(%d, [], %d) => %d\n",
           epoll_fd, maxfds, fd_cnt);
    if(fd_cnt < 0) {
      noitLT(eventer_err, &__now, "epoll_wait: %s\n", strerror(errno));
    }
    else {
      int idx;
      /* walk the ready list, mapping epoll flags back to eventer masks */
      for(idx = 0; idx < fd_cnt; idx++) {
        struct epoll_event *ev;
        eventer_t e;
        int fd, mask = 0;

        ev = &epev[idx];

        if(ev->events & (EPOLLIN | EPOLLPRI)) mask |= EVENTER_READ;
        if(ev->events & (EPOLLOUT)) mask |= EVENTER_WRITE;
        if(ev->events & (EPOLLERR|EPOLLHUP)) mask |= EVENTER_EXCEPTION;

        fd = ev->data.fd;

        e = master_fds[fd].e;
        /* It's possible that someone removed the event and freed it
         * before we got here. */
        if(!e) continue;

        eventer_epoll_impl_trigger(e, mask);
      }
    }
  }
  /* NOTREACHED */
  return 0;
}

struct _eventer_impl eventer_epoll_impl = {
  "epoll",
  eventer_epoll_impl_init,
  eventer_epoll_impl_propset,
  eventer_epoll_impl_add,
  eventer_epoll_impl_remove,
  eventer_epoll_impl_update,
  eventer_epoll_impl_remove_fd,
  eventer_epoll_impl_find_fd,
  eventer_epoll_impl_trigger,
  eventer_epoll_impl_loop
};
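
/* A minimal usage sketch (an assumption, not part of this file): the
 * eventer core presumably selects a backend by name and then drives it
 * through this vtable, roughly:
 *
 *   eventer_choose("epoll");   // hypothetical selection call
 *   eventer_init();            // -> eventer_epoll_impl_init()
 *   eventer_loop();            // -> eventer_epoll_impl_loop(), never returns
 */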