| 1 |
/* |
|---|
| 2 |
* Copyright (c) 2007, OmniTI Computer Consulting, Inc. |
|---|
| 3 |
* All rights reserved. |
|---|
| 4 |
* |
|---|
| 5 |
* Redistribution and use in source and binary forms, with or without |
|---|
| 6 |
* modification, are permitted provided that the following conditions are |
|---|
| 7 |
* met: |
|---|
| 8 |
* |
|---|
| 9 |
* * Redistributions of source code must retain the above copyright |
|---|
| 10 |
* notice, this list of conditions and the following disclaimer. |
|---|
| 11 |
* * Redistributions in binary form must reproduce the above |
|---|
| 12 |
* copyright notice, this list of conditions and the following |
|---|
| 13 |
* disclaimer in the documentation and/or other materials provided |
|---|
| 14 |
* with the distribution. |
|---|
| 15 |
* * Neither the name OmniTI Computer Consulting, Inc. nor the names |
|---|
| 16 |
* of its contributors may be used to endorse or promote products |
|---|
| 17 |
* derived from this software without specific prior written |
|---|
| 18 |
* permission. |
|---|
| 19 |
* |
|---|
| 20 |
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
|---|
| 21 |
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
|---|
| 22 |
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
|---|
| 23 |
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
|---|
| 24 |
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|---|
| 25 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|---|
| 26 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
|---|
| 27 |
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
|---|
| 28 |
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
|---|
| 29 |
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
|---|
| 30 |
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
|---|
| 31 |
*/ |
|---|
| 32 |
|
|---|
| 33 |
#include "noit_defines.h" |
|---|
| 34 |
#include "eventer/eventer.h" |
|---|
| 35 |
#include "utils/noit_log.h" |
|---|
| 36 |
#include "utils/noit_skiplist.h" |
|---|
| 37 |
#include "eventer/dtrace_probes.h" |
|---|
| 38 |
#include <pthread.h> |
|---|
| 39 |
#include <errno.h> |
|---|
| 40 |
#include <assert.h> |
|---|
| 41 |
|
|---|
| 42 |
static struct timeval *eventer_impl_epoch = NULL; |
|---|
| 43 |
static int EVENTER_DEBUGGING = 0; |
|---|
| 44 |
static int desired_nofiles = 1024*1024; |
|---|
| 45 |
static pthread_mutex_t te_lock; |
|---|
| 46 |
static noit_skiplist *timed_events = NULL; |
|---|
| 47 |
|
|---|
| 48 |
#ifdef HAVE_KQUEUE |
|---|
| 49 |
extern struct _eventer_impl eventer_kqueue_impl; |
|---|
| 50 |
#endif |
|---|
| 51 |
#ifdef HAVE_EPOLL |
|---|
| 52 |
extern struct _eventer_impl eventer_epoll_impl; |
|---|
| 53 |
#endif |
|---|
| 54 |
#ifdef HAVE_PORTS |
|---|
| 55 |
extern struct _eventer_impl eventer_ports_impl; |
|---|
| 56 |
#endif |
|---|
| 57 |
|
|---|
| 58 |
eventer_impl_t registered_eventers[] = { |
|---|
| 59 |
#ifdef HAVE_KQUEUE |
|---|
| 60 |
&eventer_kqueue_impl, |
|---|
| 61 |
#endif |
|---|
| 62 |
#ifdef HAVE_EPOLL |
|---|
| 63 |
&eventer_epoll_impl, |
|---|
| 64 |
#endif |
|---|
| 65 |
#ifdef HAVE_PORTS |
|---|
| 66 |
&eventer_ports_impl, |
|---|
| 67 |
#endif |
|---|
| 68 |
NULL |
|---|
| 69 |
}; |
|---|
| 70 |
|
|---|
| 71 |
eventer_impl_t __eventer = NULL; |
|---|
| 72 |
noit_log_stream_t eventer_err = NULL; |
|---|
| 73 |
noit_log_stream_t eventer_deb = NULL; |
|---|
| 74 |
|
|---|
| 75 |
static int __default_queue_threads = 5; |
|---|
| 76 |
static int desired_limit = 1024 * 1024; |
|---|
| 77 |
static eventer_jobq_t __global_backq, __default_jobq; |
|---|
| 78 |
static pthread_mutex_t recurrent_lock = PTHREAD_MUTEX_INITIALIZER; |
|---|
| 79 |
struct recurrent_events { |
|---|
| 80 |
eventer_t e; |
|---|
| 81 |
struct recurrent_events *next; |
|---|
| 82 |
} *recurrent_events = NULL; |
|---|
| 83 |
|
|---|
| 84 |
|
|---|
| 85 |
int eventer_impl_propset(const char *key, const char *value) { |
|---|
| 86 |
if(!strcasecmp(key, "default_queue_threads")) { |
|---|
| 87 |
__default_queue_threads = atoi(value); |
|---|
| 88 |
if(__default_queue_threads < 1) { |
|---|
| 89 |
noitL(noit_error, "default_queue_threads must be >= 1\n"); |
|---|
| 90 |
return -1; |
|---|
| 91 |
} |
|---|
| 92 |
return 0; |
|---|
| 93 |
} |
|---|
| 94 |
else if(!strcasecmp(key, "rlim_nofiles")) { |
|---|
| 95 |
desired_limit = atoi(value); |
|---|
| 96 |
if(desired_limit < 256) { |
|---|
| 97 |
noitL(noit_error, "rlim_nofiles must be >= 256\n"); |
|---|
| 98 |
return -1; |
|---|
| 99 |
} |
|---|
| 100 |
return 0; |
|---|
| 101 |
} |
|---|
| 102 |
else if(!strcasecmp(key, "debugging")) { |
|---|
| 103 |
if(strcmp(value, "0")) { |
|---|
| 104 |
EVENTER_DEBUGGING = 1; |
|---|
| 105 |
noitL(noit_error, "Enabling debugging from property\n"); |
|---|
| 106 |
} |
|---|
| 107 |
return 0; |
|---|
| 108 |
} |
|---|
| 109 |
else if(!strcasecmp(key, "default_ca_chain")) { |
|---|
| 110 |
/* used by eventer consumers */ |
|---|
| 111 |
return 0; |
|---|
| 112 |
} |
|---|
| 113 |
else if(!strcasecmp(key, "ssl_ctx_cache_expiry")) { |
|---|
| 114 |
eventer_ssl_set_ssl_ctx_cache_expiry(atoi(value)); |
|---|
| 115 |
return 0; |
|---|
| 116 |
} |
|---|
| 117 |
noitL(noit_error, "Warning: unknown eventer config '%s'\n", key); |
|---|
| 118 |
return 0; |
|---|
| 119 |
} |
|---|
| 120 |
|
|---|
| 121 |
eventer_jobq_t *eventer_default_backq() { |
|---|
| 122 |
return &__global_backq; |
|---|
| 123 |
} |
|---|
| 124 |
|
|---|
| 125 |
int eventer_get_epoch(struct timeval *epoch) { |
|---|
| 126 |
if(!eventer_impl_epoch) return -1; |
|---|
| 127 |
memcpy(epoch, eventer_impl_epoch, sizeof(*epoch)); |
|---|
| 128 |
return 0; |
|---|
| 129 |
} |
|---|
| 130 |
|
|---|
| 131 |
int eventer_impl_init() { |
|---|
| 132 |
struct rlimit rlim; |
|---|
| 133 |
int i, try; |
|---|
| 134 |
eventer_t e; |
|---|
| 135 |
char *evdeb; |
|---|
| 136 |
|
|---|
| 137 |
evdeb = getenv("EVENTER_DEBUGGING"); |
|---|
| 138 |
if(evdeb) { |
|---|
| 139 |
if(strcmp(evdeb, "0")) { |
|---|
| 140 |
/* Set to anything but "0" turns debugging on */ |
|---|
| 141 |
EVENTER_DEBUGGING = 1; |
|---|
| 142 |
noitL(noit_error, "Disabling eventer debugging from environment\n"); |
|---|
| 143 |
} |
|---|
| 144 |
else { |
|---|
| 145 |
EVENTER_DEBUGGING = 1; |
|---|
| 146 |
noitL(noit_error, "Enabling eventer debugging from environment\n"); |
|---|
| 147 |
} |
|---|
| 148 |
} |
|---|
| 149 |
eventer_name_callback("eventer_jobq_execute_timeout", |
|---|
| 150 |
eventer_jobq_execute_timeout); |
|---|
| 151 |
eventer_name_callback("eventer_jobq_consume_available", |
|---|
| 152 |
eventer_jobq_consume_available); |
|---|
| 153 |
|
|---|
| 154 |
getrlimit(RLIMIT_NOFILE, &rlim); |
|---|
| 155 |
rlim.rlim_cur = rlim.rlim_max = try = desired_nofiles; |
|---|
| 156 |
while(setrlimit(RLIMIT_NOFILE, &rlim) != 0 && errno == EPERM && try > desired_limit + 10) { |
|---|
| 157 |
noitL(noit_debug, "setrlimit(%u) : %s\n", (u_int32_t)rlim.rlim_cur, strerror(errno)); |
|---|
| 158 |
rlim.rlim_cur = rlim.rlim_max = --try; |
|---|
| 159 |
} |
|---|
| 160 |
getrlimit(RLIMIT_NOFILE, &rlim); |
|---|
| 161 |
noitL(noit_error, "rlim { %u, %u }\n", (u_int32_t)rlim.rlim_cur, (u_int32_t)rlim.rlim_max); |
|---|
| 162 |
|
|---|
| 163 |
eventer_impl_epoch = malloc(sizeof(struct timeval)); |
|---|
| 164 |
gettimeofday(eventer_impl_epoch, NULL); |
|---|
| 165 |
pthread_mutex_init(&te_lock, NULL); |
|---|
| 166 |
timed_events = calloc(1, sizeof(*timed_events)); |
|---|
| 167 |
noit_skiplist_init(timed_events); |
|---|
| 168 |
noit_skiplist_set_compare(timed_events, |
|---|
| 169 |
eventer_timecompare, eventer_timecompare); |
|---|
| 170 |
noit_skiplist_add_index(timed_events, |
|---|
| 171 |
noit_compare_voidptr, noit_compare_voidptr); |
|---|
| 172 |
|
|---|
| 173 |
eventer_err = noit_log_stream_find("error/eventer"); |
|---|
| 174 |
eventer_deb = noit_log_stream_find("debug/eventer"); |
|---|
| 175 |
if(!eventer_err) eventer_err = noit_stderr; |
|---|
| 176 |
if(!eventer_deb) eventer_deb = noit_debug; |
|---|
| 177 |
|
|---|
| 178 |
eventer_ssl_init(); |
|---|
| 179 |
eventer_jobq_init(&__global_backq, "default_back_queue"); |
|---|
| 180 |
e = eventer_alloc(); |
|---|
| 181 |
e->mask = EVENTER_RECURRENT; |
|---|
| 182 |
e->closure = &__global_backq; |
|---|
| 183 |
e->callback = eventer_jobq_consume_available; |
|---|
| 184 |
|
|---|
| 185 |
/* We call directly here as we may not be completely initialized */ |
|---|
| 186 |
eventer_add_recurrent(e); |
|---|
| 187 |
|
|---|
| 188 |
eventer_jobq_init(&__default_jobq, "default_queue"); |
|---|
| 189 |
__default_jobq.backq = &__global_backq; |
|---|
| 190 |
for(i=0; i<__default_queue_threads; i++) |
|---|
| 191 |
eventer_jobq_increase_concurrency(&__default_jobq); |
|---|
| 192 |
return 0; |
|---|
| 193 |
} |
|---|
| 194 |
|
|---|
| 195 |
void eventer_add_asynch(eventer_jobq_t *q, eventer_t e) { |
|---|
| 196 |
eventer_job_t *job; |
|---|
| 197 |
job = calloc(1, sizeof(*job)); |
|---|
| 198 |
job->fd_event = e; |
|---|
| 199 |
job->jobq = q ? q : &__default_jobq; |
|---|
| 200 |
gettimeofday(&job->create_time, NULL); |
|---|
| 201 |
/* If we're debugging the eventer, these cross thread timeouts will |
|---|
| 202 |
* make it impossible for us to slowly trace an asynch job. */ |
|---|
| 203 |
if(!EVENTER_DEBUGGING && e->whence.tv_sec) { |
|---|
| 204 |
job->timeout_event = eventer_alloc(); |
|---|
| 205 |
memcpy(&job->timeout_event->whence, &e->whence, sizeof(e->whence)); |
|---|
| 206 |
job->timeout_event->mask = EVENTER_TIMER; |
|---|
| 207 |
job->timeout_event->closure = job; |
|---|
| 208 |
job->timeout_event->callback = eventer_jobq_execute_timeout; |
|---|
| 209 |
eventer_add(job->timeout_event); |
|---|
| 210 |
} |
|---|
| 211 |
eventer_jobq_enqueue(q ? q : &__default_jobq, job); |
|---|
| 212 |
} |
|---|
| 213 |
|
|---|
| 214 |
void eventer_add_timed(eventer_t e) { |
|---|
| 215 |
assert(e->mask & EVENTER_TIMER); |
|---|
| 216 |
if(EVENTER_DEBUGGING) { |
|---|
| 217 |
const char *cbname; |
|---|
| 218 |
cbname = eventer_name_for_callback(e->callback); |
|---|
| 219 |
noitL(eventer_deb, "debug: eventer_add timed (%s)\n", |
|---|
| 220 |
cbname ? cbname : "???"); |
|---|
| 221 |
} |
|---|
| 222 |
pthread_mutex_lock(&te_lock); |
|---|
| 223 |
noit_skiplist_insert(timed_events, e); |
|---|
| 224 |
pthread_mutex_unlock(&te_lock); |
|---|
| 225 |
} |
|---|
| 226 |
eventer_t eventer_remove_timed(eventer_t e) { |
|---|
| 227 |
eventer_t removed = NULL; |
|---|
| 228 |
assert(e->mask & EVENTER_TIMER); |
|---|
| 229 |
pthread_mutex_lock(&te_lock); |
|---|
| 230 |
if(noit_skiplist_remove_compare(timed_events, e, NULL, |
|---|
| 231 |
noit_compare_voidptr)) |
|---|
| 232 |
removed = e; |
|---|
| 233 |
pthread_mutex_unlock(&te_lock); |
|---|
| 234 |
return removed; |
|---|
| 235 |
} |
|---|
| 236 |
void eventer_update_timed(eventer_t e, int mask) { |
|---|
| 237 |
assert(mask & EVENTER_TIMER); |
|---|
| 238 |
pthread_mutex_lock(&te_lock); |
|---|
| 239 |
noit_skiplist_remove_compare(timed_events, e, NULL, noit_compare_voidptr); |
|---|
| 240 |
noit_skiplist_insert(timed_events, e); |
|---|
| 241 |
pthread_mutex_unlock(&te_lock); |
|---|
| 242 |
} |
|---|
| 243 |
void eventer_dispatch_timed(struct timeval *now, struct timeval *next) { |
|---|
| 244 |
int max_timed_events_to_process; |
|---|
| 245 |
/* Handle timed events... |
|---|
| 246 |
* we could be multithreaded, so if we pop forever we could starve |
|---|
| 247 |
* ourselves. */ |
|---|
| 248 |
max_timed_events_to_process = timed_events->size; |
|---|
| 249 |
while(max_timed_events_to_process-- > 0) { |
|---|
| 250 |
int newmask; |
|---|
| 251 |
const char *cbname = NULL; |
|---|
| 252 |
eventer_t timed_event; |
|---|
| 253 |
|
|---|
| 254 |
gettimeofday(now, NULL); |
|---|
| 255 |
|
|---|
| 256 |
pthread_mutex_lock(&te_lock); |
|---|
| 257 |
/* Peek at our next timed event, if should fire, pop it. |
|---|
| 258 |
* otherwise we noop and NULL it out to break the loop. */ |
|---|
| 259 |
timed_event = noit_skiplist_peek(timed_events); |
|---|
| 260 |
if(timed_event) { |
|---|
| 261 |
if(compare_timeval(timed_event->whence, *now) < 0) { |
|---|
| 262 |
timed_event = noit_skiplist_pop(timed_events, NULL); |
|---|
| 263 |
} |
|---|
| 264 |
else { |
|---|
| 265 |
sub_timeval(timed_event->whence, *now, next); |
|---|
| 266 |
timed_event = NULL; |
|---|
| 267 |
} |
|---|
| 268 |
} |
|---|
| 269 |
pthread_mutex_unlock(&te_lock); |
|---|
| 270 |
if(timed_event == NULL) break; |
|---|
| 271 |
if(EVENTER_DEBUGGING || |
|---|
| 272 |
EVENTER_CALLBACK_ENTRY_ENABLED() || |
|---|
| 273 |
EVENTER_CALLBACK_RETURN_ENABLED()) { |
|---|
| 274 |
cbname = eventer_name_for_callback(timed_event->callback); |
|---|
| 275 |
noitLT(eventer_deb, now, "debug: timed dispatch(%s)\n", |
|---|
| 276 |
cbname ? cbname : "???"); |
|---|
| 277 |
} |
|---|
| 278 |
/* Make our call */ |
|---|
| 279 |
EVENTER_CALLBACK_ENTRY((void *)timed_event->callback, (char *)cbname, -1, |
|---|
| 280 |
timed_event->mask, EVENTER_TIMER); |
|---|
| 281 |
newmask = timed_event->callback(timed_event, EVENTER_TIMER, |
|---|
| 282 |
timed_event->closure, now); |
|---|
| 283 |
EVENTER_CALLBACK_RETURN((void *)timed_event->callback, (char *)cbname, newmask); |
|---|
| 284 |
if(newmask) |
|---|
| 285 |
eventer_add_timed(timed_event); |
|---|
| 286 |
else |
|---|
| 287 |
eventer_free(timed_event); |
|---|
| 288 |
} |
|---|
| 289 |
|
|---|
| 290 |
if(compare_timeval(eventer_max_sleeptime, *next) < 0) { |
|---|
| 291 |
/* we exceed our configured maximum, set it down */ |
|---|
| 292 |
memcpy(next, &eventer_max_sleeptime, sizeof(*next)); |
|---|
| 293 |
} |
|---|
| 294 |
} |
|---|
| 295 |
void |
|---|
| 296 |
eventer_foreach_timedevent (void (*f)(eventer_t e, void *), void *closure) { |
|---|
| 297 |
noit_skiplist_node *iter = NULL; |
|---|
| 298 |
pthread_mutex_lock(&te_lock); |
|---|
| 299 |
for(iter = noit_skiplist_getlist(timed_events); iter; |
|---|
| 300 |
noit_skiplist_next(timed_events,&iter)) { |
|---|
| 301 |
if(iter->data) f(iter->data, closure); |
|---|
| 302 |
} |
|---|
| 303 |
pthread_mutex_unlock(&te_lock); |
|---|
| 304 |
} |
|---|
| 305 |
|
|---|
| 306 |
void eventer_dispatch_recurrent(struct timeval *now) { |
|---|
| 307 |
struct recurrent_events *node; |
|---|
| 308 |
struct timeval __now; |
|---|
| 309 |
if(!now) { |
|---|
| 310 |
gettimeofday(&__now, NULL); |
|---|
| 311 |
now = &__now; |
|---|
| 312 |
} |
|---|
| 313 |
pthread_mutex_lock(&recurrent_lock); |
|---|
| 314 |
for(node = recurrent_events; node; node = node->next) { |
|---|
| 315 |
node->e->callback(node->e, EVENTER_RECURRENT, node->e->closure, now); |
|---|
| 316 |
} |
|---|
| 317 |
pthread_mutex_unlock(&recurrent_lock); |
|---|
| 318 |
} |
|---|
| 319 |
eventer_t eventer_remove_recurrent(eventer_t e) { |
|---|
| 320 |
struct recurrent_events *node, *prev = NULL; |
|---|
| 321 |
pthread_mutex_lock(&recurrent_lock); |
|---|
| 322 |
for(node = recurrent_events; node; node = node->next) { |
|---|
| 323 |
if(node->e == e) { |
|---|
| 324 |
if(prev) prev->next = node->next; |
|---|
| 325 |
else recurrent_events = node->next; |
|---|
| 326 |
free(node); |
|---|
| 327 |
pthread_mutex_unlock(&recurrent_lock); |
|---|
| 328 |
return e; |
|---|
| 329 |
} |
|---|
| 330 |
prev = node; |
|---|
| 331 |
} |
|---|
| 332 |
pthread_mutex_unlock(&recurrent_lock); |
|---|
| 333 |
return NULL; |
|---|
| 334 |
} |
|---|
| 335 |
void eventer_add_recurrent(eventer_t e) { |
|---|
| 336 |
struct recurrent_events *node; |
|---|
| 337 |
assert(e->mask & EVENTER_RECURRENT); |
|---|
| 338 |
pthread_mutex_lock(&recurrent_lock); |
|---|
| 339 |
for(node = recurrent_events; node; node = node->next) |
|---|
| 340 |
if(node->e == e) { |
|---|
| 341 |
pthread_mutex_unlock(&recurrent_lock); |
|---|
| 342 |
return; |
|---|
| 343 |
} |
|---|
| 344 |
node = calloc(1, sizeof(*node)); |
|---|
| 345 |
node->e = e; |
|---|
| 346 |
node->next = recurrent_events; |
|---|
| 347 |
recurrent_events = node; |
|---|
| 348 |
pthread_mutex_unlock(&recurrent_lock); |
|---|
| 349 |
} |
|---|
| 350 |
|
|---|