root/src/eventer/eventer_ports_impl.c

Revision aa91f4eac14396cf1481ec845de3940816d846af, 11.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 months ago)

allow eventer callback names to be more expressive

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_atomic.h"
36 #include "utils/noit_skiplist.h"
37 #include "utils/noit_log.h"
38 #include "dtrace_probes.h"
39
40 #include <errno.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <signal.h>
44 #include <port.h>
45 #include <pthread.h>
46 #include <assert.h>
47
48 #define MAX_PORT_EVENTS 1024
49
50 struct _eventer_impl eventer_ports_impl;
51 #define LOCAL_EVENTER eventer_ports_impl
52 #define LOCAL_EVENTER_foreach_fdevent eventer_ports_impl_foreach_fdevent
53 #define LOCAL_EVENTER_foreach_timedevent eventer_ports_impl_foreach_timedevent
54 #define maxfds LOCAL_EVENTER.maxfds
55 #define master_fds LOCAL_EVENTER.master_fds
56
57 #include "eventer/eventer_impl_private.h"
58
59 static const struct timeval __dyna_increment = { 0, 1000 }; /* 1 ms */
60 static int port_fd = -1;
61
62 static int eventer_ports_impl_init() {
63   struct rlimit rlim;
64   int rv;
65
66   /* super init */
67   if((rv = eventer_impl_init()) != 0) return rv;
68
69   signal(SIGPIPE, SIG_IGN);
70   port_fd = port_create();
71   if(port_fd == -1) {
72     return -1;
73   }
74   getrlimit(RLIMIT_NOFILE, &rlim);
75   maxfds = rlim.rlim_cur;
76   master_fds = calloc(maxfds, sizeof(*master_fds));
77   return 0;
78 }
/*
 * Set an eventer property.  The request is simply handed to the generic
 * eventer_impl_propset().
 *
 * Returns 0 when the generic handler returns 0 and -1 otherwise.
 */
