root/src/eventer/eventer_ports_impl.c

Revision df2e1eb8a0d6b779a1327d8df4d867b961ce4b07, 11.1 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

fixed #66

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_atomic.h"
9 #include "utils/noit_skiplist.h"
10 #include "utils/noit_log.h"
11
12 #include <errno.h>
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <signal.h>
16 #include <port.h>
17 #include <pthread.h>
18 #include <assert.h>
19
20 #define MAX_PORT_EVENTS 1024
21
22 static struct timeval __max_sleeptime = { 0, 200000 }; /* 200ms */
23 static int maxfds;
24 static struct {
25   eventer_t e;
26   pthread_t executor;
27   noit_spinlock_t lock;
28 } *master_fds = NULL;
29
30 typedef enum { EV_OWNED, EV_ALREADY_OWNED } ev_lock_state_t;
31
32 static ev_lock_state_t
33 acquire_master_fd(int fd) {
34   if(noit_spinlock_trylock(&master_fds[fd].lock)) {
35     master_fds[fd].executor = pthread_self();
36     return EV_OWNED;
37   }
38   if(pthread_equal(master_fds[fd].executor, pthread_self())) {
39     return EV_ALREADY_OWNED;
40   }
41   noit_spinlock_lock(&master_fds[fd].lock);
42   master_fds[fd].executor = pthread_self();
43   return EV_OWNED;
44 }
45 static void
46 release_master_fd(int fd, ev_lock_state_t as) {
47   if(as == EV_OWNED) {
48     memset(&master_fds[fd].executor, 0, sizeof(master_fds[fd].executor));
49     noit_spinlock_unlock(&master_fds[fd].lock);
50   }
51 }
52
53 static pthread_t master_thread;
54 static int port_fd = -1;
55
56 static pthread_mutex_t te_lock;
57 static noit_skiplist *timed_events = NULL;
58
59 static int eventer_ports_impl_init() {
60   struct rlimit rlim;
61   int rv;
62
63   /* super init */
64   if((rv = eventer_impl_init()) != 0) return rv;
65
66   master_thread = pthread_self();
67   signal(SIGPIPE, SIG_IGN);
68   port_fd = port_create();
69   if(port_fd == -1) {
70     return -1;
71   }
72   pthread_mutex_init(&te_lock, NULL);
73   getrlimit(RLIMIT_NOFILE, &rlim);
74   maxfds = rlim.rlim_cur;
75   master_fds = calloc(maxfds, sizeof(*master_fds));
76   timed_events = calloc(1, sizeof(*timed_events));
77   noit_skiplist_init(timed_events);
78   noit_skiplist_set_compare(timed_events,
79                             eventer_timecompare, eventer_timecompare);
80   noit_skiplist_add_index(timed_events,
81                           noit_compare_voidptr, noit_compare_voidptr);
82   return 0;
83 }
/*
 * Forward a property to the generic eventer_impl_propset().
 * Returns -1 when that call reports non-zero, 0 otherwise.
 */
