root/src/eventer/eventer_ports_impl.c

Revision 88a71780101cbf23034aa0cb840f9f0368fda2dd, 12.8 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

fixes #126

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_atomic.h"
36 #include "utils/noit_skiplist.h"
37 #include "utils/noit_log.h"
38
39 #include <errno.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <signal.h>
43 #include <port.h>
44 #include <pthread.h>
45 #include <assert.h>
46
47 #define MAX_PORT_EVENTS 1024
48
49 static struct timeval __max_sleeptime = { 0, 200000 }; /* 200ms */
50 static int maxfds;
51 static struct {
52   eventer_t e;
53   pthread_t executor;
54   noit_spinlock_t lock;
55 } *master_fds = NULL;
56
57 typedef enum { EV_OWNED, EV_ALREADY_OWNED } ev_lock_state_t;
58
59 static ev_lock_state_t
60 acquire_master_fd(int fd) {
61   if(noit_spinlock_trylock(&master_fds[fd].lock)) {
62     master_fds[fd].executor = pthread_self();
63     return EV_OWNED;
64   }
65   if(pthread_equal(master_fds[fd].executor, pthread_self())) {
66     return EV_ALREADY_OWNED;
67   }
68   noit_spinlock_lock(&master_fds[fd].lock);
69   master_fds[fd].executor = pthread_self();
70   return EV_OWNED;
71 }
72 static void
73 release_master_fd(int fd, ev_lock_state_t as) {
74   if(as == EV_OWNED) {
75     memset(&master_fds[fd].executor, 0, sizeof(master_fds[fd].executor));
76     noit_spinlock_unlock(&master_fds[fd].lock);
77   }
78 }
79
80 static pthread_t master_thread;
81 static int port_fd = -1;
82
83 static pthread_mutex_t te_lock;
84 static noit_skiplist *timed_events = NULL;
85
86 static int eventer_ports_impl_init() {
87   struct rlimit rlim;
88   int rv;
89
90   /* super init */
91   if((rv = eventer_impl_init()) != 0) return rv;
92
93   master_thread = pthread_self();
94   signal(SIGPIPE, SIG_IGN);
95   port_fd = port_create();
96   if(port_fd == -1) {
97     return -1;
98   }
99   pthread_mutex_init(&te_lock, NULL);
100   getrlimit(RLIMIT_NOFILE, &rlim);
101   maxfds = rlim.rlim_cur;
102   master_fds = calloc(maxfds, sizeof(*master_fds));
103   timed_events = calloc(1, sizeof(*timed_events));
104   noit_skiplist_init(timed_events);
105   noit_skiplist_set_compare(timed_events,
106                             eventer_timecompare, eventer_timecompare);
107   noit_skiplist_add_index(timed_events,
108                           noit_compare_voidptr, noit_compare_voidptr);
109   return 0;
110 }
/*
 * Set a configuration property.  The ports backend has no properties of its
 * own, so this defers to eventer_impl_propset() and folds any non-zero
 * result into -1.  Returns 0 on success.
 */
