root/src/eventer/eventer_impl.c

Revision a9077178423e39a94a9b624e44cd4b37899d6fd3, 5.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

refs #122, this stuff works well enough to pull dev back into trunk.
Closing the #122 branch.

flag-day(noitd,stratcond)

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
#include "noit_defines.h"
#include "eventer/eventer.h"
#include "utils/noit_log.h"
#include <assert.h>
#include <limits.h>
#include <pthread.h>
11
/* Wall-clock time captured once in eventer_impl_init(); serves as the
 * process's eventing epoch (exposed via eventer_get_epoch()). */
static struct timeval *eventer_impl_epoch = NULL;
/* Non-zero when eventer debugging is on (set from the "debugging"
 * property or the EVENTER_DEBUGGING environment variable). */
static int EVENTER_DEBUGGING = 0;

/* Platform-specific eventer backends; each is compiled in only when the
 * corresponding kernel facility was detected at configure time. */
#ifdef HAVE_KQUEUE
extern struct _eventer_impl eventer_kqueue_impl;
#endif
#ifdef HAVE_EPOLL
extern struct _eventer_impl eventer_epoll_impl;
#endif
#ifdef HAVE_PORTS
extern struct _eventer_impl eventer_ports_impl;
#endif

/* NULL-terminated table of every backend available in this build. */
eventer_impl_t registered_eventers[] = {
#ifdef HAVE_KQUEUE
  &eventer_kqueue_impl,
#endif
#ifdef HAVE_EPOLL
  &eventer_epoll_impl,
#endif
#ifdef HAVE_PORTS
  &eventer_ports_impl,
#endif
  NULL
};

/* The backend selected at runtime (one of registered_eventers). */
eventer_impl_t __eventer = NULL;
/* Log streams for eventer errors/debug; fall back to noit_stderr /
 * noit_debug in eventer_impl_init() if the named streams don't exist. */
noit_log_stream_t eventer_err = NULL;
noit_log_stream_t eventer_deb = NULL;

/* Number of worker threads for the default asynch job queue
 * (overridable via the "default_queue_threads" property). */
static int __default_queue_threads = 5;
/* __global_backq: completion queue drained on the event thread;
 * __default_jobq: the default asynch work queue feeding it. */