static int eventer_ports_impl_propset(const char *key, const char *value) {
  return eventer_impl_propset(key, value) ? -1 : 0;
}
90 static void alter_fd(eventer_t e, int mask) {
91   if(mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
92     int events = 0;
93     if(mask & EVENTER_READ) events |= POLLIN;
94     if(mask & EVENTER_WRITE) events |= POLLOUT;
95     if(mask & EVENTER_EXCEPTION) events |= POLLERR;
96     if(port_associate(port_fd, PORT_SOURCE_FD, e->fd, events, e) == -1) {
97       noitL(eventer_err,
98             "eventer port_associate failed: %s\n", strerror(errno));
99       abort();
100     }
101   }
102   else {
103     if(port_dissociate(port_fd, PORT_SOURCE_FD, e->fd) == -1) {
104       if(errno == ENOENT) return; /* Fine */
105       if(errno == EBADFD) return; /* Fine */
106       noitL(eventer_err,
107             "eventer port_dissociate failed: %s\n", strerror(errno));
108       abort();
109     }
110   }
111 }
112 static void eventer_ports_impl_add(eventer_t e) {
113   assert(e->mask);
114   ev_lock_state_t lockstate;
115   const char *cbname;
116   cbname = eventer_name_for_callback(e->callback);
117
118   if(e->mask & EVENTER_ASYNCH) {
119     noitL(eventer_deb, "debug: eventer_add asynch (%s)\n", cbname ? cbname : "???");
120     eventer_add_asynch(NULL, e);
121     return;
122   }
123
124   /* Recurrent delegation */
125   if(e->mask & EVENTER_RECURRENT) {
126     noitL(eventer_deb, "debug: eventer_add recurrent (%s)\n", cbname ? cbname : "???");
127     eventer_add_recurrent(e);
128     return;
129   }
130
131   /* Timed events are simple */
132   if(e->mask & EVENTER_TIMER) {
133     noitL(eventer_deb, "debug: eventer_add timed (%s)\n", cbname ? cbname : "???");
134     pthread_mutex_lock(&te_lock);
135     noit_skiplist_insert(timed_events, e);
136     pthread_mutex_unlock(&te_lock);
137     return;
138   }
139
140   /* file descriptor event */
141   noitL(eventer_deb, "debug: eventer_add fd (%s,%d,0x%04x)\n", cbname ? cbname : "???", e->fd, e->mask);
142   lockstate = acquire_master_fd(e->fd);
143   master_fds[e->fd].e = e;
144   alter_fd(e, e->mask);
145   release_master_fd(e->fd, lockstate);
146 }
147 static eventer_t eventer_ports_impl_remove(eventer_t e) {
148   eventer_t removed = NULL;
149   if(e->mask & EVENTER_ASYNCH) {
150     abort();
151   }
152   if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
153     ev_lock_state_t lockstate;
154     lockstate = acquire_master_fd(e->fd);
155     if(e == master_fds[e->fd].e) {
156       removed = e;
157       master_fds[e->fd].e = NULL;
158       alter_fd(e, 0);
159     }
160     release_master_fd(e->fd, lockstate);
161   }
162   else if(e->mask & EVENTER_TIMER) {
163     pthread_mutex_lock(&te_lock);
164     if(noit_skiplist_remove_compare(timed_events, e, NULL,
165                                     noit_compare_voidptr))
166       removed = e;
167     pthread_mutex_unlock(&te_lock);
168   }
169   else if(e->mask & EVENTER_RECURRENT) {
170     removed = eventer_remove_recurrent(e);
171   }
172   else {
173     abort();
174   }
175   return removed;
176 }
177 static void eventer_ports_impl_update(eventer_t e, int mask) {
178   if(e->mask & EVENTER_TIMER) {
179     assert(mask & EVENTER_TIMER);
180     pthread_mutex_lock(&te_lock);
181     noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
182     noit_skiplist_insert(timed_events, e);
183     pthread_mutex_unlock(&te_lock);
184     return;
185   }
186   alter_fd(e, mask);
187   e->mask = mask;
188 }
189 static eventer_t eventer_ports_impl_remove_fd(int fd) {
190   eventer_t eiq = NULL;
191   ev_lock_state_t lockstate;
192   if(master_fds[fd].e) {
193     lockstate = acquire_master_fd(fd);
194     eiq = master_fds[fd].e;
195     master_fds[fd].e = NULL;
196     alter_fd(eiq, 0);
197     release_master_fd(fd, lockstate);
198   }
199   return eiq;
200 }
201 static eventer_t eventer_ports_impl_find_fd(int fd) {
202   return master_fds[fd].e;
203 }
204 static int eventer_ports_impl_loop() {
205   int is_master_thread = 0;
206   pthread_t self;
207
208   self = pthread_self();
209   if(pthread_equal(self, master_thread)) is_master_thread = 1;
210
211   while(1) {
212     const char *cbname;
213     struct timeval __now, __sleeptime;
214     struct timespec __ports_sleeptime;
215     unsigned int fd_cnt = 0;
216     int ret;
217     port_event_t pevents[MAX_PORT_EVENTS];
218     int max_timed_events_to_process;
219     int newmask;
220
221     __sleeptime = __max_sleeptime;
222
223     /* Handle timed events...
224      * we could be multithreaded, so if we pop forever we could starve
225      * ourselves. */
226     max_timed_events_to_process = timed_events->size;
227     while(max_timed_events_to_process-- > 0) {
228       eventer_t timed_event;
229
230       gettimeofday(&__now, NULL);
231
232       pthread_mutex_lock(&te_lock);
233       /* Peek at our next timed event, if should fire, pop it.
234        * otherwise we noop and NULL it out to break the loop. */
235       timed_event = noit_skiplist_peek(timed_events);
236       if(timed_event) {
237         if(compare_timeval(timed_event->whence, __now) < 0) {
238           timed_event = noit_skiplist_pop(timed_events, NULL);
239         }
240         else {
241           sub_timeval(timed_event->whence, __now, &__sleeptime);
242           timed_event = NULL;
243         }
244       }
245       pthread_mutex_unlock(&te_lock);
246       if(timed_event == NULL) break;
247
248       cbname = eventer_name_for_callback(timed_event->callback);
249       noitLT(eventer_deb, &__now, "debug: timed dispatch(%s)\n", cbname ? cbname : "???");
250       /* Make our call */
251       newmask = timed_event->callback(timed_event, EVENTER_TIMER,
252                                       timed_event->closure, &__now);
253       if(newmask)
254         eventer_add(timed_event);
255       else
256         eventer_free(timed_event);
257     }
258
259     if(compare_timeval(__max_sleeptime, __sleeptime) < 0) {
260       /* we exceed our configured maximum, set it down */
261       memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime));
262     }
263
264     /* Handle recurrent events */
265     eventer_dispatch_recurrent(&__now);
266
267     /* Now we move on to our fd-based events */
268     __ports_sleeptime.tv_sec = __sleeptime.tv_sec;
269     __ports_sleeptime.tv_nsec = __sleeptime.tv_usec * 1000;
270     fd_cnt = 1;
271     ret = port_getn(port_fd, pevents, MAX_PORT_EVENTS, &fd_cnt,
272                     &__ports_sleeptime);
273     noitLT(eventer_deb, &__now, "debug: port_getn(%d, [], %d) => %d\n", port_fd,
274            fd_cnt, ret);
275     if(ret < 0)
276       noitLT(eventer_deb, &__now, "port_getn: %s\n", strerror(errno));
277
278     if(ret == -1 && (errno == ETIME)) fd_cnt = 0;
279     else if(ret == -1 && (errno == EINTR)) fd_cnt = 0;
280     else if(ret == -1)
281       noitLT(eventer_err, &__now, "port_getn: %s\n", strerror(errno));
282
283     if(fd_cnt > 0) {
284       int idx;
285       /* Loop a last time to process */
286       for(idx = 0; idx < fd_cnt; idx++) {
287         ev_lock_state_t lockstate;
288         port_event_t *pe;
289         eventer_t e;
290         int fd, oldmask, mask;
291
292         pe = &pevents[idx];
293         if(pe->portev_source == PORT_SOURCE_FD)
294         e = (eventer_t)pe->portev_user;
295         fd = (int)pe->portev_object;
296         mask = 0;
297         if(pe->portev_events & (POLLIN | POLLHUP))
298           mask |= EVENTER_READ;
299         if(pe->portev_events & (POLLOUT))
300           mask |= EVENTER_WRITE;
301         if(pe->portev_events & (POLLERR | POLLHUP | POLLNVAL))
302           mask |= EVENTER_EXCEPTION;
303
304         /* It's possible that someone removed the event and freed it
305          * before we got here.
306          */
307         if(e != master_fds[fd].e) continue;
308         lockstate = acquire_master_fd(fd);
309         assert(lockstate == EV_OWNED);
310
311         gettimeofday(&__now, NULL);
312         oldmask = e->mask;
313         cbname = eventer_name_for_callback(e->callback);
314         noitLT(eventer_deb, &__now, "ports: fire on %d/%x to %s(%p)\n",
315                fd, mask, cbname?cbname:"???", e->callback);
316         newmask = e->callback(e, mask, e->closure, &__now);
317
318         if(newmask) {
319           alter_fd(e, newmask);
320           /* Set our mask */
321           e->mask = newmask;
322         }
323         else {
324           /*
325            * Long story long:
326            *  When integrating with a few external event systems, we find
327            *  it difficult to make their use of remove+add as an update
328            *  as it can be recurrent in a single handler call and you cannot
329            *  remove completely from the event system if you are going to
330            *  just update (otherwise the eventer_t in your call stack could
331            *  be stale).  What we do is perform a superficial remove, marking
332            *  the mask as 0, but not eventer_remove_fd.  Then on an add, if
333            *  we already have an event, we just update the mask (as we
334            *  have not yet returned to the eventer's loop.
335            *  This leaves us in a tricky situation when a remove is called
336            *  and the add doesn't roll in, we return 0 (mask == 0) and hit
337            *  this spot.  We have intended to remove the event, but it still
338            *  resides at master_fds[fd].e -- even after we free it.
339            *  So, in the evnet that we return 0 and the event that
340            *  master_fds[fd].e == the event we're about to free... we NULL
341            *  it out.
342            */
343           if(master_fds[fd].e == e) master_fds[fd].e = NULL;
344           eventer_free(e);
345         }
346         release_master_fd(fd, lockstate);
347       }
348     }
349   }
350   /* NOTREACHED */
351 }
352
353 struct _eventer_impl eventer_ports_impl = {
354   "ports",
355   eventer_ports_impl_init,
356   eventer_ports_impl_propset,
357   eventer_ports_impl_add,
358   eventer_ports_impl_remove,
359   eventer_ports_impl_update,
360   eventer_ports_impl_remove_fd,
361   eventer_ports_impl_find_fd,
362   eventer_ports_impl_loop
363 };
Note: See TracBrowser for help on using the browser.