static int eventer_ports_impl_propset(const char *key, const char *value) {
  return eventer_impl_propset(key, value) ? -1 : 0;
}
117 static void alter_fd(eventer_t e, int mask) {
118   if(mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
119     int events = 0;
120     if(mask & EVENTER_READ) events |= POLLIN;
121     if(mask & EVENTER_WRITE) events |= POLLOUT;
122     if(mask & EVENTER_EXCEPTION) events |= POLLERR;
123     if(port_associate(port_fd, PORT_SOURCE_FD, e->fd, events, (void *)e->fd) == -1) {
124       noitL(eventer_err,
125             "eventer port_associate failed: %s\n", strerror(errno));
126       abort();
127     }
128   }
129   else {
130     if(port_dissociate(port_fd, PORT_SOURCE_FD, e->fd) == -1) {
131       if(errno == ENOENT) return; /* Fine */
132       if(errno == EBADFD) return; /* Fine */
133       noitL(eventer_err,
134             "eventer port_dissociate failed: %s\n", strerror(errno));
135       abort();
136     }
137   }
138 }
139 static void eventer_ports_impl_add(eventer_t e) {
140   assert(e->mask);
141   ev_lock_state_t lockstate;
142   const char *cbname;
143   cbname = eventer_name_for_callback(e->callback);
144
145   if(e->mask & EVENTER_ASYNCH) {
146     noitL(eventer_deb, "debug: eventer_add asynch (%s)\n", cbname ? cbname : "???");
147     eventer_add_asynch(NULL, e);
148     return;
149   }
150
151   /* Recurrent delegation */
152   if(e->mask & EVENTER_RECURRENT) {
153     noitL(eventer_deb, "debug: eventer_add recurrent (%s)\n", cbname ? cbname : "???");
154     eventer_add_recurrent(e);
155     return;
156   }
157
158   /* Timed events are simple */
159   if(e->mask & EVENTER_TIMER) {
160     noitL(eventer_deb, "debug: eventer_add timed (%s)\n", cbname ? cbname : "???");
161     pthread_mutex_lock(&te_lock);
162     noit_skiplist_insert(timed_events, e);
163     pthread_mutex_unlock(&te_lock);
164     return;
165   }
166
167   /* file descriptor event */
168   noitL(eventer_deb, "debug: eventer_add fd (%s,%d,0x%04x)\n", cbname ? cbname : "???", e->fd, e->mask);
169   lockstate = acquire_master_fd(e->fd);
170   master_fds[e->fd].e = e;
171   alter_fd(e, e->mask);
172   release_master_fd(e->fd, lockstate);
173 }
174 static eventer_t eventer_ports_impl_remove(eventer_t e) {
175   eventer_t removed = NULL;
176   if(e->mask & EVENTER_ASYNCH) {
177     abort();
178   }
179   if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
180     ev_lock_state_t lockstate;
181     lockstate = acquire_master_fd(e->fd);
182     if(e == master_fds[e->fd].e) {
183       removed = e;
184       master_fds[e->fd].e = NULL;
185       alter_fd(e, 0);
186     }
187     release_master_fd(e->fd, lockstate);
188   }
189   else if(e->mask & EVENTER_TIMER) {
190     pthread_mutex_lock(&te_lock);
191     if(noit_skiplist_remove_compare(timed_events, e, NULL,
192                                     noit_compare_voidptr))
193       removed = e;
194     pthread_mutex_unlock(&te_lock);
195   }
196   else if(e->mask & EVENTER_RECURRENT) {
197     removed = eventer_remove_recurrent(e);
198   }
199   else {
200     abort();
201   }
202   return removed;
203 }
204 static void eventer_ports_impl_update(eventer_t e, int mask) {
205   if(e->mask & EVENTER_TIMER) {
206     assert(mask & EVENTER_TIMER);
207     pthread_mutex_lock(&te_lock);
208     noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
209     noit_skiplist_insert(timed_events, e);
210     pthread_mutex_unlock(&te_lock);
211     return;
212   }
213   alter_fd(e, mask);
214   e->mask = mask;
215 }
216 static eventer_t eventer_ports_impl_remove_fd(int fd) {
217   eventer_t eiq = NULL;
218   ev_lock_state_t lockstate;
219   if(master_fds[fd].e) {
220     lockstate = acquire_master_fd(fd);
221     eiq = master_fds[fd].e;
222     master_fds[fd].e = NULL;
223     alter_fd(eiq, 0);
224     release_master_fd(fd, lockstate);
225   }
226   return eiq;
227 }
228 static eventer_t eventer_ports_impl_find_fd(int fd) {
229   return master_fds[fd].e;
230 }
231 static void
232 eventer_ports_impl_trigger(eventer_t e, int mask) {
233   ev_lock_state_t lockstate;
234   const char *cbname;
235   struct timeval __now;
236   int fd, oldmask, newmask;
237
238   fd = e->fd;
239   if(e != master_fds[fd].e) return;
240   lockstate = acquire_master_fd(fd);
241   if(lockstate == EV_ALREADY_OWNED) return;
242   assert(lockstate == EV_OWNED);
243
244   gettimeofday(&__now, NULL);
245   oldmask = e->mask;
246   cbname = eventer_name_for_callback(e->callback);
247   noitLT(eventer_deb, &__now, "ports: fire on %d/%x to %s(%p)\n",
248          fd, mask, cbname?cbname:"???", e->callback);
249   newmask = e->callback(e, mask, e->closure, &__now);
250
251   if(newmask) {
252     alter_fd(e, newmask);
253     /* Set our mask */
254     e->mask = newmask;
255   }
256   else {
257     /*
258      * Long story long:
259      *  When integrating with a few external event systems, we find
260      *  it difficult to make their use of remove+add as an update
261      *  as it can be recurrent in a single handler call and you cannot
262      *  remove completely from the event system if you are going to
263      *  just update (otherwise the eventer_t in your call stack could
264      *  be stale).  What we do is perform a superficial remove, marking
265      *  the mask as 0, but not eventer_remove_fd.  Then on an add, if
266      *  we already have an event, we just update the mask (as we
267      *  have not yet returned to the eventer's loop.
268      *  This leaves us in a tricky situation when a remove is called
269      *  and the add doesn't roll in, we return 0 (mask == 0) and hit
270      *  this spot.  We have intended to remove the event, but it still
271      *  resides at master_fds[fd].e -- even after we free it.
272      *  So, in the evnet that we return 0 and the event that
273      *  master_fds[fd].e == the event we're about to free... we NULL
274      *  it out.
275      */
276     if(master_fds[fd].e == e) master_fds[fd].e = NULL;
277     eventer_free(e);
278   }
279   release_master_fd(fd, lockstate);
280 }
281 static int eventer_ports_impl_loop() {
282   int is_master_thread = 0;
283   pthread_t self;
284
285   self = pthread_self();
286   if(pthread_equal(self, master_thread)) is_master_thread = 1;
287
288   while(1) {
289     const char *cbname;
290     struct timeval __now, __sleeptime;
291     struct timespec __ports_sleeptime;
292     unsigned int fd_cnt = 0;
293     int ret;
294     port_event_t pevents[MAX_PORT_EVENTS];
295     int max_timed_events_to_process;
296     int newmask;
297
298     __sleeptime = __max_sleeptime;
299
300     /* Handle timed events...
301      * we could be multithreaded, so if we pop forever we could starve
302      * ourselves. */
303     max_timed_events_to_process = timed_events->size;
304     while(max_timed_events_to_process-- > 0) {
305       eventer_t timed_event;
306
307       gettimeofday(&__now, NULL);
308
309       pthread_mutex_lock(&te_lock);
310       /* Peek at our next timed event, if should fire, pop it.
311        * otherwise we noop and NULL it out to break the loop. */
312       timed_event = noit_skiplist_peek(timed_events);
313       if(timed_event) {
314         if(compare_timeval(timed_event->whence, __now) < 0) {
315           timed_event = noit_skiplist_pop(timed_events, NULL);
316         }
317         else {
318           sub_timeval(timed_event->whence, __now, &__sleeptime);
319           timed_event = NULL;
320         }
321       }
322       pthread_mutex_unlock(&te_lock);
323       if(timed_event == NULL) break;
324
325       cbname = eventer_name_for_callback(timed_event->callback);
326       noitLT(eventer_deb, &__now, "debug: timed dispatch(%s)\n", cbname ? cbname : "???");
327       /* Make our call */
328       newmask = timed_event->callback(timed_event, EVENTER_TIMER,
329                                       timed_event->closure, &__now);
330       if(newmask)
331         eventer_add(timed_event);
332       else
333         eventer_free(timed_event);
334     }
335
336     if(compare_timeval(__max_sleeptime, __sleeptime) < 0) {
337       /* we exceed our configured maximum, set it down */
338       memcpy(&__sleeptime, &__max_sleeptime, sizeof(__sleeptime));
339     }
340
341     /* Handle recurrent events */
342     eventer_dispatch_recurrent(&__now);
343
344     /* Now we move on to our fd-based events */
345     __ports_sleeptime.tv_sec = __sleeptime.tv_sec;
346     __ports_sleeptime.tv_nsec = __sleeptime.tv_usec * 1000;
347     fd_cnt = 1;
348     ret = port_getn(port_fd, pevents, MAX_PORT_EVENTS, &fd_cnt,
349                     &__ports_sleeptime);
350     noitLT(eventer_deb, &__now, "debug: port_getn(%d, [], %d) => %d\n", port_fd,
351            fd_cnt, ret);
352     if(ret < 0)
353       noitLT(eventer_deb, &__now, "port_getn: %s\n", strerror(errno));
354
355     if(ret == -1 && (errno == ETIME)) fd_cnt = 0;
356     else if(ret == -1 && (errno == EINTR)) fd_cnt = 0;
357     else if(ret == -1)
358       noitLT(eventer_err, &__now, "port_getn: %s\n", strerror(errno));
359
360     if(fd_cnt > 0) {
361       int idx;
362       /* Loop a last time to process */
363       for(idx = 0; idx < fd_cnt; idx++) {
364         port_event_t *pe;
365         eventer_t e;
366         int fd, oldmask, mask;
367
368         pe = &pevents[idx];
369         if(pe->portev_source != PORT_SOURCE_FD) continue;
370         fd = (int)pe->portev_object;
371         assert((int)pe->portev_user == fd);
372         e = master_fds[fd].e;
373         mask = 0;
374         if(pe->portev_events & (POLLIN | POLLHUP))
375           mask |= EVENTER_READ;
376         if(pe->portev_events & (POLLOUT))
377           mask |= EVENTER_WRITE;
378         if(pe->portev_events & (POLLERR | POLLHUP | POLLNVAL))
379           mask |= EVENTER_EXCEPTION;
380
381         /* It's possible that someone removed the event and freed it
382          * before we got here.
383          */
384         eventer_ports_impl_trigger(e, mask);
385       }
386     }
387   }
388   /* NOTREACHED */
389 }
390
391 struct _eventer_impl eventer_ports_impl = {
392   "ports",
393   eventer_ports_impl_init,
394   eventer_ports_impl_propset,
395   eventer_ports_impl_add,
396   eventer_ports_impl_remove,
397   eventer_ports_impl_update,
398   eventer_ports_impl_remove_fd,
399   eventer_ports_impl_find_fd,
400   eventer_ports_impl_trigger,
401   eventer_ports_impl_loop
402 };
Note: See TracBrowser for help on using the browser.