root/src/eventer/eventer_impl.c

Revision 28b073c9b8925b6b5bab4ecdf6b76690e8adf946, 6.7 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

ship a default CA list for eventer consumers, refs #13

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include <pthread.h>
37 #include <assert.h>
38
/* Time recorded by eventer_impl_init(); stays NULL until that has run, in
 * which case eventer_get_epoch() reports failure. */
39 static struct timeval *eventer_impl_epoch = NULL;
/* Non-zero when eventer debugging is on (set through the "debugging"
 * property or the EVENTER_DEBUGGING environment variable); while set,
 * eventer_add_asynch() does not create timeout events for asynch jobs. */
40 static int EVENTER_DEBUGGING = 0;
41
/* Eventer backends defined in other translation units; each one is only
 * declared when the build defines the matching HAVE_* macro. */
42 #ifdef HAVE_KQUEUE
43 extern struct _eventer_impl eventer_kqueue_impl;
44 #endif
45 #ifdef HAVE_EPOLL
46 extern struct _eventer_impl eventer_epoll_impl;
47 #endif
48 #ifdef HAVE_PORTS
49 extern struct _eventer_impl eventer_ports_impl;
50 #endif
51
/* NULL-terminated table of the backends compiled into this build, listed
 * in the order kqueue, epoll, ports. */
52 eventer_impl_t registered_eventers[] = {
53 #ifdef HAVE_KQUEUE
54   &eventer_kqueue_impl,
55 #endif
56 #ifdef HAVE_EPOLL
57   &eventer_epoll_impl,
58 #endif
59 #ifdef HAVE_PORTS
60   &eventer_ports_impl,
61 #endif
62   NULL
63 };
64
/* NOTE(review): presumably the backend currently in use; nothing in this
 * file's visible code assigns it -- confirm where it is set. */
65 eventer_impl_t __eventer = NULL;
/* Log streams resolved by eventer_impl_init(): "error/eventer" and
 * "debug/eventer", falling back to noit_stderr and noit_debug when those
 * streams are not found. */
66 noit_log_stream_t eventer_err = NULL;
67 noit_log_stream_t eventer_deb = NULL;
68
/* How many times eventer_impl_init() raises the default job queue's
 * concurrency; adjustable through the "default_queue_threads" property. */
69 static int __default_queue_threads = 5;
/* __global_backq is the queue returned by eventer_default_backq() and is
 * installed as __default_jobq's backq; __default_jobq receives jobs from
 * eventer_add_asynch() when the caller supplies no queue. */
70 static eventer_jobq_t __global_backq, __default_jobq;
/* Held around every traversal or modification of the recurrent_events
 * list in this file. */
71 static pthread_mutex_t recurrent_lock = PTHREAD_MUTEX_INITIALIZER;
/* Singly linked list of the events registered with
 * eventer_add_recurrent(); new nodes are pushed at the head and the list
 * is walked by eventer_dispatch_recurrent(). */
