| 1 |
/* |
|---|
| 2 |
* Copyright (c) 2007, OmniTI Computer Consulting, Inc. |
|---|
| 3 |
* All rights reserved. |
|---|
| 4 |
* |
|---|
| 5 |
* Redistribution and use in source and binary forms, with or without |
|---|
| 6 |
* modification, are permitted provided that the following conditions are |
|---|
| 7 |
* met: |
|---|
| 8 |
* |
|---|
| 9 |
* * Redistributions of source code must retain the above copyright |
|---|
| 10 |
* notice, this list of conditions and the following disclaimer. |
|---|
| 11 |
* * Redistributions in binary form must reproduce the above |
|---|
| 12 |
* copyright notice, this list of conditions and the following |
|---|
| 13 |
* disclaimer in the documentation and/or other materials provided |
|---|
| 14 |
* with the distribution. |
|---|
| 15 |
* * Neither the name OmniTI Computer Consulting, Inc. nor the names |
|---|
| 16 |
* of its contributors may be used to endorse or promote products |
|---|
| 17 |
* derived from this software without specific prior written |
|---|
| 18 |
* permission. |
|---|
| 19 |
* |
|---|
| 20 |
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
|---|
| 21 |
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
|---|
| 22 |
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR |
|---|
| 23 |
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT |
|---|
| 24 |
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, |
|---|
| 25 |
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT |
|---|
| 26 |
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, |
|---|
| 27 |
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY |
|---|
| 28 |
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT |
|---|
| 29 |
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
|---|
| 30 |
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
|---|
| 31 |
*/ |
|---|
| 32 |
|
|---|
| 33 |
#include "noit_defines.h" |
|---|
| 34 |
#include "eventer/eventer.h" |
|---|
| 35 |
#include "utils/noit_log.h" |
|---|
| 36 |
#include <pthread.h> |
|---|
| 37 |
#include <assert.h> |
|---|
| 38 |
|
|---|
| 39 |
/* Time recorded by eventer_impl_init(); stays NULL until that runs, in which
 * case eventer_get_epoch() reports failure. */
static struct timeval *eventer_impl_epoch = NULL; |
|---|
| 40 |
/* Non-zero when eventer debugging is on. Set from the "debugging" property
 * or the EVENTER_DEBUGGING environment variable; when set,
 * eventer_add_asynch() does not arm per-job timeout events. */
static int EVENTER_DEBUGGING = 0; |
|---|
| 41 |
|
|---|
| 42 |
#ifdef HAVE_KQUEUE |
|---|
| 43 |
extern struct _eventer_impl eventer_kqueue_impl; |
|---|
| 44 |
#endif |
|---|
| 45 |
#ifdef HAVE_EPOLL |
|---|
| 46 |
extern struct _eventer_impl eventer_epoll_impl; |
|---|
| 47 |
#endif |
|---|
| 48 |
#ifdef HAVE_PORTS |
|---|
| 49 |
extern struct _eventer_impl eventer_ports_impl; |
|---|
| 50 |
#endif |
|---|
| 51 |
|
|---|
| 52 |
/* NULL-terminated table of the eventer implementations compiled into this
 * build. Each entry is present only when the matching HAVE_KQUEUE,
 * HAVE_EPOLL or HAVE_PORTS macro is defined. */
eventer_impl_t registered_eventers[] = { |
|---|
| 53 |
#ifdef HAVE_KQUEUE |
|---|
| 54 |
&eventer_kqueue_impl, |
|---|
| 55 |
#endif |
|---|
| 56 |
#ifdef HAVE_EPOLL |
|---|
| 57 |
&eventer_epoll_impl, |
|---|
| 58 |
#endif |
|---|
| 59 |
#ifdef HAVE_PORTS |
|---|
| 60 |
&eventer_ports_impl, |
|---|
| 61 |
#endif |
|---|
| 62 |
/* Sentinel marking the end of the table. */
NULL |
|---|
| 63 |
}; |
|---|
| 64 |
|
|---|
| 65 |
/* NOTE(review): presumably the eventer implementation chosen at startup; it
 * is never assigned in this part of the file - confirm against the caller
 * that selects from registered_eventers. */
eventer_impl_t __eventer = NULL; |
|---|
| 66 |
/* Error log stream; eventer_impl_init() points it at "error/eventer",
 * falling back to noit_stderr when that stream is not found. */
noit_log_stream_t eventer_err = NULL; |
|---|
| 67 |
/* Debug log stream; eventer_impl_init() points it at "debug/eventer",
 * falling back to noit_debug when that stream is not found. */
noit_log_stream_t eventer_deb = NULL; |
|---|
| 68 |
|
|---|
| 69 |
/* Number of times eventer_impl_init() raises the concurrency of the default
 * job queue; overridable through the "default_queue_threads" property. */
