root/src/eventer/eventer_impl.c

Revision 8135eaee0b0d6dfb4595c77c878f2a998a7303e1, 10.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 9 years ago)

add more names for callbacks for introspection, refs #221

/*
 * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 *       copyright notice, this list of conditions and the following
 *       disclaimer in the documentation and/or other materials provided
 *       with the distribution.
 *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
 *       of its contributors may be used to endorse or promote products
 *       derived from this software without specific prior written
 *       permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "noit_defines.h"
#include "eventer/eventer.h"
#include "utils/noit_log.h"
#include "utils/noit_skiplist.h"
#include <pthread.h>
#include <assert.h>

static struct timeval *eventer_impl_epoch = NULL;
static int EVENTER_DEBUGGING = 0;
static pthread_mutex_t te_lock;
static noit_skiplist *timed_events = NULL;

#ifdef HAVE_KQUEUE
extern struct _eventer_impl eventer_kqueue_impl;
#endif
#ifdef HAVE_EPOLL
extern struct _eventer_impl eventer_epoll_impl;
#endif
#ifdef HAVE_PORTS
extern struct _eventer_impl eventer_ports_impl;
#endif

eventer_impl_t registered_eventers[] = {
#ifdef HAVE_KQUEUE
  &eventer_kqueue_impl,
#endif
#ifdef HAVE_EPOLL
  &eventer_epoll_impl,
#endif
#ifdef HAVE_PORTS
  &eventer_ports_impl,
#endif
  NULL
};

eventer_impl_t __eventer = NULL;
noit_log_stream_t eventer_err = NULL;
noit_log_stream_t eventer_deb = NULL;

static int __default_queue_threads = 5;
static eventer_jobq_t __global_backq, __default_jobq;
static pthread_mutex_t recurrent_lock = PTHREAD_MUTEX_INITIALIZER;
struct recurrent_events {
  eventer_t e;
  struct recurrent_events *next;
} *recurrent_events = NULL;

int eventer_impl_propset(const char *key, const char *value) {
  if(!strcasecmp(key, "default_queue_threads")) {
    __default_queue_threads = atoi(value);
    if(__default_queue_threads < 1) {
      noitL(noit_error, "default_queue_threads must be >= 1\n");
      return -1;
    }
    return 0;
  }
  else if(!strcasecmp(key, "debugging")) {
    if(strcmp(value, "0")) {
      EVENTER_DEBUGGING = 1;
      noitL(noit_error, "Enabling debugging from property\n");
    }
    return 0;
  }
  else if(!strcasecmp(key, "default_ca_chain")) {
    /* used by eventer consumers */
    return 0;
  }
  noitL(noit_error, "Warning: unknown eventer config '%s'\n", key);
  return 0;
}
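
/* Illustrative sketch (not part of the original file): a configuration
 * loader would hand key/value pairs to eventer_impl_propset() before the
 * eventer is initialized.  Unknown keys only warn and return 0; the one
 * hard failure is default_queue_threads < 1, which returns -1.
 *
 *   if(eventer_impl_propset("default_queue_threads", "10") != 0 ||
 *      eventer_impl_propset("debugging", "1") != 0)
 *     exit(2);
 */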

eventer_jobq_t *eventer_default_backq() {
  return &__global_backq;
}

int eventer_get_epoch(struct timeval *epoch) {
  if(!eventer_impl_epoch) return -1;
  memcpy(epoch, eventer_impl_epoch, sizeof(*epoch));
  return 0;
}
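
/* Illustrative sketch: computing the eventer's "uptime" from the epoch
 * that eventer_impl_init() records below, assuming sub_timeval(a, b, out)
 * computes a - b, as its use in eventer_dispatch_timed() suggests.
 *
 *   struct timeval epoch, now, uptime;
 *   if(eventer_get_epoch(&epoch) == 0) {
 *     gettimeofday(&now, NULL);
 *     sub_timeval(now, epoch, &uptime);
 *   }
 */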

int eventer_impl_init() {
  int i;
  eventer_t e;
  char *evdeb;

  evdeb = getenv("EVENTER_DEBUGGING");
  if(evdeb) {
    if(strcmp(evdeb, "0")) {
      /* Set to anything but "0" turns debugging on */
      EVENTER_DEBUGGING = 1;
      noitL(noit_error, "Enabling eventer debugging from environment\n");
    }
    else {
      /* An explicit "0" turns debugging off */
      EVENTER_DEBUGGING = 0;
      noitL(noit_error, "Disabling eventer debugging from environment\n");
    }
  }
  eventer_name_callback("eventer_jobq_execute_timeout",
                        eventer_jobq_execute_timeout);
  eventer_name_callback("eventer_jobq_consume_available",
                        eventer_jobq_consume_available);

  eventer_impl_epoch = malloc(sizeof(struct timeval));
  gettimeofday(eventer_impl_epoch, NULL);
  pthread_mutex_init(&te_lock, NULL);
  timed_events = calloc(1, sizeof(*timed_events));
  noit_skiplist_init(timed_events);
  noit_skiplist_set_compare(timed_events,
                            eventer_timecompare, eventer_timecompare);
  noit_skiplist_add_index(timed_events,
                          noit_compare_voidptr, noit_compare_voidptr);

  eventer_err = noit_log_stream_find("error/eventer");
  eventer_deb = noit_log_stream_find("debug/eventer");
  if(!eventer_err) eventer_err = noit_stderr;
  if(!eventer_deb) eventer_deb = noit_debug;

  eventer_ssl_init();
  eventer_jobq_init(&__global_backq, "default_back_queue");
  e = eventer_alloc();
  e->mask = EVENTER_RECURRENT;
  e->closure = &__global_backq;
  e->callback = eventer_jobq_consume_available;

  /* We call directly here as we may not be completely initialized */
  eventer_add_recurrent(e);

  eventer_jobq_init(&__default_jobq, "default_queue");
  __default_jobq.backq = &__global_backq;
  for(i=0; i<__default_queue_threads; i++)
    eventer_jobq_increase_concurrency(&__default_jobq);
  return 0;
}

void eventer_add_asynch(eventer_jobq_t *q, eventer_t e) {
  eventer_job_t *job;
  job = calloc(1, sizeof(*job));
  job->fd_event = e;
  gettimeofday(&job->create_time, NULL);
  /* If we're debugging the eventer, these cross-thread timeouts would
   * make it impossible for us to slowly trace an asynch job. */
  if(!EVENTER_DEBUGGING && e->whence.tv_sec) {
    job->timeout_event = eventer_alloc();
    memcpy(&job->timeout_event->whence, &e->whence, sizeof(e->whence));
    job->timeout_event->mask = EVENTER_TIMER;
    job->timeout_event->closure = job;
    job->timeout_event->callback = eventer_jobq_execute_timeout;
    eventer_add(job->timeout_event);
  }
  eventer_jobq_enqueue(q ? q : &__default_jobq, job);
}
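
/* Illustrative sketch: queueing blocking work onto the default jobq.
 * "my_work" and "my_state" are hypothetical, and EVENTER_ASYNCH is
 * assumed to be defined in eventer/eventer.h (it is not shown in this
 * file).  A nonzero e->whence is treated as an absolute deadline,
 * enforced by the eventer_jobq_execute_timeout event created above.
 *
 *   eventer_t e = eventer_alloc();
 *   e->mask = EVENTER_ASYNCH;
 *   e->callback = my_work;
 *   e->closure = my_state;
 *   gettimeofday(&e->whence, NULL);
 *   e->whence.tv_sec += 30;         // deadline ~30s out
 *   eventer_add_asynch(NULL, e);    // NULL selects __default_jobq
 */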

void eventer_add_timed(eventer_t e) {
  assert(e->mask & EVENTER_TIMER);
  if(EVENTER_DEBUGGING) {
    const char *cbname;
    cbname = eventer_name_for_callback(e->callback);
    noitL(eventer_deb, "debug: eventer_add timed (%s)\n",
          cbname ? cbname : "???");
  }
  pthread_mutex_lock(&te_lock);
  noit_skiplist_insert(timed_events, e);
  pthread_mutex_unlock(&te_lock);
}
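
/* Illustrative sketch: scheduling a one-shot timer.  "my_timer_cb" is a
 * hypothetical callback; per eventer_dispatch_timed() below, returning 0
 * frees the event and returning nonzero re-inserts it (the callback must
 * advance e->whence itself, or it will fire again immediately).
 *
 *   static int my_timer_cb(eventer_t e, int mask, void *closure,
 *                          struct timeval *now) {
 *     // ... do the timed work ...
 *     return 0;   // one-shot: let the dispatcher free the event
 *   }
 *
 *   eventer_t e = eventer_alloc();
 *   e->mask = EVENTER_TIMER;
 *   gettimeofday(&e->whence, NULL);
 *   e->whence.tv_sec += 5;          // fire ~5s from now
 *   e->callback = my_timer_cb;
 *   eventer_add_timed(e);
 */
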
eventer_t eventer_remove_timed(eventer_t e) {
  eventer_t removed = NULL;
  assert(e->mask & EVENTER_TIMER);
  pthread_mutex_lock(&te_lock);
  if(noit_skiplist_remove_compare(timed_events, e, NULL,
                                  noit_compare_voidptr))
    removed = e;
  pthread_mutex_unlock(&te_lock);
  return removed;
}
void eventer_update_timed(eventer_t e, int mask) {
  assert(mask & EVENTER_TIMER);
  pthread_mutex_lock(&te_lock);
  noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
  noit_skiplist_insert(timed_events, e);
  pthread_mutex_unlock(&te_lock);
}
void eventer_dispatch_timed(struct timeval *now, struct timeval *next) {
  int max_timed_events_to_process;
  /* Handle timed events...
   * we could be multithreaded, so if we pop forever we could starve
   * ourselves. */
  max_timed_events_to_process = timed_events->size;
  while(max_timed_events_to_process-- > 0) {
    int newmask;
    eventer_t timed_event;

    gettimeofday(now, NULL);

    pthread_mutex_lock(&te_lock);
    /* Peek at our next timed event; if it should fire, pop it.
     * Otherwise, record the time until it fires in *next and NULL
     * it out to break the loop. */
    timed_event = noit_skiplist_peek(timed_events);
    if(timed_event) {
      if(compare_timeval(timed_event->whence, *now) < 0) {
        timed_event = noit_skiplist_pop(timed_events, NULL);
      }
      else {
        sub_timeval(timed_event->whence, *now, next);
        timed_event = NULL;
      }
    }
    pthread_mutex_unlock(&te_lock);
    if(timed_event == NULL) break;
    if(EVENTER_DEBUGGING) {
      const char *cbname;
      cbname = eventer_name_for_callback(timed_event->callback);
      noitLT(eventer_deb, now, "debug: timed dispatch(%s)\n",
             cbname ? cbname : "???");
    }
    /* Make our call; zero frees the event, nonzero reschedules it */
    newmask = timed_event->callback(timed_event, EVENTER_TIMER,
                                    timed_event->closure, now);
    if(newmask)
      eventer_add_timed(timed_event);
    else
      eventer_free(timed_event);
  }

  if(compare_timeval(eventer_max_sleeptime, *next) < 0) {
    /* we exceed our configured maximum, set it down */
    memcpy(next, &eventer_max_sleeptime, sizeof(*next));
  }
}
void
eventer_foreach_timedevent (void (*f)(eventer_t e, void *), void *closure) {
  noit_skiplist_node *iter = NULL;
  pthread_mutex_lock(&te_lock);
  for(iter = noit_skiplist_getlist(timed_events); iter;
      noit_skiplist_next(timed_events,&iter)) {
    if(iter->data) f(iter->data, closure);
  }
  pthread_mutex_unlock(&te_lock);
}
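
/* Illustrative sketch: a visitor that counts pending timed events.
 * "count_cb" is hypothetical.  Note that the visitor runs with te_lock
 * held, so it must not call back into eventer_add_timed() or
 * eventer_remove_timed().
 *
 *   static void count_cb(eventer_t e, void *closure) {
 *     (*(int *)closure)++;
 *   }
 *
 *   int n = 0;
 *   eventer_foreach_timedevent(count_cb, &n);
 */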

void eventer_dispatch_recurrent(struct timeval *now) {
  struct recurrent_events *node;
  struct timeval __now;
  if(!now) {
    gettimeofday(&__now, NULL);
    now = &__now;
  }
  pthread_mutex_lock(&recurrent_lock);
  for(node = recurrent_events; node; node = node->next) {
    node->e->callback(node->e, EVENTER_RECURRENT, node->e->closure, now);
  }
  pthread_mutex_unlock(&recurrent_lock);
}
eventer_t eventer_remove_recurrent(eventer_t e) {
  struct recurrent_events *node, *prev = NULL;
  pthread_mutex_lock(&recurrent_lock);
  for(node = recurrent_events; node; node = node->next) {
    if(node->e == e) {
      if(prev) prev->next = node->next;
      else recurrent_events = node->next;
      free(node);
      pthread_mutex_unlock(&recurrent_lock);
      return e;
    }
    prev = node;
  }
  pthread_mutex_unlock(&recurrent_lock);
  return NULL;
}
void eventer_add_recurrent(eventer_t e) {
  struct recurrent_events *node;
  assert(e->mask & EVENTER_RECURRENT);
  pthread_mutex_lock(&recurrent_lock);
  for(node = recurrent_events; node; node = node->next)
    if(node->e == e) {
      pthread_mutex_unlock(&recurrent_lock);
      return;
    }
  node = calloc(1, sizeof(*node));
  node->e = e;
  node->next = recurrent_events;
  recurrent_events = node;
  pthread_mutex_unlock(&recurrent_lock);
}
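
/* Illustrative sketch: installing a recurrent event, mirroring the
 * __global_backq consumer set up in eventer_impl_init().
 * "my_housekeeping_cb" is hypothetical.  A recurrent callback runs on
 * every pass of eventer_dispatch_recurrent(), and its return value is
 * ignored; adding the same event twice is a no-op.
 *
 *   eventer_t e = eventer_alloc();
 *   e->mask = EVENTER_RECURRENT;
 *   e->callback = my_housekeeping_cb;
 *   e->closure = NULL;
 *   eventer_add_recurrent(e);
 */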