72 struct recurrent_events {
73   eventer_t e;
74   struct recurrent_events *next;
75 } *recurrent_events = NULL;
76
77
78 int eventer_impl_propset(const char *key, const char *value) {
79   if(!strcasecmp(key, "default_queue_threads")) {
80     __default_queue_threads = atoi(value);
81     if(__default_queue_threads < 1) {
82       noitL(noit_error, "default_queue_threads must be >= 1\n");
83       return -1;
84     }
85     return 0;
86   }
87   else if(!strcasecmp(key, "debugging")) {
88     if(strcmp(value, "0")) {
89       EVENTER_DEBUGGING = 1;
90       noitL(noit_error, "Enabling debugging from property\n");
91     }
92     return 0;
93   }
94   else if(!strcasecmp(key, "default_ca_chain")) {
95     /* used by eventer consumers */
96     return 0;
97   }
98   noitL(noit_error, "Warning: unknown eventer config '%s'\n", key);
99   return 0;
100 }
101
102 eventer_jobq_t *eventer_default_backq() {
103   return &__global_backq;
104 }
105
106 int eventer_get_epoch(struct timeval *epoch) {
107   if(!eventer_impl_epoch) return -1;
108   memcpy(epoch, eventer_impl_epoch, sizeof(*epoch));
109   return 0;
110 }
111
112 int eventer_impl_init() {
113   int i;
114   eventer_t e;
115   char *evdeb;
116
117   evdeb = getenv("EVENTER_DEBUGGING");
118   if(evdeb) {
119     if(strcmp(evdeb, "0")) {
120       /* Set to anything but "0" turns debugging on */
121       EVENTER_DEBUGGING = 1;
122       noitL(noit_error, "Disabling eventer debugging from environment\n");
123     }
124     else {
125       EVENTER_DEBUGGING = 1;
126       noitL(noit_error, "Enabling eventer debugging from environment\n");
127     }
128   }
129   eventer_impl_epoch = malloc(sizeof(struct timeval));
130   gettimeofday(eventer_impl_epoch, NULL);
131
132   eventer_err = noit_log_stream_find("error/eventer");
133   eventer_deb = noit_log_stream_find("debug/eventer");
134   if(!eventer_err) eventer_err = noit_stderr;
135   if(!eventer_deb) eventer_deb = noit_debug;
136
137   eventer_ssl_init();
138   eventer_jobq_init(&__global_backq, "default_back_queue");
139   e = eventer_alloc();
140   e->mask = EVENTER_RECURRENT;
141   e->closure = &__global_backq;
142   e->callback = eventer_jobq_consume_available;
143
144   /* We call directly here as we may not be completely initialized */
145   eventer_add_recurrent(e);
146
147   eventer_jobq_init(&__default_jobq, "default_queue");
148   __default_jobq.backq = &__global_backq;
149   for(i=0; i<__default_queue_threads; i++)
150     eventer_jobq_increase_concurrency(&__default_jobq);
151   return 0;
152 }
153
154 void eventer_add_asynch(eventer_jobq_t *q, eventer_t e) {
155   eventer_job_t *job;
156   job = calloc(1, sizeof(*job));
157   job->fd_event = e;
158   gettimeofday(&job->create_time, NULL);
159   /* If we're debugging the eventer, these cross thread timeouts will
160    * make it impossible for us to slowly trace an asynch job. */
161   if(!EVENTER_DEBUGGING && e->whence.tv_sec) {
162     job->timeout_event = eventer_alloc();
163     memcpy(&job->timeout_event->whence, &e->whence, sizeof(e->whence));
164     job->timeout_event->mask = EVENTER_TIMER;
165     job->timeout_event->closure = job;
166     job->timeout_event->callback = eventer_jobq_execute_timeout;
167     eventer_add(job->timeout_event);
168   }
169   eventer_jobq_enqueue(q ? q : &__default_jobq, job);
170 }
171
172 void eventer_dispatch_recurrent(struct timeval *now) {
173   struct recurrent_events *node;
174   struct timeval __now;
175   if(!now) {
176     gettimeofday(&__now, NULL);
177     now = &__now;
178   }
179   pthread_mutex_lock(&recurrent_lock);
180   for(node = recurrent_events; node; node = node->next) {
181     node->e->callback(node->e, EVENTER_RECURRENT, node->e->closure, now);
182   }
183   pthread_mutex_unlock(&recurrent_lock);
184 }
185 eventer_t eventer_remove_recurrent(eventer_t e) {
186   struct recurrent_events *node, *prev = NULL;
187   pthread_mutex_lock(&recurrent_lock);
188   for(node = recurrent_events; node; node = node->next) {
189     if(node->e == e) {
190       if(prev) prev->next = node->next;
191       else recurrent_events = node->next;
192       free(node);
193       pthread_mutex_unlock(&recurrent_lock);
194       return e;
195     }
196     prev = node;
197   }
198   pthread_mutex_unlock(&recurrent_lock);
199   return NULL;
200 }
201 void eventer_add_recurrent(eventer_t e) {
202   struct recurrent_events *node;
203   assert(e->mask & EVENTER_RECURRENT);
204   pthread_mutex_lock(&recurrent_lock);
205   for(node = recurrent_events; node; node = node->next)
206     if(node->e == e) {
207       pthread_mutex_unlock(&recurrent_lock);
208       return;
209     }
210   node = calloc(1, sizeof(*node));
211   node->e = e;
212   node->next = recurrent_events;
213   recurrent_events = node;
214   pthread_mutex_unlock(&recurrent_lock);
215 }
216
Note: See TracBrowser for help on using the browser.