root/src/eventer/eventer_impl.c

Revision 4c963d7e24928576cb0e4a600c35e964b4ba5cca, 3.9 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 10 years ago)

write the eventer... checks are working, but the telnet console is not, refs #32

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_log.h"
9 #include <pthread.h>
10 #include <assert.h>
11
12 #ifdef HAVE_KQUEUE
13 extern struct _eventer_impl eventer_kqueue_impl;
14 #endif
15 #ifdef HAVE_EPOLL
16 extern struct _eventer_impl eventer_epoll_impl;
17 #endif
18 #ifdef HAVE_PORTS
19 extern struct _eventer_impl eventer_ports_impl;
20 #endif
21
22 eventer_impl_t registered_eventers[] = {
23 #ifdef HAVE_KQUEUE
24   &eventer_kqueue_impl,
25 #endif
26 #ifdef HAVE_EPOLL
27   &eventer_epoll_impl,
28 #endif
29 #ifdef HAVE_PORTS
30   &eventer_ports_impl,
31 #endif
32   NULL
33 };
34
35 eventer_impl_t __eventer = NULL;
36 noit_log_stream_t eventer_err = NULL;
37 noit_log_stream_t eventer_deb = NULL;
38
39 static int __default_queue_threads = 5;
40 static eventer_jobq_t __global_backq, __default_jobq;
41 static pthread_mutex_t recurrent_lock = PTHREAD_MUTEX_INITIALIZER;
42 struct recurrent_events {
43   eventer_t e;
44   struct recurrent_events *next;
45 } *recurrent_events = NULL;
46
47
48 int eventer_impl_propset(const char *key, const char *value) {
49   if(!strcasecmp(key, "default_queue_threads")) {
50     __default_queue_threads = atoi(value);
51     if(__default_queue_threads < 1) {
52       noitL(noit_error, "default_queue_threads must be >= 1\n");
53       return -1;
54     }
55     return 0;
56   }
57   return -1;
58 }
59
60 eventer_jobq_t *eventer_default_backq() {
61   return &__global_backq;
62 }
63
64 int eventer_impl_init() {
65   int i;
66   eventer_t e;
67
68   eventer_err = noit_log_stream_find("error/eventer");
69   eventer_deb = noit_log_stream_find("debug/eventer");
70   if(!eventer_err) eventer_err = noit_stderr;
71   if(!eventer_deb) eventer_deb = noit_debug;
72
73   eventer_ssl_init();
74   eventer_jobq_init(&__global_backq);
75   e = eventer_alloc();
76   e->mask = EVENTER_RECURRENT;
77   e->closure = &__global_backq;
78   e->callback = eventer_jobq_consume_available;
79
80   /* We call directly here as we may not be completely initialized */
81   eventer_add_recurrent(e);
82
83   eventer_jobq_init(&__default_jobq);
84   __default_jobq.backq = &__global_backq;
85   for(i=0; i<__default_queue_threads; i++)
86     eventer_jobq_increase_concurrency(&__default_jobq);
87   return 0;
88 }
89
90 void eventer_add_asynch(eventer_jobq_t *q, eventer_t e) {
91   eventer_job_t *job;
92   job = calloc(1, sizeof(*job));
93   job->fd_event = e;
94   gettimeofday(&job->create_time, NULL);
95   if(e->whence.tv_sec) {
96     job->timeout_event = eventer_alloc();
97     memcpy(&job->timeout_event->whence, &e->whence, sizeof(e->whence));
98     job->timeout_event->mask = EVENTER_TIMER;
99     job->timeout_event->closure = job;
100     job->timeout_event->callback = eventer_jobq_execute_timeout;
101     eventer_add(job->timeout_event);
102   }
103   eventer_jobq_enqueue(q ? q : &__default_jobq, job);
104 }
105
106 void eventer_dispatch_recurrent(struct timeval *now) {
107   struct recurrent_events *node;
108   struct timeval __now;
109   if(!now) {
110     gettimeofday(&__now, NULL);
111     now = &__now;
112   }
113   pthread_mutex_lock(&recurrent_lock);
114   for(node = recurrent_events; node; node = node->next) {
115     node->e->callback(node->e, EVENTER_RECURRENT, node->e->closure, now);
116   }
117   pthread_mutex_unlock(&recurrent_lock);
118 }
119 eventer_t eventer_remove_recurrent(eventer_t e) {
120   struct recurrent_events *node, *prev = NULL;
121   pthread_mutex_lock(&recurrent_lock);
122   for(node = recurrent_events; node; node = node->next) {
123     if(node->e == e) {
124       if(prev) prev->next = node->next;
125       else recurrent_events = node->next;
126       free(node);
127       pthread_mutex_unlock(&recurrent_lock);
128       return e;
129     }
130     prev = node;
131   }
132   pthread_mutex_unlock(&recurrent_lock);
133   return NULL;
134 }
135 void eventer_add_recurrent(eventer_t e) {
136   struct recurrent_events *node;
137   assert(e->mask & EVENTER_RECURRENT);
138   pthread_mutex_lock(&recurrent_lock);
139   for(node = recurrent_events; node; node = node->next)
140     if(node->e == e) {
141       pthread_mutex_unlock(&recurrent_lock);
142       return;
143     }
144   node = calloc(1, sizeof(*node));
145   node->e = e;
146   node->next = recurrent_events;
147   recurrent_events = node;
148   pthread_mutex_unlock(&recurrent_lock);
149 }
150
Note: See TracBrowser for help on using the browser.