root/src/eventer/eventer_impl.c

Revision 3d39c2b98ac4279f4067a34378f1292d9a989140, 11.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 8 years ago)

fixes #291

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_skiplist.h"
37 #include <pthread.h>
38 #include <errno.h>
39 #include <assert.h>
40
41 static struct timeval *eventer_impl_epoch = NULL;
42 static int EVENTER_DEBUGGING = 0;
43 static int desired_nofiles = 1024*1024;
44 static pthread_mutex_t te_lock;
45 static noit_skiplist *timed_events = NULL;
46
47 #ifdef HAVE_KQUEUE
48 extern struct _eventer_impl eventer_kqueue_impl;
49 #endif
50 #ifdef HAVE_EPOLL
51 extern struct _eventer_impl eventer_epoll_impl;
52 #endif
53 #ifdef HAVE_PORTS
54 extern struct _eventer_impl eventer_ports_impl;
55 #endif
56
57 eventer_impl_t registered_eventers[] = {
58 #ifdef HAVE_KQUEUE
59   &eventer_kqueue_impl,
60 #endif
61 #ifdef HAVE_EPOLL
62   &eventer_epoll_impl,
63 #endif
64 #ifdef HAVE_PORTS
65   &eventer_ports_impl,
66 #endif
67   NULL
68 };
69
70 eventer_impl_t __eventer = NULL;
71 noit_log_stream_t eventer_err = NULL;
72 noit_log_stream_t eventer_deb = NULL;
73
74 static int __default_queue_threads = 5;
75 static int desired_limit = 1024 * 1024;
76 static eventer_jobq_t __global_backq, __default_jobq;
77 static pthread_mutex_t recurrent_lock = PTHREAD_MUTEX_INITIALIZER;
78 struct recurrent_events {
79   eventer_t e;
80   struct recurrent_events *next;
81 } *recurrent_events = NULL;
82
83
84 int eventer_impl_propset(const char *key, const char *value) {
85   if(!strcasecmp(key, "default_queue_threads")) {
86     __default_queue_threads = atoi(value);
87     if(__default_queue_threads < 1) {
88       noitL(noit_error, "default_queue_threads must be >= 1\n");
89       return -1;
90     }
91     return 0;
92   }
93   else if(!strcasecmp(key, "debugging")) {
94     desired_limit = atoi(value);
95     if(__default_queue_threads < 256) {
96       noitL(noit_error, "rlim_nofiles must be >= 256\n");
97       return -1;
98     }
99     return 0;
100   }
101   else if(!strcasecmp(key, "debugging")) {
102     if(strcmp(value, "0")) {
103       EVENTER_DEBUGGING = 1;
104       noitL(noit_error, "Enabling debugging from property\n");
105     }
106     return 0;
107   }
108   else if(!strcasecmp(key, "default_ca_chain")) {
109     /* used by eventer consumers */
110     return 0;
111   }
112   noitL(noit_error, "Warning: unknown eventer config '%s'\n", key);
113   return 0;
114 }
115
116 eventer_jobq_t *eventer_default_backq() {
117   return &__global_backq;
118 }
119
120 int eventer_get_epoch(struct timeval *epoch) {
121   if(!eventer_impl_epoch) return -1;
122   memcpy(epoch, eventer_impl_epoch, sizeof(*epoch));
123   return 0;
124 }
125
126 int eventer_impl_init() {
127   struct rlimit rlim;
128   int i, try;
129   eventer_t e;
130   char *evdeb;
131
132   evdeb = getenv("EVENTER_DEBUGGING");
133   if(evdeb) {
134     if(strcmp(evdeb, "0")) {
135       /* Set to anything but "0" turns debugging on */
136       EVENTER_DEBUGGING = 1;
137       noitL(noit_error, "Disabling eventer debugging from environment\n");
138     }
139     else {
140       EVENTER_DEBUGGING = 1;
141       noitL(noit_error, "Enabling eventer debugging from environment\n");
142     }
143   }
144   eventer_name_callback("eventer_jobq_execute_timeout",
145                         eventer_jobq_execute_timeout);
146   eventer_name_callback("eventer_jobq_consume_available",
147                         eventer_jobq_consume_available);
148
149   getrlimit(RLIMIT_NOFILE, &rlim);
150   rlim.rlim_cur = rlim.rlim_max = try = desired_nofiles;
151   while(setrlimit(RLIMIT_NOFILE, &rlim) != 0 && errno == EPERM && try > desired_limit + 10) {
152     noitL(noit_debug, "setrlimit(%u) : %s\n", (u_int32_t)rlim.rlim_cur, strerror(errno));
153     rlim.rlim_cur = rlim.rlim_max = --try;
154   }
155   getrlimit(RLIMIT_NOFILE, &rlim);
156   noitL(noit_error, "rlim { %u, %u }\n", (u_int32_t)rlim.rlim_cur, (u_int32_t)rlim.rlim_max);
157
158   eventer_impl_epoch = malloc(sizeof(struct timeval));
159   gettimeofday(eventer_impl_epoch, NULL);
160   pthread_mutex_init(&te_lock, NULL);
161     timed_events = calloc(1, sizeof(*timed_events));
162   noit_skiplist_init(timed_events);
163   noit_skiplist_set_compare(timed_events,
164                             eventer_timecompare, eventer_timecompare);
165   noit_skiplist_add_index(timed_events,
166                           noit_compare_voidptr, noit_compare_voidptr);
167
168   eventer_err = noit_log_stream_find("error/eventer");
169   eventer_deb = noit_log_stream_find("debug/eventer");
170   if(!eventer_err) eventer_err = noit_stderr;
171   if(!eventer_deb) eventer_deb = noit_debug;
172
173   eventer_ssl_init();
174   eventer_jobq_init(&__global_backq, "default_back_queue");
175   e = eventer_alloc();
176   e->mask = EVENTER_RECURRENT;
177   e->closure = &__global_backq;
178   e->callback = eventer_jobq_consume_available;
179
180   /* We call directly here as we may not be completely initialized */
181   eventer_add_recurrent(e);
182
183   eventer_jobq_init(&__default_jobq, "default_queue");
184   __default_jobq.backq = &__global_backq;
185   for(i=0; i<__default_queue_threads; i++)
186     eventer_jobq_increase_concurrency(&__default_jobq);
187   return 0;
188 }
189
190 void eventer_add_asynch(eventer_jobq_t *q, eventer_t e) {
191   eventer_job_t *job;
192   job = calloc(1, sizeof(*job));
193   job->fd_event = e;
194   gettimeofday(&job->create_time, NULL);
195   /* If we're debugging the eventer, these cross thread timeouts will
196    * make it impossible for us to slowly trace an asynch job. */
197   if(!EVENTER_DEBUGGING && e->whence.tv_sec) {
198     job->timeout_event = eventer_alloc();
199     memcpy(&job->timeout_event->whence, &e->whence, sizeof(e->whence));
200     job->timeout_event->mask = EVENTER_TIMER;
201     job->timeout_event->closure = job;
202     job->timeout_event->callback = eventer_jobq_execute_timeout;
203     eventer_add(job->timeout_event);
204   }
205   eventer_jobq_enqueue(q ? q : &__default_jobq, job);
206 }
207
208 void eventer_add_timed(eventer_t e) {
209   assert(e->mask & EVENTER_TIMER);
210   if(EVENTER_DEBUGGING) {
211     const char *cbname;
212     cbname = eventer_name_for_callback(e->callback);
213     noitL(eventer_deb, "debug: eventer_add timed (%s)\n",
214           cbname ? cbname : "???");
215   }
216   pthread_mutex_lock(&te_lock);
217   noit_skiplist_insert(timed_events, e);
218   pthread_mutex_unlock(&te_lock);
219 }
220 eventer_t eventer_remove_timed(eventer_t e) {
221   eventer_t removed = NULL;
222   assert(e->mask & EVENTER_TIMER);
223   pthread_mutex_lock(&te_lock);
224   if(noit_skiplist_remove_compare(timed_events, e, NULL,
225                                   noit_compare_voidptr))
226     removed = e;
227   pthread_mutex_unlock(&te_lock);
228   return removed;
229 }
230 void eventer_update_timed(eventer_t e, int mask) {
231   assert(mask & EVENTER_TIMER);
232   pthread_mutex_lock(&te_lock);
233   noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
234   noit_skiplist_insert(timed_events, e);
235   pthread_mutex_unlock(&te_lock);
236 }
237 void eventer_dispatch_timed(struct timeval *now, struct timeval *next) {
238   int max_timed_events_to_process;
239     /* Handle timed events...
240      * we could be multithreaded, so if we pop forever we could starve
241      * ourselves. */
242   max_timed_events_to_process = timed_events->size;
243   while(max_timed_events_to_process-- > 0) {
244     int newmask;
245     eventer_t timed_event;
246
247     gettimeofday(now, NULL);
248
249     pthread_mutex_lock(&te_lock);
250     /* Peek at our next timed event, if should fire, pop it.
251      * otherwise we noop and NULL it out to break the loop. */
252     timed_event = noit_skiplist_peek(timed_events);
253     if(timed_event) {
254       if(compare_timeval(timed_event->whence, *now) < 0) {
255         timed_event = noit_skiplist_pop(timed_events, NULL);
256       }
257       else {
258         sub_timeval(timed_event->whence, *now, next);
259         timed_event = NULL;
260       }
261     }
262     pthread_mutex_unlock(&te_lock);
263     if(timed_event == NULL) break;
264     if(EVENTER_DEBUGGING) {
265       const char *cbname;
266       cbname = eventer_name_for_callback(timed_event->callback);
267       noitLT(eventer_deb, now, "debug: timed dispatch(%s)\n",
268              cbname ? cbname : "???");
269     }
270     /* Make our call */
271     newmask = timed_event->callback(timed_event, EVENTER_TIMER,
272                                     timed_event->closure, now);
273     if(newmask)
274       eventer_add_timed(timed_event);
275     else
276       eventer_free(timed_event);
277   }
278
279   if(compare_timeval(eventer_max_sleeptime, *next) < 0) {
280     /* we exceed our configured maximum, set it down */
281     memcpy(next, &eventer_max_sleeptime, sizeof(*next));
282   }
283 }
284 void
285 eventer_foreach_timedevent (void (*f)(eventer_t e, void *), void *closure) {
286   noit_skiplist_node *iter = NULL;
287   pthread_mutex_lock(&te_lock);
288   for(iter = noit_skiplist_getlist(timed_events); iter;
289       noit_skiplist_next(timed_events,&iter)) {
290     if(iter->data) f(iter->data, closure);
291   }
292   pthread_mutex_unlock(&te_lock);
293 }
294
295 void eventer_dispatch_recurrent(struct timeval *now) {
296   struct recurrent_events *node;
297   struct timeval __now;
298   if(!now) {
299     gettimeofday(&__now, NULL);
300     now = &__now;
301   }
302   pthread_mutex_lock(&recurrent_lock);
303   for(node = recurrent_events; node; node = node->next) {
304     node->e->callback(node->e, EVENTER_RECURRENT, node->e->closure, now);
305   }
306   pthread_mutex_unlock(&recurrent_lock);
307 }
308 eventer_t eventer_remove_recurrent(eventer_t e) {
309   struct recurrent_events *node, *prev = NULL;
310   pthread_mutex_lock(&recurrent_lock);
311   for(node = recurrent_events; node; node = node->next) {
312     if(node->e == e) {
313       if(prev) prev->next = node->next;
314       else recurrent_events = node->next;
315       free(node);
316       pthread_mutex_unlock(&recurrent_lock);
317       return e;
318     }
319     prev = node;
320   }
321   pthread_mutex_unlock(&recurrent_lock);
322   return NULL;
323 }
324 void eventer_add_recurrent(eventer_t e) {
325   struct recurrent_events *node;
326   assert(e->mask & EVENTER_RECURRENT);
327   pthread_mutex_lock(&recurrent_lock);
328   for(node = recurrent_events; node; node = node->next)
329     if(node->e == e) {
330       pthread_mutex_unlock(&recurrent_lock);
331       return;
332     }
333   node = calloc(1, sizeof(*node));
334   node->e = e;
335   node->next = recurrent_events;
336   recurrent_events = node;
337   pthread_mutex_unlock(&recurrent_lock);
338 }
339
Note: See TracBrowser for help on using the browser.