static eventer_jobq_t __global_backq, __default_jobq;
/* Guards the singly-linked list of recurrent events below. */
static pthread_mutex_t recurrent_lock = PTHREAD_MUTEX_INITIALIZER;
struct recurrent_events {
  eventer_t e;
  struct recurrent_events *next;
} *recurrent_events = NULL;
49
50
51 int eventer_impl_propset(const char *key, const char *value) {
52   if(!strcasecmp(key, "default_queue_threads")) {
53     __default_queue_threads = atoi(value);
54     if(__default_queue_threads < 1) {
55       noitL(noit_error, "default_queue_threads must be >= 1\n");
56       return -1;
57     }
58     return 0;
59   }
60   else if(!strcasecmp(key, "debugging")) {
61     if(strcmp(value, "0")) {
62       EVENTER_DEBUGGING = 1;
63       noitL(noit_error, "Enabling debugging from property\n");
64     }
65     return 0;
66   }
67   return -1;
68 }
69
70 eventer_jobq_t *eventer_default_backq() {
71   return &__global_backq;
72 }
73
74 int eventer_get_epoch(struct timeval *epoch) {
75   if(!eventer_impl_epoch) return -1;
76   memcpy(epoch, eventer_impl_epoch, sizeof(*epoch));
77   return 0;
78 }
79
80 int eventer_impl_init() {
81   int i;
82   eventer_t e;
83   char *evdeb;
84
85   evdeb = getenv("EVENTER_DEBUGGING");
86   if(evdeb) {
87     if(strcmp(evdeb, "0")) {
88       /* Set to anything but "0" turns debugging on */
89       EVENTER_DEBUGGING = 1;
90       noitL(noit_error, "Disabling eventer debugging from environment\n");
91     }
92     else {
93       EVENTER_DEBUGGING = 1;
94       noitL(noit_error, "Enabling eventer debugging from environment\n");
95     }
96   }
97   eventer_impl_epoch = malloc(sizeof(struct timeval));
98   gettimeofday(eventer_impl_epoch, NULL);
99
100   eventer_err = noit_log_stream_find("error/eventer");
101   eventer_deb = noit_log_stream_find("debug/eventer");
102   if(!eventer_err) eventer_err = noit_stderr;
103   if(!eventer_deb) eventer_deb = noit_debug;
104
105   eventer_ssl_init();
106   eventer_jobq_init(&__global_backq, "default_back_queue");
107   e = eventer_alloc();
108   e->mask = EVENTER_RECURRENT;
109   e->closure = &__global_backq;
110   e->callback = eventer_jobq_consume_available;
111
112   /* We call directly here as we may not be completely initialized */
113   eventer_add_recurrent(e);
114
115   eventer_jobq_init(&__default_jobq, "default_queue");
116   __default_jobq.backq = &__global_backq;
117   for(i=0; i<__default_queue_threads; i++)
118     eventer_jobq_increase_concurrency(&__default_jobq);
119   return 0;
120 }
121
122 void eventer_add_asynch(eventer_jobq_t *q, eventer_t e) {
123   eventer_job_t *job;
124   job = calloc(1, sizeof(*job));
125   job->fd_event = e;
126   gettimeofday(&job->create_time, NULL);
127   /* If we're debugging the eventer, these cross thread timeouts will
128    * make it impossible for us to slowly trace an asynch job. */
129   if(!EVENTER_DEBUGGING && e->whence.tv_sec) {
130     job->timeout_event = eventer_alloc();
131     memcpy(&job->timeout_event->whence, &e->whence, sizeof(e->whence));
132     job->timeout_event->mask = EVENTER_TIMER;
133     job->timeout_event->closure = job;
134     job->timeout_event->callback = eventer_jobq_execute_timeout;
135     eventer_add(job->timeout_event);
136   }
137   eventer_jobq_enqueue(q ? q : &__default_jobq, job);
138 }
139
140 void eventer_dispatch_recurrent(struct timeval *now) {
141   struct recurrent_events *node;
142   struct timeval __now;
143   if(!now) {
144     gettimeofday(&__now, NULL);
145     now = &__now;
146   }
147   pthread_mutex_lock(&recurrent_lock);
148   for(node = recurrent_events; node; node = node->next) {
149     node->e->callback(node->e, EVENTER_RECURRENT, node->e->closure, now);
150   }
151   pthread_mutex_unlock(&recurrent_lock);
152 }
153 eventer_t eventer_remove_recurrent(eventer_t e) {
154   struct recurrent_events *node, *prev = NULL;
155   pthread_mutex_lock(&recurrent_lock);
156   for(node = recurrent_events; node; node = node->next) {
157     if(node->e == e) {
158       if(prev) prev->next = node->next;
159       else recurrent_events = node->next;
160       free(node);
161       pthread_mutex_unlock(&recurrent_lock);
162       return e;
163     }
164     prev = node;
165   }
166   pthread_mutex_unlock(&recurrent_lock);
167   return NULL;
168 }
169 void eventer_add_recurrent(eventer_t e) {
170   struct recurrent_events *node;
171   assert(e->mask & EVENTER_RECURRENT);
172   pthread_mutex_lock(&recurrent_lock);
173   for(node = recurrent_events; node; node = node->next)
174     if(node->e == e) {
175       pthread_mutex_unlock(&recurrent_lock);
176       return;
177     }
178   node = calloc(1, sizeof(*node));
179   node->e = e;
180   node->next = recurrent_events;
181   recurrent_events = node;
182   pthread_mutex_unlock(&recurrent_lock);
183 }
184
Note: See TracBrowser for help on using the browser.