static int __default_queue_threads = 5; |
|---|
| 70 |
/* __global_backq is the back queue returned by eventer_default_backq();
 * __default_jobq is the queue eventer_add_asynch() uses when given NULL. */
static eventer_jobq_t __global_backq, __default_jobq; |
|---|
| 71 |
/* Held around every access to the recurrent_events list below. */
static pthread_mutex_t recurrent_lock = PTHREAD_MUTEX_INITIALIZER; |
|---|
| 72 |
/* Singly linked list of registered recurrent events; new nodes are pushed
 * at the head by eventer_add_recurrent(). */
struct recurrent_events { |
|---|
| 73 |
eventer_t e; |
|---|
| 74 |
struct recurrent_events *next; |
|---|
| 75 |
} *recurrent_events = NULL; |
|---|
| 76 |
|
|---|
| 77 |
|
|---|
| 78 |
int eventer_impl_propset(const char *key, const char *value) { |
|---|
| 79 |
if(!strcasecmp(key, "default_queue_threads")) { |
|---|
| 80 |
__default_queue_threads = atoi(value); |
|---|
| 81 |
if(__default_queue_threads < 1) { |
|---|
| 82 |
noitL(noit_error, "default_queue_threads must be >= 1\n"); |
|---|
| 83 |
return -1; |
|---|
| 84 |
} |
|---|
| 85 |
return 0; |
|---|
| 86 |
} |
|---|
| 87 |
else if(!strcasecmp(key, "debugging")) { |
|---|
| 88 |
if(strcmp(value, "0")) { |
|---|
| 89 |
EVENTER_DEBUGGING = 1; |
|---|
| 90 |
noitL(noit_error, "Enabling debugging from property\n"); |
|---|
| 91 |
} |
|---|
| 92 |
return 0; |
|---|
| 93 |
} |
|---|
| 94 |
else if(!strcasecmp(key, "default_ca_chain")) { |
|---|
| 95 |
/* used by eventer consumers */ |
|---|
| 96 |
return 0; |
|---|
| 97 |
} |
|---|
| 98 |
noitL(noit_error, "Warning: unknown eventer config '%s'\n", key); |
|---|
| 99 |
return 0; |
|---|
| 100 |
} |
|---|
| 101 |
|
|---|
| 102 |
eventer_jobq_t *eventer_default_backq() { |
|---|
| 103 |
return &__global_backq; |
|---|
| 104 |
} |
|---|
| 105 |
|
|---|
| 106 |
int eventer_get_epoch(struct timeval *epoch) { |
|---|
| 107 |
if(!eventer_impl_epoch) return -1; |
|---|
| 108 |
memcpy(epoch, eventer_impl_epoch, sizeof(*epoch)); |
|---|
| 109 |
return 0; |
|---|
| 110 |
} |
|---|
| 111 |
|
|---|
| 112 |
int eventer_impl_init() { |
|---|
| 113 |
int i; |
|---|
| 114 |
eventer_t e; |
|---|
| 115 |
char *evdeb; |
|---|
| 116 |
|
|---|
| 117 |
evdeb = getenv("EVENTER_DEBUGGING"); |
|---|
| 118 |
if(evdeb) { |
|---|
| 119 |
if(strcmp(evdeb, "0")) { |
|---|
| 120 |
/* Set to anything but "0" turns debugging on */ |
|---|
| 121 |
EVENTER_DEBUGGING = 1; |
|---|
| 122 |
noitL(noit_error, "Disabling eventer debugging from environment\n"); |
|---|
| 123 |
} |
|---|
| 124 |
else { |
|---|
| 125 |
EVENTER_DEBUGGING = 1; |
|---|
| 126 |
noitL(noit_error, "Enabling eventer debugging from environment\n"); |
|---|
| 127 |
} |
|---|
| 128 |
} |
|---|
| 129 |
eventer_impl_epoch = malloc(sizeof(struct timeval)); |
|---|
| 130 |
gettimeofday(eventer_impl_epoch, NULL); |
|---|
| 131 |
|
|---|
| 132 |
eventer_err = noit_log_stream_find("error/eventer"); |
|---|
| 133 |
eventer_deb = noit_log_stream_find("debug/eventer"); |
|---|
| 134 |
if(!eventer_err) eventer_err = noit_stderr; |
|---|
| 135 |
if(!eventer_deb) eventer_deb = noit_debug; |
|---|
| 136 |
|
|---|
| 137 |
eventer_ssl_init(); |
|---|
| 138 |
eventer_jobq_init(&__global_backq, "default_back_queue"); |
|---|
| 139 |
e = eventer_alloc(); |
|---|
| 140 |
e->mask = EVENTER_RECURRENT; |
|---|
| 141 |
e->closure = &__global_backq; |
|---|
| 142 |
e->callback = eventer_jobq_consume_available; |
|---|
| 143 |
|
|---|
| 144 |
/* We call directly here as we may not be completely initialized */ |
|---|
| 145 |
eventer_add_recurrent(e); |
|---|
| 146 |
|
|---|
| 147 |
eventer_jobq_init(&__default_jobq, "default_queue"); |
|---|
| 148 |
__default_jobq.backq = &__global_backq; |
|---|
| 149 |
for(i=0; i<__default_queue_threads; i++) |
|---|
| 150 |
eventer_jobq_increase_concurrency(&__default_jobq); |
|---|
| 151 |
return 0; |
|---|
| 152 |
} |
|---|
| 153 |
|
|---|
| 154 |
/*
 * Wrap event `e` in a newly allocated job and enqueue it.
 *
 * q - destination job queue; NULL selects the default job queue.
 * e - event stored as the job's fd_event. When debugging is off and
 *     e->whence.tv_sec is non-zero, a timer event carrying the same whence
 *     is registered with eventer_jobq_execute_timeout as its callback and
 *     the job as its closure.
 */
