root/src/eventer/eventer_ports_impl.c

Revision 31d42e564259174c10ca8df5d8b206096a29c957, 10.4 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 9 years ago)

This pulls the timer stuff into the shared base and consolidates a lot
of repeated code across the different scheduler implementations.

Timers and fdevents are now exposed through the API, and the console exposes them via:

show eventer debug timers
show eventer debug sockets

(the console stuff needs to be cleaned up to support autocomplete)

refs #221

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_atomic.h"
36 #include "utils/noit_skiplist.h"
37 #include "utils/noit_log.h"
38
39 #include <errno.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <signal.h>
43 #include <port.h>
44 #include <pthread.h>
45 #include <assert.h>
46
/* Capacity of the port_event_t buffer handed to port_getn() on each pass of
 * the event loop. */
#define MAX_PORT_EVENTS 1024

/* Tentative definition; the full initializer is at the bottom of this file. */
struct _eventer_impl eventer_ports_impl;
/* The names below are defined before including eventer_impl_private.h.
 * NOTE(review): presumably that header keys its shared helpers off these
 * macros -- confirm against the header. */
#define LOCAL_EVENTER eventer_ports_impl
#define LOCAL_EVENTER_foreach_fdevent eventer_ports_impl_foreach_fdevent
#define LOCAL_EVENTER_foreach_timedevent eventer_ports_impl_foreach_timedevent
/* Shorthands for the per-implementation fd table and its size. */
#define maxfds LOCAL_EVENTER.maxfds
#define master_fds LOCAL_EVENTER.master_fds

#include "eventer/eventer_impl_private.h"

/* Thread that ran eventer_ports_impl_init(). */
static pthread_t master_thread;
/* Event port descriptor returned by port_create(); -1 until init succeeds. */
static int port_fd = -1;
60
61 static int eventer_ports_impl_init() {
62   struct rlimit rlim;
63   int rv;
64
65   /* super init */
66   if((rv = eventer_impl_init()) != 0) return rv;
67
68   master_thread = pthread_self();
69   signal(SIGPIPE, SIG_IGN);
70   port_fd = port_create();
71   if(port_fd == -1) {
72     return -1;
73   }
74   getrlimit(RLIMIT_NOFILE, &rlim);
75   maxfds = rlim.rlim_cur;
76   master_fds = calloc(maxfds, sizeof(*master_fds));
77   return 0;
78 }
/*
 * Set an eventer property by delegating to the shared implementation.
 * Any non-zero result from eventer_impl_propset() is reported as -1;
 * otherwise 0 is returned.
 */
