root/src/eventer/eventer_impl.c

Revision 31d42e564259174c10ca8df5d8b206096a29c957, 10.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 9 years ago)

This pulls the timer stuff into the shared base and consolidates a lot
of repeated code across the different scheduler implementations.

timers and fdevents are now exposed through the API, and the console exposes them via:

show eventer debug timers
show eventer debug sockets

(the console stuff needs to be cleaned up to support autocomplete)

refs #221
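
As an illustrative sketch of the newly exposed timer API (the counting callback here is hypothetical; eventer_foreach_timedevent is defined in this file), a console handler can now walk the timed events without touching scheduler internals:

    static void count_timed(eventer_t e, void *closure) {
      (*(int *)closure)++;
    }

    int cnt = 0;
    eventer_foreach_timedevent(count_timed, &cnt);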

/*
 * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 *       copyright notice, this list of conditions and the following
 *       disclaimer in the documentation and/or other materials provided
 *       with the distribution.
 *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
 *       of its contributors may be used to endorse or promote products
 *       derived from this software without specific prior written
 *       permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "noit_defines.h"
#include "eventer/eventer.h"
#include "utils/noit_log.h"
#include "utils/noit_skiplist.h"
#include <pthread.h>
#include <assert.h>

static struct timeval *eventer_impl_epoch = NULL;
static int EVENTER_DEBUGGING = 0;
static pthread_mutex_t te_lock;
static noit_skiplist *timed_events = NULL;

#ifdef HAVE_KQUEUE
extern struct _eventer_impl eventer_kqueue_impl;
#endif
#ifdef HAVE_EPOLL
extern struct _eventer_impl eventer_epoll_impl;
#endif
#ifdef HAVE_PORTS
extern struct _eventer_impl eventer_ports_impl;
#endif

eventer_impl_t registered_eventers[] = {
#ifdef HAVE_KQUEUE
  &eventer_kqueue_impl,
#endif
#ifdef HAVE_EPOLL
  &eventer_epoll_impl,
#endif
#ifdef HAVE_PORTS
  &eventer_ports_impl,
#endif
  NULL
};

eventer_impl_t __eventer = NULL;
noit_log_stream_t eventer_err = NULL;
noit_log_stream_t eventer_deb = NULL;

static int __default_queue_threads = 5;
static eventer_jobq_t __global_backq, __default_jobq;
static pthread_mutex_t recurrent_lock = PTHREAD_MUTEX_INITIALIZER;
struct recurrent_events {
  eventer_t e;
  struct recurrent_events *next;
} *recurrent_events = NULL;


int eventer_impl_propset(const char *key, const char *value) {
  if(!strcasecmp(key, "default_queue_threads")) {
    __default_queue_threads = atoi(value);
    if(__default_queue_threads < 1) {
      noitL(noit_error, "default_queue_threads must be >= 1\n");
      return -1;
    }
    return 0;
  }
  else if(!strcasecmp(key, "debugging")) {
    if(strcmp(value, "0")) {
      EVENTER_DEBUGGING = 1;
      noitL(noit_error, "Enabling debugging from property\n");
    }
    return 0;
  }
  else if(!strcasecmp(key, "default_ca_chain")) {
    /* used by eventer consumers */
    return 0;
  }
  noitL(noit_error, "Warning: unknown eventer config '%s'\n", key);
  return 0;
}
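/* Illustrative usage (the call ordering is an assumption about the
 * surrounding bootstrap code): these are the keys handled above, set
 * before eventer_impl_init() runs.
 *
 *   eventer_impl_propset("default_queue_threads", "10");
 *   eventer_impl_propset("debugging", "1");
 */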

eventer_jobq_t *eventer_default_backq() {
  return &__global_backq;
}

int eventer_get_epoch(struct timeval *epoch) {
  if(!eventer_impl_epoch) return -1;
  memcpy(epoch, eventer_impl_epoch, sizeof(*epoch));
  return 0;
}
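/* Illustrative usage: measure time elapsed since the eventer epoch;
 * sub_timeval is the same helper used in eventer_dispatch_timed() below.
 *
 *   struct timeval epoch, now, diff;
 *   if(eventer_get_epoch(&epoch) == 0) {
 *     gettimeofday(&now, NULL);
 *     sub_timeval(now, epoch, &diff);
 *   }
 */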

int eventer_impl_init() {
  int i;
  eventer_t e;
  char *evdeb;

  evdeb = getenv("EVENTER_DEBUGGING");
  if(evdeb) {
    if(strcmp(evdeb, "0")) {
      /* Set to anything but "0" turns debugging on */
      EVENTER_DEBUGGING = 1;
      noitL(noit_error, "Enabling eventer debugging from environment\n");
    }
    else {
      EVENTER_DEBUGGING = 0;
      noitL(noit_error, "Disabling eventer debugging from environment\n");
    }
  }
  eventer_impl_epoch = malloc(sizeof(struct timeval));
  gettimeofday(eventer_impl_epoch, NULL);
  pthread_mutex_init(&te_lock, NULL);
  timed_events = calloc(1, sizeof(*timed_events));
  noit_skiplist_init(timed_events);
  noit_skiplist_set_compare(timed_events,
                            eventer_timecompare, eventer_timecompare);
  noit_skiplist_add_index(timed_events,
                          noit_compare_voidptr, noit_compare_voidptr);

  eventer_err = noit_log_stream_find("error/eventer");
  eventer_deb = noit_log_stream_find("debug/eventer");
  if(!eventer_err) eventer_err = noit_stderr;
  if(!eventer_deb) eventer_deb = noit_debug;

  eventer_ssl_init();
  eventer_jobq_init(&__global_backq, "default_back_queue");
  e = eventer_alloc();
  e->mask = EVENTER_RECURRENT;
  e->closure = &__global_backq;
  e->callback = eventer_jobq_consume_available;

  /* We call directly here as we may not be completely initialized */
  eventer_add_recurrent(e);

  eventer_jobq_init(&__default_jobq, "default_queue");
  __default_jobq.backq = &__global_backq;
  for(i=0; i<__default_queue_threads; i++)
    eventer_jobq_increase_concurrency(&__default_jobq);
  return 0;
}

void eventer_add_asynch(eventer_jobq_t *q, eventer_t e) {
  eventer_job_t *job;
  job = calloc(1, sizeof(*job));
  job->fd_event = e;
  gettimeofday(&job->create_time, NULL);
  /* If we're debugging the eventer, these cross-thread timeouts will
   * make it impossible for us to slowly trace an asynch job. */
  if(!EVENTER_DEBUGGING && e->whence.tv_sec) {
    job->timeout_event = eventer_alloc();
    memcpy(&job->timeout_event->whence, &e->whence, sizeof(e->whence));
    job->timeout_event->mask = EVENTER_TIMER;
    job->timeout_event->closure = job;
    job->timeout_event->callback = eventer_jobq_execute_timeout;
    eventer_add(job->timeout_event);
  }
  eventer_jobq_enqueue(q ? q : &__default_jobq, job);
}
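/* Illustrative usage (my_work is hypothetical; EVENTER_ASYNCH is assumed
 * from eventer.h): queue a job on the default jobq with a five-second
 * timeout. A zeroed whence would skip the timeout event per the check
 * above.
 *
 *   eventer_t e = eventer_alloc();
 *   e->mask = EVENTER_ASYNCH;
 *   e->callback = my_work;
 *   gettimeofday(&e->whence, NULL);
 *   e->whence.tv_sec += 5;
 *   eventer_add_asynch(NULL, e);
 */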

void eventer_add_timed(eventer_t e) {
  assert(e->mask & EVENTER_TIMER);
  if(EVENTER_DEBUGGING) {
    const char *cbname;
    cbname = eventer_name_for_callback(e->callback);
    noitL(eventer_deb, "debug: eventer_add timed (%s)\n",
          cbname ? cbname : "???");
  }
  pthread_mutex_lock(&te_lock);
  noit_skiplist_insert(timed_events, e);
  pthread_mutex_unlock(&te_lock);
}
eventer_t eventer_remove_timed(eventer_t e) {
  eventer_t removed = NULL;
  assert(e->mask & EVENTER_TIMER);
  pthread_mutex_lock(&te_lock);
  if(noit_skiplist_remove_compare(timed_events, e, NULL,
                                  noit_compare_voidptr))
    removed = e;
  pthread_mutex_unlock(&te_lock);
  return removed;
}
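/* Illustrative usage: cancel a pending timer. eventer_remove_timed()
 * returns the event only if it was still queued, so the free is safe
 * only when the removal succeeded.
 *
 *   if(eventer_remove_timed(e) != NULL)
 *     eventer_free(e);
 */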
void eventer_update_timed(eventer_t e, int mask) {
  assert(mask & EVENTER_TIMER);
  pthread_mutex_lock(&te_lock);
  noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
  noit_skiplist_insert(timed_events, e);
  pthread_mutex_unlock(&te_lock);
}
void eventer_dispatch_timed(struct timeval *now, struct timeval *next) {
  int max_timed_events_to_process;
  /* Handle timed events...
   * we could be multithreaded, so if we pop forever we could starve
   * ourselves. */
  max_timed_events_to_process = timed_events->size;
  while(max_timed_events_to_process-- > 0) {
    int newmask;
    eventer_t timed_event;

    gettimeofday(now, NULL);

    pthread_mutex_lock(&te_lock);
    /* Peek at our next timed event; if it should fire, pop it.
     * Otherwise we noop and NULL it out to break the loop. */
    timed_event = noit_skiplist_peek(timed_events);
    if(timed_event) {
      if(compare_timeval(timed_event->whence, *now) < 0) {
        timed_event = noit_skiplist_pop(timed_events, NULL);
      }
      else {
        sub_timeval(timed_event->whence, *now, next);
        timed_event = NULL;
      }
    }
    pthread_mutex_unlock(&te_lock);
    if(timed_event == NULL) break;
    if(EVENTER_DEBUGGING) {
      const char *cbname;
      cbname = eventer_name_for_callback(timed_event->callback);
      noitLT(eventer_deb, now, "debug: timed dispatch(%s)\n",
             cbname ? cbname : "???");
    }
    /* Make our call */
    newmask = timed_event->callback(timed_event, EVENTER_TIMER,
                                    timed_event->closure, now);
    if(newmask)
      eventer_add_timed(timed_event);
    else
      eventer_free(timed_event);
  }

  if(compare_timeval(eventer_max_sleeptime, *next) < 0) {
    /* we exceed our configured maximum; set it down */
    memcpy(next, &eventer_max_sleeptime, sizeof(*next));
  }
}
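/* Illustrative sketch of a periodic timer (tick is hypothetical): a
 * nonzero return mask makes the dispatch loop above re-insert the event
 * at its updated whence instead of freeing it.
 *
 *   static int tick(eventer_t e, int mask, void *closure,
 *                   struct timeval *now) {
 *     memcpy(&e->whence, now, sizeof(*now));
 *     e->whence.tv_sec += 1;
 *     return EVENTER_TIMER;
 *   }
 */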
void
eventer_foreach_timedevent (void (*f)(eventer_t e, void *), void *closure) {
  noit_skiplist_node *iter = NULL;
  pthread_mutex_lock(&te_lock);
  for(iter = noit_skiplist_getlist(timed_events); iter;
      noit_skiplist_next(timed_events,&iter)) {
    if(iter->data) f(iter->data, closure);
  }
  pthread_mutex_unlock(&te_lock);
}
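/* Illustrative usage: dump each timed event's callback name, much as a
 * console handler for "show eventer debug timers" might (print_cb is
 * hypothetical).
 *
 *   static void print_cb(eventer_t e, void *closure) {
 *     const char *name = eventer_name_for_callback(e->callback);
 *     noitL(eventer_deb, "timer -> %s\n", name ? name : "???");
 *   }
 *
 *   eventer_foreach_timedevent(print_cb, NULL);
 */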

void eventer_dispatch_recurrent(struct timeval *now) {
  struct recurrent_events *node;
  struct timeval __now;
  if(!now) {
    gettimeofday(&__now, NULL);
    now = &__now;
  }
  pthread_mutex_lock(&recurrent_lock);
  for(node = recurrent_events; node; node = node->next) {
    node->e->callback(node->e, EVENTER_RECURRENT, node->e->closure, now);
  }
  pthread_mutex_unlock(&recurrent_lock);
}
eventer_t eventer_remove_recurrent(eventer_t e) {
  struct recurrent_events *node, *prev = NULL;
  pthread_mutex_lock(&recurrent_lock);
  for(node = recurrent_events; node; node = node->next) {
    if(node->e == e) {
      if(prev) prev->next = node->next;
      else recurrent_events = node->next;
      free(node);
      pthread_mutex_unlock(&recurrent_lock);
      return e;
    }
    prev = node;
  }
  pthread_mutex_unlock(&recurrent_lock);
  return NULL;
}
void eventer_add_recurrent(eventer_t e) {
  struct recurrent_events *node;
  assert(e->mask & EVENTER_RECURRENT);
  pthread_mutex_lock(&recurrent_lock);
  for(node = recurrent_events; node; node = node->next)
    if(node->e == e) {
      pthread_mutex_unlock(&recurrent_lock);
      return;
    }
  node = calloc(1, sizeof(*node));
  node->e = e;
  node->next = recurrent_events;
  recurrent_events = node;
  pthread_mutex_unlock(&recurrent_lock);
}