void eventer_add_asynch(eventer_jobq_t *q, eventer_t e) {
  eventer_jobq_t *target = (q != NULL) ? q : &__default_jobq;
  eventer_job_t *job = calloc(1, sizeof(*job));
  int arm_timeout;

  job->fd_event = e;
  gettimeofday(&job->create_time, NULL);

  /* If we're debugging the eventer, these cross thread timeouts will
   * make it impossible for us to slowly trace an asynch job. */
  arm_timeout = (EVENTER_DEBUGGING == 0) && (e->whence.tv_sec != 0);
  if(arm_timeout) {
    eventer_t timer = eventer_alloc();
    job->timeout_event = timer;
    memcpy(&timer->whence, &e->whence, sizeof(e->whence));
    timer->mask = EVENTER_TIMER;
    timer->closure = job;
    timer->callback = eventer_jobq_execute_timeout;
    eventer_add(timer);
  }

  eventer_jobq_enqueue(target, job);
}
|---|
| 171 |
|
|---|
| 172 |
void eventer_dispatch_recurrent(struct timeval *now) { |
|---|
| 173 |
struct recurrent_events *node; |
|---|
| 174 |
struct timeval __now; |
|---|
| 175 |
if(!now) { |
|---|
| 176 |
gettimeofday(&__now, NULL); |
|---|
| 177 |
now = &__now; |
|---|
| 178 |
} |
|---|
| 179 |
pthread_mutex_lock(&recurrent_lock); |
|---|
| 180 |
for(node = recurrent_events; node; node = node->next) { |
|---|
| 181 |
node->e->callback(node->e, EVENTER_RECURRENT, node->e->closure, now); |
|---|
| 182 |
} |
|---|
| 183 |
pthread_mutex_unlock(&recurrent_lock); |
|---|
| 184 |
} |
|---|
| 185 |
/*
 * Unregister event `e` from the recurrent list.
 *
 * Returns `e` when a matching node was found (the node is unlinked and
 * freed; the event itself is not), or NULL when `e` was not registered.
 */
eventer_t eventer_remove_recurrent(eventer_t e) {
  struct recurrent_events **link;
  eventer_t removed = NULL;

  pthread_mutex_lock(&recurrent_lock);
  /* `link` always addresses the pointer that leads to the current node, so
   * unlinking the head and unlinking an interior node are the same step. */
  for(link = &recurrent_events; *link != NULL; link = &(*link)->next) {
    struct recurrent_events *candidate = *link;
    if(candidate->e != e) continue;
    *link = candidate->next;
    free(candidate);
    removed = e;
    break;
  }
  pthread_mutex_unlock(&recurrent_lock);
  return removed;
}
|---|
| 201 |
/*
 * Register event `e` so that eventer_dispatch_recurrent() invokes it.
 *
 * `e` must have EVENTER_RECURRENT set in its mask (asserted). Registering
 * an event that is already on the list is a no-op; otherwise a new node is
 * pushed at the head of the list.
 */
void eventer_add_recurrent(eventer_t e) {
  struct recurrent_events *scan;
  struct recurrent_events *fresh;

  assert(e->mask & EVENTER_RECURRENT);

  pthread_mutex_lock(&recurrent_lock);
  for(scan = recurrent_events; scan != NULL; scan = scan->next) {
    if(scan->e == e) goto unlock;  /* already registered */
  }

  fresh = calloc(1, sizeof(*fresh));
  fresh->e = e;
  fresh->next = recurrent_events;
  recurrent_events = fresh;

unlock:
  pthread_mutex_unlock(&recurrent_lock);
}
|---|
| 216 |
|
|---|