static int eventer_ports_impl_propset(const char *key, const char *value) {
  return eventer_impl_propset(key, value) ? -1 : 0;
}
85 static void alter_fd(eventer_t e, int mask) {
86   if(mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
87     int events = 0;
88     if(mask & EVENTER_READ) events |= POLLIN;
89     if(mask & EVENTER_WRITE) events |= POLLOUT;
90     if(mask & EVENTER_EXCEPTION) events |= POLLERR;
91     if(port_associate(port_fd, PORT_SOURCE_FD, e->fd, events, (void *)e->fd) == -1) {
92       noitL(eventer_err,
93             "eventer port_associate failed: %s\n", strerror(errno));
94       abort();
95     }
96   }
97   else {
98     if(port_dissociate(port_fd, PORT_SOURCE_FD, e->fd) == -1) {
99       if(errno == ENOENT) return; /* Fine */
100       if(errno == EBADFD) return; /* Fine */
101       noitL(eventer_err,
102             "eventer port_dissociate failed: %s\n", strerror(errno));
103       abort();
104     }
105   }
106 }
107 static void eventer_ports_impl_add(eventer_t e) {
108   assert(e->mask);
109   ev_lock_state_t lockstate;
110   const char *cbname;
111   cbname = eventer_name_for_callback(e->callback);
112
113   if(e->mask & EVENTER_ASYNCH) {
114     noitL(eventer_deb, "debug: eventer_add asynch (%s)\n", cbname ? cbname : "???");
115     eventer_add_asynch(NULL, e);
116     return;
117   }
118
119   /* Recurrent delegation */
120   if(e->mask & EVENTER_RECURRENT) {
121     noitL(eventer_deb, "debug: eventer_add recurrent (%s)\n", cbname ? cbname : "???");
122     eventer_add_recurrent(e);
123     return;
124   }
125
126   /* Timed events are simple */
127   if(e->mask & EVENTER_TIMER) {
128     eventer_add_timed(e);
129     return;
130   }
131
132   /* file descriptor event */
133   noitL(eventer_deb, "debug: eventer_add fd (%s,%d,0x%04x)\n", cbname ? cbname : "???", e->fd, e->mask);
134   lockstate = acquire_master_fd(e->fd);
135   master_fds[e->fd].e = e;
136   alter_fd(e, e->mask);
137   release_master_fd(e->fd, lockstate);
138 }
139 static eventer_t eventer_ports_impl_remove(eventer_t e) {
140   eventer_t removed = NULL;
141   if(e->mask & EVENTER_ASYNCH) {
142     abort();
143   }
144   if(e->mask & (EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION)) {
145     ev_lock_state_t lockstate;
146     lockstate = acquire_master_fd(e->fd);
147     if(e == master_fds[e->fd].e) {
148       removed = e;
149       master_fds[e->fd].e = NULL;
150       alter_fd(e, 0);
151     }
152     release_master_fd(e->fd, lockstate);
153   }
154   else if(e->mask & EVENTER_TIMER) {
155     removed = eventer_remove_timed(e);
156   }
157   else if(e->mask & EVENTER_RECURRENT) {
158     removed = eventer_remove_recurrent(e);
159   }
160   else {
161     abort();
162   }
163   return removed;
164 }
165 static void eventer_ports_impl_update(eventer_t e, int mask) {
166   if(e->mask & EVENTER_TIMER) {
167     eventer_update_timed(e,mask);
168     return;
169   }
170   alter_fd(e, mask);
171   e->mask = mask;
172 }
173 static eventer_t eventer_ports_impl_remove_fd(int fd) {
174   eventer_t eiq = NULL;
175   ev_lock_state_t lockstate;
176   if(master_fds[fd].e) {
177     lockstate = acquire_master_fd(fd);
178     eiq = master_fds[fd].e;
179     master_fds[fd].e = NULL;
180     alter_fd(eiq, 0);
181     release_master_fd(fd, lockstate);
182   }
183   return eiq;
184 }
185 static eventer_t eventer_ports_impl_find_fd(int fd) {
186   return master_fds[fd].e;
187 }
188 static void
189 eventer_ports_impl_trigger(eventer_t e, int mask) {
190   ev_lock_state_t lockstate;
191   const char *cbname;
192   struct timeval __now;
193   int fd, oldmask, newmask;
194
195   fd = e->fd;
196   if(e != master_fds[fd].e) return;
197   lockstate = acquire_master_fd(fd);
198   if(lockstate == EV_ALREADY_OWNED) return;
199   assert(lockstate == EV_OWNED);
200
201   gettimeofday(&__now, NULL);
202   oldmask = e->mask;
203   cbname = eventer_name_for_callback(e->callback);
204   noitLT(eventer_deb, &__now, "ports: fire on %d/%x to %s(%p)\n",
205          fd, mask, cbname?cbname:"???", e->callback);
206   newmask = e->callback(e, mask, e->closure, &__now);
207
208   if(newmask) {
209     alter_fd(e, newmask);
210     /* Set our mask */
211     e->mask = newmask;
212     noitLT(eventer_deb, &__now, "ports: complete on %d/(%x->%x) to %s(%p)\n",
213            fd, mask, newmask, cbname?cbname:"???", e->callback);
214   }
215   else {
216     noitLT(eventer_deb, &__now, "ports: complete on %d/none to %s(%p)\n",
217            fd, cbname?cbname:"???", e->callback);
218     /*
219      * Long story long:
220      *  When integrating with a few external event systems, we find
221      *  it difficult to make their use of remove+add as an update
222      *  as it can be recurrent in a single handler call and you cannot
223      *  remove completely from the event system if you are going to
224      *  just update (otherwise the eventer_t in your call stack could
225      *  be stale).  What we do is perform a superficial remove, marking
226      *  the mask as 0, but not eventer_remove_fd.  Then on an add, if
227      *  we already have an event, we just update the mask (as we
228      *  have not yet returned to the eventer's loop.
229      *  This leaves us in a tricky situation when a remove is called
230      *  and the add doesn't roll in, we return 0 (mask == 0) and hit
231      *  this spot.  We have intended to remove the event, but it still
232      *  resides at master_fds[fd].e -- even after we free it.
233      *  So, in the evnet that we return 0 and the event that
234      *  master_fds[fd].e == the event we're about to free... we NULL
235      *  it out.
236      */
237     if(master_fds[fd].e == e) master_fds[fd].e = NULL;
238     eventer_free(e);
239   }
240   release_master_fd(fd, lockstate);
241 }
242 static int eventer_ports_impl_loop() {
243   int is_master_thread = 0;
244   pthread_t self;
245
246   self = pthread_self();
247   if(pthread_equal(self, master_thread)) is_master_thread = 1;
248
249   while(1) {
250     const char *cbname;
251     struct timeval __now, __sleeptime;
252     struct timespec __ports_sleeptime;
253     unsigned int fd_cnt = 0;
254     int ret;
255     port_event_t pevents[MAX_PORT_EVENTS];
256     int max_timed_events_to_process;
257     int newmask;
258
259     __sleeptime = eventer_max_sleeptime;
260
261     eventer_dispatch_timed(&__now, &__sleeptime);
262
263     /* Handle recurrent events */
264     eventer_dispatch_recurrent(&__now);
265
266     /* Now we move on to our fd-based events */
267     __ports_sleeptime.tv_sec = __sleeptime.tv_sec;
268     __ports_sleeptime.tv_nsec = __sleeptime.tv_usec * 1000;
269     fd_cnt = 1;
270
271     pevents[0].portev_source = 65535; /* This is impossible */
272
273     ret = port_getn(port_fd, pevents, MAX_PORT_EVENTS, &fd_cnt,
274                     &__ports_sleeptime);
275     noitLT(eventer_deb, &__now, "debug: port_getn(%d, [], %d) => %d\n", port_fd,
276            fd_cnt, ret);
277     if(ret < 0)
278       noitLT(eventer_deb, &__now, "port_getn: %s\n", strerror(errno));
279
280     if(ret == -1 && (errno != ETIME && errno != EINTR))
281       noitLT(eventer_err, &__now, "port_getn: %s\n", strerror(errno));
282     if(pevents[0].portev_source == 65535) {
283       /* the impossible still remains, which means our fd_cnt _must_ be 0 */
284       fd_cnt = 0;
285     }
286
287     if(fd_cnt > 0) {
288       int idx;
289       /* Loop a last time to process */
290       for(idx = 0; idx < fd_cnt; idx++) {
291         port_event_t *pe;
292         eventer_t e;
293         int fd, oldmask, mask;
294
295         pe = &pevents[idx];
296         if(pe->portev_source != PORT_SOURCE_FD) continue;
297         fd = (int)pe->portev_object;
298         assert((int)pe->portev_user == fd);
299         e = master_fds[fd].e;
300         mask = 0;
301         if(pe->portev_events & (POLLIN | POLLHUP))
302           mask |= EVENTER_READ;
303         if(pe->portev_events & (POLLOUT))
304           mask |= EVENTER_WRITE;
305         if(pe->portev_events & (POLLERR | POLLHUP | POLLNVAL))
306           mask |= EVENTER_EXCEPTION;
307
308         /* It's possible that someone removed the event and freed it
309          * before we got here.
310          */
311         eventer_ports_impl_trigger(e, mask);
312       }
313     }
314   }
315   /* NOTREACHED */
316 }
317
/*
 * The event-ports eventer implementation table.
 *
 * This is a positional initializer: the order of the entries must match the
 * member order of struct _eventer_impl, which is declared outside this file.
 */
struct _eventer_impl eventer_ports_impl = {
  "ports",
  eventer_ports_impl_init,
  eventer_ports_impl_propset,
  eventer_ports_impl_add,
  eventer_ports_impl_remove,
  eventer_ports_impl_update,
  eventer_ports_impl_remove_fd,
  eventer_ports_impl_find_fd,
  eventer_ports_impl_trigger,
  eventer_ports_impl_loop,
  eventer_ports_impl_foreach_fdevent,
  /* NOTE(review): looks like a struct timeval of 0s / 200000us, possibly the
   * maximum sleep time -- confirm against the struct declaration. */
  { 0, 200000 },
  /* NOTE(review): presumably maxfds and master_fds, which
   * eventer_ports_impl_init() fills in -- confirm member order. */
  0,
  NULL
};
Note: See TracBrowser for help on using the browser.