static int eventer_ports_impl_propset(const char *key, const char *value) {
  return eventer_impl_propset(key, value) ? -1 : 0;
}
85 static void alter_fd(eventer_t e, int mask) {
86   if(mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
87     int events = 0;
88     if(mask & EVENTER_READ) events |= POLLIN;
89     if(mask & EVENTER_WRITE) events |= POLLOUT;
90     if(mask & EVENTER_EXCEPTION) events |= POLLERR;
91     if(port_associate(port_fd, PORT_SOURCE_FD, e->fd, events, (void *)(vpsized_int)e->fd) == -1) {
92       noitL(eventer_err,
93             "eventer port_associate failed: %s\n", strerror(errno));
94       abort();
95     }
96   }
97   else {
98     if(port_dissociate(port_fd, PORT_SOURCE_FD, e->fd) == -1) {
99       if(errno == ENOENT) return; /* Fine */
100       if(errno == EBADFD) return; /* Fine */
101       noitL(eventer_err,
102             "eventer port_dissociate failed: %s\n", strerror(errno));
103       abort();
104     }
105   }
106 }
107 static void eventer_ports_impl_add(eventer_t e) {
108   assert(e->mask);
109   ev_lock_state_t lockstate;
110   const char *cbname;
111   cbname = eventer_name_for_callback_e(e->callback, e);
112
113   if(e->mask & EVENTER_ASYNCH) {
114     noitL(eventer_deb, "debug: eventer_add asynch (%s)\n", cbname ? cbname : "???");
115     eventer_add_asynch(NULL, e);
116     return;
117   }
118
119   /* Recurrent delegation */
120   if(e->mask & EVENTER_RECURRENT) {
121     noitL(eventer_deb, "debug: eventer_add recurrent (%s)\n", cbname ? cbname : "???");
122     eventer_add_recurrent(e);
123     return;
124   }
125
126   /* Timed events are simple */
127   if(e->mask & EVENTER_TIMER) {
128     eventer_add_timed(e);
129     return;
130   }
131
132   /* file descriptor event */
133   noitL(eventer_deb, "debug: eventer_add fd (%s,%d,0x%04x)\n", cbname ? cbname : "???", e->fd, e->mask);
134   lockstate = acquire_master_fd(e->fd);
135   assert(e->whence.tv_sec == 0 && e->whence.tv_usec == 0);
136   master_fds[e->fd].e = e;
137   alter_fd(e, e->mask);
138   release_master_fd(e->fd, lockstate);
139 }
140 static eventer_t eventer_ports_impl_remove(eventer_t e) {
141   eventer_t removed = NULL;
142   if(e->mask & EVENTER_ASYNCH) {
143     abort();
144   }
145   if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
146     ev_lock_state_t lockstate;
147     lockstate = acquire_master_fd(e->fd);
148     if(e == master_fds[e->fd].e) {
149       removed = e;
150       master_fds[e->fd].e = NULL;
151       alter_fd(e, 0);
152     }
153     release_master_fd(e->fd, lockstate);
154   }
155   else if(e->mask & EVENTER_TIMER) {
156     removed = eventer_remove_timed(e);
157   }
158   else if(e->mask & EVENTER_RECURRENT) {
159     removed = eventer_remove_recurrent(e);
160   }
161   else {
162     abort();
163   }
164   return removed;
165 }
166 static void eventer_ports_impl_update(eventer_t e, int mask) {
167   if(e->mask & EVENTER_TIMER) {
168     eventer_update_timed(e,mask);
169     return;
170   }
171   alter_fd(e, mask);
172   e->mask = mask;
173 }
174 static eventer_t eventer_ports_impl_remove_fd(int fd) {
175   eventer_t eiq = NULL;
176   ev_lock_state_t lockstate;
177   if(master_fds[fd].e) {
178     lockstate = acquire_master_fd(fd);
179     eiq = master_fds[fd].e;
180     master_fds[fd].e = NULL;
181     alter_fd(eiq, 0);
182     release_master_fd(fd, lockstate);
183   }
184   return eiq;
185 }
186 static eventer_t eventer_ports_impl_find_fd(int fd) {
187   return master_fds[fd].e;
188 }
189 static void
190 eventer_ports_impl_trigger(eventer_t e, int mask) {
191   ev_lock_state_t lockstate;
192   const char *cbname;
193   struct timeval __now;
194   int fd, newmask;
195
196   fd = e->fd;
197   if(e != master_fds[fd].e) return;
198   lockstate = acquire_master_fd(fd);
199   if(lockstate == EV_ALREADY_OWNED) return;
200   assert(lockstate == EV_OWNED);
201
202   gettimeofday(&__now, NULL);
203   cbname = eventer_name_for_callback_e(e->callback, e);
204   noitLT(eventer_deb, &__now, "ports: fire on %d/%x to %s(%p)\n",
205          fd, mask, cbname?cbname:"???", e->callback);
206   EVENTER_CALLBACK_ENTRY((void *)e->callback, (char *)cbname, fd, e->mask, mask);
207   newmask = e->callback(e, mask, e->closure, &__now);
208   EVENTER_CALLBACK_RETURN((void *)e->callback, (char *)cbname, newmask);
209
210   if(newmask) {
211     alter_fd(e, newmask);
212     /* Set our mask */
213     e->mask = newmask;
214     noitLT(eventer_deb, &__now, "ports: complete on %d/(%x->%x) to %s(%p)\n",
215            fd, mask, newmask, cbname?cbname:"???", e->callback);
216   }
217   else {
218     noitLT(eventer_deb, &__now, "ports: complete on %d/none to %s(%p)\n",
219            fd, cbname?cbname:"???", e->callback);
220     /*
221      * Long story long:
222      *  When integrating with a few external event systems, we find
223      *  it difficult to make their use of remove+add as an update
224      *  as it can be recurrent in a single handler call and you cannot
225      *  remove completely from the event system if you are going to
226      *  just update (otherwise the eventer_t in your call stack could
227      *  be stale).  What we do is perform a superficial remove, marking
228      *  the mask as 0, but not eventer_remove_fd.  Then on an add, if
229      *  we already have an event, we just update the mask (as we
230      *  have not yet returned to the eventer's loop.
231      *  This leaves us in a tricky situation when a remove is called
232      *  and the add doesn't roll in, we return 0 (mask == 0) and hit
233      *  this spot.  We have intended to remove the event, but it still
234      *  resides at master_fds[fd].e -- even after we free it.
235      *  So, in the evnet that we return 0 and the event that
236      *  master_fds[fd].e == the event we're about to free... we NULL
237      *  it out.
238      */
239     if(master_fds[fd].e == e) master_fds[fd].e = NULL;
240     eventer_free(e);
241   }
242   release_master_fd(fd, lockstate);
243 }
244 static int eventer_ports_impl_loop() {
245   struct timeval __dyna_sleep = { 0, 0 };
246
247   while(1) {
248     struct timeval __now, __sleeptime;
249     struct timespec __ports_sleeptime;
250     unsigned int fd_cnt = 0;
251     int ret;
252     port_event_t pevents[MAX_PORT_EVENTS];
253
254     if(compare_timeval(eventer_max_sleeptime, __dyna_sleep) < 0)
255       __dyna_sleep = eventer_max_sleeptime;
256  
257     __sleeptime = __dyna_sleep;
258
259     eventer_dispatch_timed(&__now, &__sleeptime);
260
261     if(compare_timeval(__sleeptime, __dyna_sleep) > 0)
262       __sleeptime = __dyna_sleep;
263
264     /* Handle recurrent events */
265     eventer_dispatch_recurrent(&__now);
266
267     /* Now we move on to our fd-based events */
268     __ports_sleeptime.tv_sec = __sleeptime.tv_sec;
269     __ports_sleeptime.tv_nsec = __sleeptime.tv_usec * 1000;
270     fd_cnt = 1;
271
272     pevents[0].portev_source = 65535; /* This is impossible */
273
274     ret = port_getn(port_fd, pevents, MAX_PORT_EVENTS, &fd_cnt,
275                     &__ports_sleeptime);
276     /* The timeout case is a tad complex with ports.  -1/ETIME is clearly
277      * a timeout.  However, it i spossible that we got that and fd_cnt isn't
278      * 0, which means we both timed out and got events... WTF?
279      */
280     if(fd_cnt == 0 ||
281        (ret == -1 && errno == ETIME && pevents[0].portev_source == 65535))
282       add_timeval(__dyna_sleep, __dyna_increment, &__dyna_sleep);
283
284     if(ret == -1 && (errno != ETIME && errno != EINTR))
285       noitLT(eventer_err, &__now, "port_getn: %s\n", strerror(errno));
286
287     if(ret < 0)
288       noitLT(eventer_deb, &__now, "port_getn: %s\n", strerror(errno));
289
290     noitLT(eventer_deb, &__now, "debug: port_getn(%d, [], %d) => %d\n", port_fd,
291            fd_cnt, ret);
292
293     if(pevents[0].portev_source == 65535) {
294       /* the impossible still remains, which means our fd_cnt _must_ be 0 */
295       fd_cnt = 0;
296     }
297
298     if(fd_cnt > 0) {
299       int idx;
300       /* Loop a last time to process */
301       __dyna_sleep.tv_sec = __dyna_sleep.tv_usec = 0; /* reset */
302       for(idx = 0; idx < fd_cnt; idx++) {
303         port_event_t *pe;
304         eventer_t e;
305         int fd, mask;
306
307         pe = &pevents[idx];
308         if(pe->portev_source != PORT_SOURCE_FD) continue;
309         fd = (int)pe->portev_object;
310         assert((vpsized_int)pe->portev_user == fd);
311         e = master_fds[fd].e;
312         mask = 0;
313         if(pe->portev_events & (POLLIN | POLLHUP))
314           mask |= EVENTER_READ;
315         if(pe->portev_events & (POLLOUT))
316           mask |= EVENTER_WRITE;
317         if(pe->portev_events & (POLLERR | POLLHUP | POLLNVAL))
318           mask |= EVENTER_EXCEPTION;
319
320         /* It's possible that someone removed the event and freed it
321          * before we got here.
322          */
323         eventer_ports_impl_trigger(e, mask);
324       }
325     }
326   }
327   /* NOTREACHED */
328   return 0;
329 }
330
331 static void
332 eventer_ports_impl_wakeup() {
333   port_send(port_fd, 0, NULL);
334 }
335
336 struct _eventer_impl eventer_ports_impl = {
337   "ports",
338   eventer_ports_impl_init,
339   eventer_ports_impl_propset,
340   eventer_ports_impl_add,
341   eventer_ports_impl_remove,
342   eventer_ports_impl_update,
343   eventer_ports_impl_remove_fd,
344   eventer_ports_impl_find_fd,
345   eventer_ports_impl_trigger,
346   eventer_ports_impl_loop,
347   eventer_ports_impl_foreach_fdevent,
348   eventer_ports_impl_wakeup,
349   { 0, 200000 },
350   0,
351   NULL
352 };
Note: See TracBrowser for help on using the browser.