root/src/eventer/eventer_impl.c

Revision a335d729973892ef579ddf315445c22f17037ef4, 11.1 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 years ago)

Caching of SSL_CTXs we create: the creation (particularly ingestion of a full CA chain) can be quite expensive.

/*
 * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 *       copyright notice, this list of conditions and the following
 *       disclaimer in the documentation and/or other materials provided
 *       with the distribution.
 *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
 *       of its contributors may be used to endorse or promote products
 *       derived from this software without specific prior written
 *       permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "noit_defines.h"
#include "eventer/eventer.h"
#include "utils/noit_log.h"
#include "utils/noit_skiplist.h"
#include <pthread.h>
#include <errno.h>
#include <assert.h>

static struct timeval *eventer_impl_epoch = NULL;
static int EVENTER_DEBUGGING = 0;
static int desired_nofiles = 1024*1024;
static pthread_mutex_t te_lock;
static noit_skiplist *timed_events = NULL;

#ifdef HAVE_KQUEUE
extern struct _eventer_impl eventer_kqueue_impl;
#endif
#ifdef HAVE_EPOLL
extern struct _eventer_impl eventer_epoll_impl;
#endif
#ifdef HAVE_PORTS
extern struct _eventer_impl eventer_ports_impl;
#endif

eventer_impl_t registered_eventers[] = {
#ifdef HAVE_KQUEUE
  &eventer_kqueue_impl,
#endif
#ifdef HAVE_EPOLL
  &eventer_epoll_impl,
#endif
#ifdef HAVE_PORTS
  &eventer_ports_impl,
#endif
  NULL
};
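
/* registered_eventers is the NULL-terminated table of backends compiled
 * into this build (kqueue, epoll, and/or Solaris event ports).  The
 * eventer bootstrap elsewhere picks one entry from this table and binds
 * it to __eventer below; the eventer_* entry points then dispatch
 * through that implementation. */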

eventer_impl_t __eventer = NULL;
noit_log_stream_t eventer_err = NULL;
noit_log_stream_t eventer_deb = NULL;

static int __default_queue_threads = 5;
static int desired_limit = 1024 * 1024;
static eventer_jobq_t __global_backq, __default_jobq;
static pthread_mutex_t recurrent_lock = PTHREAD_MUTEX_INITIALIZER;
struct recurrent_events {
  eventer_t e;
  struct recurrent_events *next;
} *recurrent_events = NULL;

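/* Apply one key/value configuration property to the eventer.  Known keys
 * are validated here; an invalid value returns -1, while an unknown key
 * only logs a warning and returns 0. */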
int eventer_impl_propset(const char *key, const char *value) {
  if(!strcasecmp(key, "default_queue_threads")) {
    __default_queue_threads = atoi(value);
    if(__default_queue_threads < 1) {
      noitL(noit_error, "default_queue_threads must be >= 1\n");
      return -1;
    }
    return 0;
  }
  else if(!strcasecmp(key, "rlim_nofiles")) {
    desired_limit = atoi(value);
    if(desired_limit < 256) {
      noitL(noit_error, "rlim_nofiles must be >= 256\n");
      return -1;
    }
    return 0;
  }
  else if(!strcasecmp(key, "debugging")) {
    if(strcmp(value, "0")) {
      EVENTER_DEBUGGING = 1;
      noitL(noit_error, "Enabling debugging from property\n");
    }
    return 0;
  }
  else if(!strcasecmp(key, "default_ca_chain")) {
    /* used by eventer consumers */
    return 0;
  }
  else if(!strcasecmp(key, "ssl_ctx_cache_expiry")) {
    eventer_ssl_set_ssl_ctx_cache_expiry(atoi(value));
    return 0;
  }
  noitL(noit_error, "Warning: unknown eventer config '%s'\n", key);
  return 0;
}
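
/* Illustrative usage (hypothetical call site; in practice these
 * properties arrive from the application's configuration):
 *
 *   eventer_impl_propset("default_queue_threads", "10");
 *   eventer_impl_propset("rlim_nofiles", "65536");
 *   eventer_impl_propset("ssl_ctx_cache_expiry", "3600");
 *
 * The last key controls the caching of SSL_CTXs we create (see the
 * commit note above); building one, particularly ingesting a full CA
 * chain, can be quite expensive, so the expiry (presumably seconds,
 * given the bare atoi() above) bounds how often that cost is paid. */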

eventer_jobq_t *eventer_default_backq() {
  return &__global_backq;
}

int eventer_get_epoch(struct timeval *epoch) {
  if(!eventer_impl_epoch) return -1;
  memcpy(epoch, eventer_impl_epoch, sizeof(*epoch));
  return 0;
}

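/* One-time bootstrap for the eventer core.  In order, this: honors the
 * EVENTER_DEBUGGING environment variable, raises RLIMIT_NOFILE toward
 * desired_nofiles, records the epoch, builds the timed-event skiplist,
 * locates (or falls back on) the eventer log streams, initializes SSL,
 * wires up the default back queue as a recurrent drain, and spins up
 * the default jobq's worker threads. */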
int eventer_impl_init() {
  struct rlimit rlim;
  int i, try;
  eventer_t e;
  char *evdeb;

  evdeb = getenv("EVENTER_DEBUGGING");
  if(evdeb) {
    if(strcmp(evdeb, "0")) {
      /* Set to anything but "0" turns debugging on */
      EVENTER_DEBUGGING = 1;
      noitL(noit_error, "Enabling eventer debugging from environment\n");
    }
    else {
      EVENTER_DEBUGGING = 0;
      noitL(noit_error, "Disabling eventer debugging from environment\n");
    }
  }
  eventer_name_callback("eventer_jobq_execute_timeout",
                        eventer_jobq_execute_timeout);
  eventer_name_callback("eventer_jobq_consume_available",
                        eventer_jobq_consume_available);

  getrlimit(RLIMIT_NOFILE, &rlim);
  rlim.rlim_cur = rlim.rlim_max = try = desired_nofiles;
  while(setrlimit(RLIMIT_NOFILE, &rlim) != 0 && errno == EPERM && try > desired_limit + 10) {
    noitL(noit_debug, "setrlimit(%u) : %s\n", (u_int32_t)rlim.rlim_cur, strerror(errno));
    rlim.rlim_cur = rlim.rlim_max = --try;
  }
  getrlimit(RLIMIT_NOFILE, &rlim);
  noitL(noit_error, "rlim { %u, %u }\n", (u_int32_t)rlim.rlim_cur, (u_int32_t)rlim.rlim_max);

  eventer_impl_epoch = malloc(sizeof(struct timeval));
  gettimeofday(eventer_impl_epoch, NULL);
  pthread_mutex_init(&te_lock, NULL);
  timed_events = calloc(1, sizeof(*timed_events));
  noit_skiplist_init(timed_events);
  noit_skiplist_set_compare(timed_events,
                            eventer_timecompare, eventer_timecompare);
  noit_skiplist_add_index(timed_events,
                          noit_compare_voidptr, noit_compare_voidptr);

  eventer_err = noit_log_stream_find("error/eventer");
  eventer_deb = noit_log_stream_find("debug/eventer");
  if(!eventer_err) eventer_err = noit_stderr;
  if(!eventer_deb) eventer_deb = noit_debug;

  eventer_ssl_init();
  eventer_jobq_init(&__global_backq, "default_back_queue");
  e = eventer_alloc();
  e->mask = EVENTER_RECURRENT;
  e->closure = &__global_backq;
  e->callback = eventer_jobq_consume_available;

  /* We call directly here as we may not be completely initialized */
  eventer_add_recurrent(e);

  eventer_jobq_init(&__default_jobq, "default_queue");
  __default_jobq.backq = &__global_backq;
  for(i=0; i<__default_queue_threads; i++)
    eventer_jobq_increase_concurrency(&__default_jobq);
  return 0;
}

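/* Submit e for asynchronous execution on jobq q (or the default jobq
 * when q is NULL).  If e->whence is set, a companion EVENTER_TIMER event
 * is scheduled so eventer_jobq_execute_timeout can abort a job that
 * overruns its deadline; under EVENTER_DEBUGGING those timeouts are
 * suppressed so a job can be traced slowly without being killed.
 *
 * Illustrative submission (hypothetical callback; the EVENTER_ASYNCH
 * mask name is assumed from eventer.h):
 *
 *   eventer_t e = eventer_alloc();
 *   e->mask = EVENTER_ASYNCH;
 *   e->callback = my_asynch_callback;
 *   gettimeofday(&e->whence, NULL);
 *   e->whence.tv_sec += 30;       -- abort via timeout if not done in ~30s
 *   eventer_add_asynch(NULL, e);  -- NULL selects the default jobq
 */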
void eventer_add_asynch(eventer_jobq_t *q, eventer_t e) {
  eventer_job_t *job;
  job = calloc(1, sizeof(*job));
  job->fd_event = e;
  job->jobq = q ? q : &__default_jobq;
  gettimeofday(&job->create_time, NULL);
  /* If we're debugging the eventer, these cross thread timeouts will
   * make it impossible for us to slowly trace an asynch job. */
  if(!EVENTER_DEBUGGING && e->whence.tv_sec) {
    job->timeout_event = eventer_alloc();
    memcpy(&job->timeout_event->whence, &e->whence, sizeof(e->whence));
    job->timeout_event->mask = EVENTER_TIMER;
    job->timeout_event->closure = job;
    job->timeout_event->callback = eventer_jobq_execute_timeout;
    eventer_add(job->timeout_event);
  }
  eventer_jobq_enqueue(q ? q : &__default_jobq, job);
}

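/* Timed events live in the timed_events skiplist, which is kept ordered
 * two ways: by firing time (eventer_timecompare) so dispatch can pop the
 * soonest event, and by pointer (noit_compare_voidptr) so a specific
 * event can be removed without scanning.  te_lock guards every access
 * because timed events may be manipulated from multiple threads.
 *
 * Illustrative one-shot timer (hypothetical callback; returning 0 from
 * the callback frees the event, nonzero reschedules it):
 *
 *   eventer_t e = eventer_alloc();
 *   e->mask = EVENTER_TIMER;
 *   gettimeofday(&e->whence, NULL);
 *   e->whence.tv_sec += 5;           -- fire roughly 5s from now
 *   e->callback = my_timer_callback;
 *   eventer_add_timed(e);
 */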
void eventer_add_timed(eventer_t e) {
  assert(e->mask & EVENTER_TIMER);
  if(EVENTER_DEBUGGING) {
    const char *cbname;
    cbname = eventer_name_for_callback(e->callback);
    noitL(eventer_deb, "debug: eventer_add timed (%s)\n",
          cbname ? cbname : "???");
  }
  pthread_mutex_lock(&te_lock);
  noit_skiplist_insert(timed_events, e);
  pthread_mutex_unlock(&te_lock);
}
eventer_t eventer_remove_timed(eventer_t e) {
  eventer_t removed = NULL;
  assert(e->mask & EVENTER_TIMER);
  pthread_mutex_lock(&te_lock);
  if(noit_skiplist_remove_compare(timed_events, e, NULL,
                                  noit_compare_voidptr))
    removed = e;
  pthread_mutex_unlock(&te_lock);
  return removed;
}
void eventer_update_timed(eventer_t e, int mask) {
  assert(mask & EVENTER_TIMER);
  pthread_mutex_lock(&te_lock);
  noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr);
  noit_skiplist_insert(timed_events, e);
  pthread_mutex_unlock(&te_lock);
}
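/* Fire every timed event whose deadline has passed.  At most the number
 * of events present at entry are processed, so a callback that
 * reschedules itself for "now" cannot starve the rest of the loop.  On
 * return, *next holds the interval until the next pending event, capped
 * at eventer_max_sleeptime, so the polling backend sleeps no longer
 * than that. */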
void eventer_dispatch_timed(struct timeval *now, struct timeval *next) {
  int max_timed_events_to_process;
  /* Handle timed events...
   * we could be multithreaded, so if we pop forever we could starve
   * ourselves. */
  max_timed_events_to_process = timed_events->size;
  while(max_timed_events_to_process-- > 0) {
    int newmask;
    eventer_t timed_event;

    gettimeofday(now, NULL);

    pthread_mutex_lock(&te_lock);
    /* Peek at our next timed event; if it should fire, pop it.
     * Otherwise we noop and NULL it out to break the loop. */
    timed_event = noit_skiplist_peek(timed_events);
    if(timed_event) {
      if(compare_timeval(timed_event->whence, *now) < 0) {
        timed_event = noit_skiplist_pop(timed_events, NULL);
      }
      else {
        sub_timeval(timed_event->whence, *now, next);
        timed_event = NULL;
      }
    }
    pthread_mutex_unlock(&te_lock);
    if(timed_event == NULL) break;
    if(EVENTER_DEBUGGING) {
      const char *cbname;
      cbname = eventer_name_for_callback(timed_event->callback);
      noitLT(eventer_deb, now, "debug: timed dispatch(%s)\n",
             cbname ? cbname : "???");
    }
    /* Make our call */
    newmask = timed_event->callback(timed_event, EVENTER_TIMER,
                                    timed_event->closure, now);
    if(newmask)
      eventer_add_timed(timed_event);
    else
      eventer_free(timed_event);
  }

  if(compare_timeval(eventer_max_sleeptime, *next) < 0) {
    /* we exceed our configured maximum, set it down */
    memcpy(next, &eventer_max_sleeptime, sizeof(*next));
  }
}
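/* Invoke f on every pending timed event.  The walk holds te_lock for
 * its duration, so f must be brief and must not re-enter the timed-event
 * API (te_lock is a default, non-recursive mutex). */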
void
eventer_foreach_timedevent (void (*f)(eventer_t e, void *), void *closure) {
  noit_skiplist_node *iter = NULL;
  pthread_mutex_lock(&te_lock);
  for(iter = noit_skiplist_getlist(timed_events); iter;
      noit_skiplist_next(timed_events,&iter)) {
    if(iter->data) f(iter->data, closure);
  }
  pthread_mutex_unlock(&te_lock);
}

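/* Recurrent events form a singly linked list and run on every pass of
 * the event loop rather than on a schedule.  Their callbacks execute
 * with recurrent_lock held, so they must be quick and must not call
 * eventer_add_recurrent/eventer_remove_recurrent themselves (the lock,
 * a PTHREAD_MUTEX_INITIALIZER mutex, is not recursive). */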
void eventer_dispatch_recurrent(struct timeval *now) {
  struct recurrent_events *node;
  struct timeval __now;
  if(!now) {
    gettimeofday(&__now, NULL);
    now = &__now;
  }
  pthread_mutex_lock(&recurrent_lock);
  for(node = recurrent_events; node; node = node->next) {
    node->e->callback(node->e, EVENTER_RECURRENT, node->e->closure, now);
  }
  pthread_mutex_unlock(&recurrent_lock);
}
eventer_t eventer_remove_recurrent(eventer_t e) {
  struct recurrent_events *node, *prev = NULL;
  pthread_mutex_lock(&recurrent_lock);
  for(node = recurrent_events; node; node = node->next) {
    if(node->e == e) {
      if(prev) prev->next = node->next;
      else recurrent_events = node->next;
      free(node);
      pthread_mutex_unlock(&recurrent_lock);
      return e;
    }
    prev = node;
  }
  pthread_mutex_unlock(&recurrent_lock);
  return NULL;
}
void eventer_add_recurrent(eventer_t e) {
  struct recurrent_events *node;
  assert(e->mask & EVENTER_RECURRENT);
  pthread_mutex_lock(&recurrent_lock);
  for(node = recurrent_events; node; node = node->next)
    if(node->e == e) {
      pthread_mutex_unlock(&recurrent_lock);
      return;
    }
  node = calloc(1, sizeof(*node));
  node->e = e;
  node->next = recurrent_events;
  recurrent_events = node;
  pthread_mutex_unlock(&recurrent_lock);
}