root/src/eventer/eventer_impl.c

Revision f870be02daa5f1ef34211f67b8489564f94b0a88, 11.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 2 years ago)

fix types

/*
 * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 *       copyright notice, this list of conditions and the following
 *       disclaimer in the documentation and/or other materials provided
 *       with the distribution.
 *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
 *       of its contributors may be used to endorse or promote products
 *       derived from this software without specific prior written
 *       permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "noit_defines.h"
#include "eventer/eventer.h"
#include "utils/noit_log.h"
#include "utils/noit_skiplist.h"
#include "eventer/dtrace_probes.h"
#include <pthread.h>
#include <errno.h>
#include <assert.h>
/* Declarations used below (getrlimit/setrlimit, gettimeofday, getenv,
 * memcpy/strcmp); these likely already arrive via noit_defines.h, but
 * are included explicitly to be safe. */
#include <sys/time.h>
#include <sys/resource.h>
#include <stdlib.h>
#include <string.h>

static struct timeval *eventer_impl_epoch = NULL;
static int EVENTER_DEBUGGING = 0;
static int desired_nofiles = 1024*1024;
static pthread_mutex_t te_lock;
static noit_skiplist *timed_events = NULL;

#ifdef HAVE_KQUEUE
extern struct _eventer_impl eventer_kqueue_impl;
#endif
#ifdef HAVE_EPOLL
extern struct _eventer_impl eventer_epoll_impl;
#endif
#ifdef HAVE_PORTS
extern struct _eventer_impl eventer_ports_impl;
#endif

eventer_impl_t registered_eventers[] = {
#ifdef HAVE_KQUEUE
  &eventer_kqueue_impl,
#endif
#ifdef HAVE_EPOLL
  &eventer_epoll_impl,
#endif
#ifdef HAVE_PORTS
  &eventer_ports_impl,
#endif
  NULL
};

eventer_impl_t __eventer = NULL;
noit_log_stream_t eventer_err = NULL;
noit_log_stream_t eventer_deb = NULL;

static int __default_queue_threads = 5;
static int desired_limit = 1024 * 1024;
static eventer_jobq_t __global_backq, __default_jobq;
static pthread_mutex_t recurrent_lock = PTHREAD_MUTEX_INITIALIZER;
struct recurrent_events {
  eventer_t e;
  struct recurrent_events *next;
} *recurrent_events = NULL;

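/* Apply one eventer configuration property.  Recognized keys are
 * default_queue_threads, rlim_nofiles, debugging, default_ca_chain and
 * ssl_ctx_cache_expiry; unknown keys are logged and tolerated.
 * Returns 0 on success, -1 on an out-of-range value. */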
int eventer_impl_propset(const char *key, const char *value) {
  if(!strcasecmp(key, "default_queue_threads")) {
    __default_queue_threads = atoi(value);
    if(__default_queue_threads < 1) {
      noitL(noit_error, "default_queue_threads must be >= 1\n");
      return -1;
    }
    return 0;
  }
  else if(!strcasecmp(key, "rlim_nofiles")) {
    desired_limit = atoi(value);
    if(desired_limit < 256) {
      noitL(noit_error, "rlim_nofiles must be >= 256\n");
      return -1;
    }
    return 0;
  }
  else if(!strcasecmp(key, "debugging")) {
    if(strcmp(value, "0")) {
      EVENTER_DEBUGGING = 1;
      noitL(noit_error, "Enabling debugging from property\n");
    }
    return 0;
  }
  else if(!strcasecmp(key, "default_ca_chain")) {
    /* used by eventer consumers */
    return 0;
  }
  else if(!strcasecmp(key, "ssl_ctx_cache_expiry")) {
    eventer_ssl_set_ssl_ctx_cache_expiry(atoi(value));
    return 0;
  }
  noitL(noit_error, "Warning: unknown eventer config '%s'\n", key);
  return 0;
}

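/* The shared back queue: asynchronous jobs are returned here on
 * completion and drained in the event loop by the recurrent
 * eventer_jobq_consume_available event registered in
 * eventer_impl_init below. */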
eventer_jobq_t *eventer_default_backq() {
  return &__global_backq;
}

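/* Copy the eventer's start time into *epoch; returns -1 if
 * eventer_impl_init has not yet run, 0 on success. */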
int eventer_get_epoch(struct timeval *epoch) {
  if(!eventer_impl_epoch) return -1;
  memcpy(epoch, eventer_impl_epoch, sizeof(*epoch));
  return 0;
}

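/* One-time eventer bootstrap: honor EVENTER_DEBUGGING from the
 * environment, raise RLIMIT_NOFILE as close to desired_nofiles as the
 * OS allows (backing off one descriptor at a time on EPERM), record
 * the epoch, build the timed-event skiplist with a secondary pointer
 * index (so events can be removed by identity), wire up the eventer
 * log streams, SSL, the global back queue (drained by a recurrent
 * event), and the default jobq with its worker threads.  Returns 0. */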
int eventer_impl_init() {
  struct rlimit rlim;
  int i, try;
  eventer_t e;
  char *evdeb;

  evdeb = getenv("EVENTER_DEBUGGING");
  if(evdeb) {
    if(strcmp(evdeb, "0")) {
      /* Any value other than "0" turns debugging on */
      EVENTER_DEBUGGING = 1;
      noitL(noit_error, "Enabling eventer debugging from environment\n");
    }
    else {
      EVENTER_DEBUGGING = 0;
      noitL(noit_error, "Disabling eventer debugging from environment\n");
    }
  }
  eventer_name_callback("eventer_jobq_execute_timeout",
                        eventer_jobq_execute_timeout);
  eventer_name_callback("eventer_jobq_consume_available",
                        eventer_jobq_consume_available);

  getrlimit(RLIMIT_NOFILE, &rlim);
  rlim.rlim_cur = rlim.rlim_max = try = desired_nofiles;
  while(setrlimit(RLIMIT_NOFILE, &rlim) != 0 && errno == EPERM &&
        try > desired_limit + 10) {
    noitL(noit_debug, "setrlimit(%u) : %s\n", (u_int32_t)rlim.rlim_cur,
          strerror(errno));
    rlim.rlim_cur = rlim.rlim_max = --try;
  }
  getrlimit(RLIMIT_NOFILE, &rlim);
  noitL(noit_error, "rlim { %u, %u }\n",
        (u_int32_t)rlim.rlim_cur, (u_int32_t)rlim.rlim_max);

  eventer_impl_epoch = malloc(sizeof(struct timeval));
  gettimeofday(eventer_impl_epoch, NULL);
  pthread_mutex_init(&te_lock, NULL);
  timed_events = calloc(1, sizeof(*timed_events));
  noit_skiplist_init(timed_events);
  noit_skiplist_set_compare(timed_events,
                            eventer_timecompare, eventer_timecompare);
  noit_skiplist_add_index(timed_events,
                          noit_compare_voidptr, noit_compare_voidptr);

  eventer_err = noit_log_stream_find("error/eventer");
  eventer_deb = noit_log_stream_find("debug/eventer");
  if(!eventer_err) eventer_err = noit_stderr;
  if(!eventer_deb) eventer_deb = noit_debug;

  eventer_ssl_init();
  eventer_jobq_init(&__global_backq, "default_back_queue");
  e = eventer_alloc();
  e->mask = EVENTER_RECURRENT;
  e->closure = &__global_backq;
  e->callback = eventer_jobq_consume_available;

  /* We call directly here as we may not be completely initialized */
  eventer_add_recurrent(e);

  eventer_jobq_init(&__default_jobq, "default_queue");
  __default_jobq.backq = &__global_backq;
  for(i=0; i<__default_queue_threads; i++)
    eventer_jobq_increase_concurrency(&__default_jobq);
  return 0;
}

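/* Hand e to a job queue for asynchronous execution; a NULL q selects
 * the default jobq.  Unless eventer debugging is enabled, a non-zero
 * e->whence also schedules a companion EVENTER_TIMER event that fires
 * eventer_jobq_execute_timeout when the deadline passes.
 *
 * A minimal usage sketch (my_asynch_cb and the five-second deadline
 * are hypothetical; EVENTER_ASYNCH is assumed from eventer.h):
 *
 *   eventer_t e = eventer_alloc();
 *   e->mask = EVENTER_ASYNCH;
 *   e->callback = my_asynch_cb;      -- hypothetical callback
 *   gettimeofday(&e->whence, NULL);
 *   e->whence.tv_sec += 5;           -- deadline five seconds out
 *   eventer_add_asynch(NULL, e);     -- NULL selects the default jobq
 */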
void eventer_add_asynch(eventer_jobq_t *q, eventer_t e) {
  eventer_job_t *job;
  job = calloc(1, sizeof(*job));
  job->fd_event = e;
  job->jobq = q ? q : &__default_jobq;
  gettimeofday(&job->create_time, NULL);
  /* If we're debugging the eventer, these cross thread timeouts will
   * make it impossible for us to slowly trace an asynch job. */
  if(!EVENTER_DEBUGGING && e->whence.tv_sec) {
    job->timeout_event = eventer_alloc();
    memcpy(&job->timeout_event->whence, &e->whence, sizeof(e->whence));
    job->timeout_event->mask = EVENTER_TIMER;
    job->timeout_event->closure = job;
    job->timeout_event->callback = eventer_jobq_execute_timeout;
    eventer_add(job->timeout_event);
  }
  eventer_jobq_enqueue(q ? q : &__default_jobq, job);
}

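/* Schedule a timer event: e must carry EVENTER_TIMER with e->whence
 * set to the absolute fire time.  It is inserted into the shared
 * skiplist under te_lock and dispatched from eventer_dispatch_timed. */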
void eventer_add_timed(eventer_t e) {
  assert(e->mask & EVENTER_TIMER);
  if(EVENTER_DEBUGGING) {
    const char *cbname;
    cbname = eventer_name_for_callback(e->callback);
    noitL(eventer_deb, "debug: eventer_add timed (%s)\n",
          cbname ? cbname : "???");
  }
  pthread_mutex_lock(&te_lock);
  noit_skiplist_insert(timed_events, e);
  pthread_mutex_unlock(&te_lock);
}
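/* Remove a pending timer event, matched by pointer identity via the
 * skiplist's secondary index.  Returns e if found and removed, NULL
 * otherwise. */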
eventer_t eventer_remove_timed(eventer_t e) {
  eventer_t removed = NULL;
  assert(e->mask & EVENTER_TIMER);
  pthread_mutex_lock(&te_lock);
  if(noit_skiplist_remove_compare(timed_events, e, NULL,
                                  noit_compare_voidptr))
    removed = e;
  pthread_mutex_unlock(&te_lock);
  return removed;
}
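/* Re-sort a timer event whose whence has changed: remove and reinsert
 * it under a single hold of te_lock. */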
void eventer_update_timed(eventer_t e, int mask) {
  assert(mask & EVENTER_TIMER);
  pthread_mutex_lock(&te_lock);
  noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
  noit_skiplist_insert(timed_events, e);
  pthread_mutex_unlock(&te_lock);
}
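/* Fire every timer event whose whence has passed, leaving in *next the
 * interval until the earliest remaining event (when one exists),
 * capped at eventer_max_sleeptime.  A callback returning a non-zero
 * mask is rescheduled; a zero return frees the event.  At most the
 * number of events present on entry is processed per call, so
 * callbacks that reschedule themselves cannot starve the dispatcher. */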
void eventer_dispatch_timed(struct timeval *now, struct timeval *next) {
  int max_timed_events_to_process;
  /* Handle timed events...
   * we could be multithreaded, so if we pop forever we could starve
   * ourselves. */
  max_timed_events_to_process = timed_events->size;
  while(max_timed_events_to_process-- > 0) {
    int newmask;
    const char *cbname = NULL;
    eventer_t timed_event;

    gettimeofday(now, NULL);

    pthread_mutex_lock(&te_lock);
    /* Peek at our next timed event; if it should fire, pop it.
     * Otherwise, noop and NULL it out to break the loop. */
    timed_event = noit_skiplist_peek(timed_events);
    if(timed_event) {
      if(compare_timeval(timed_event->whence, *now) < 0) {
        timed_event = noit_skiplist_pop(timed_events, NULL);
      }
      else {
        sub_timeval(timed_event->whence, *now, next);
        timed_event = NULL;
      }
    }
    pthread_mutex_unlock(&te_lock);
    if(timed_event == NULL) break;
    if(EVENTER_DEBUGGING ||
       EVENTER_CALLBACK_ENTRY_ENABLED() ||
       EVENTER_CALLBACK_RETURN_ENABLED()) {
      cbname = eventer_name_for_callback(timed_event->callback);
      noitLT(eventer_deb, now, "debug: timed dispatch(%s)\n",
             cbname ? cbname : "???");
    }
    /* Make our call */
    EVENTER_CALLBACK_ENTRY((void *)timed_event->callback, (char *)cbname, -1,
                           timed_event->mask, EVENTER_TIMER);
    newmask = timed_event->callback(timed_event, EVENTER_TIMER,
                                    timed_event->closure, now);
    EVENTER_CALLBACK_RETURN((void *)timed_event->callback, (char *)cbname,
                            newmask);
    if(newmask)
      eventer_add_timed(timed_event);
    else
      eventer_free(timed_event);
  }

  if(compare_timeval(eventer_max_sleeptime, *next) < 0) {
    /* we exceed our configured maximum; cap it */
    memcpy(next, &eventer_max_sleeptime, sizeof(*next));
  }
}
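/* Invoke f(e, closure) on every pending timer event.  te_lock is held
 * across the walk, so f must not call back into the timed-event API or
 * it will deadlock on the non-recursive lock. */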
void
eventer_foreach_timedevent(void (*f)(eventer_t e, void *), void *closure) {
  noit_skiplist_node *iter = NULL;
  pthread_mutex_lock(&te_lock);
  for(iter = noit_skiplist_getlist(timed_events); iter;
      noit_skiplist_next(timed_events, &iter)) {
    if(iter->data) f(iter->data, closure);
  }
  pthread_mutex_unlock(&te_lock);
}

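/* Run each registered recurrent event's callback once; a NULL now is
 * replaced with the current time.  Callback return values are ignored,
 * as recurrent events persist until explicitly removed. */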
void eventer_dispatch_recurrent(struct timeval *now) {
  struct recurrent_events *node;
  struct timeval __now;
  if(!now) {
    gettimeofday(&__now, NULL);
    now = &__now;
  }
  pthread_mutex_lock(&recurrent_lock);
  for(node = recurrent_events; node; node = node->next) {
    node->e->callback(node->e, EVENTER_RECURRENT, node->e->closure, now);
  }
  pthread_mutex_unlock(&recurrent_lock);
}
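/* Unlink e from the recurrent event list, freeing the list node.
 * Returns e if it was registered, NULL if not found. */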
eventer_t eventer_remove_recurrent(eventer_t e) {
  struct recurrent_events *node, *prev = NULL;
  pthread_mutex_lock(&recurrent_lock);
  for(node = recurrent_events; node; node = node->next) {
    if(node->e == e) {
      if(prev) prev->next = node->next;
      else recurrent_events = node->next;
      free(node);
      pthread_mutex_unlock(&recurrent_lock);
      return e;
    }
    prev = node;
  }
  pthread_mutex_unlock(&recurrent_lock);
  return NULL;
}
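/* Register e (which must carry EVENTER_RECURRENT) to run on every pass
 * through the event loop; re-adding an already-registered event is a
 * no-op. */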
void eventer_add_recurrent(eventer_t e) {
  struct recurrent_events *node;
  assert(e->mask & EVENTER_RECURRENT);
  pthread_mutex_lock(&recurrent_lock);
  for(node = recurrent_events; node; node = node->next)
    if(node->e == e) {
      pthread_mutex_unlock(&recurrent_lock);
      return;
    }
  node = calloc(1, sizeof(*node));
  node->e = e;
  node->next = recurrent_events;
  recurrent_events = node;
  pthread_mutex_unlock(&recurrent_lock);
}