root/src/eventer/eventer_impl.c

Revision e218b2e668e2669ac1172a80f29dcece78664779, 3.8 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 10 years ago)

epoll eventer implementation

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_log.h"
9 #include <pthread.h>
10 #include <assert.h>
11
12 #ifdef HAVE_KQUEUE
13 extern struct _eventer_impl eventer_kqueue_impl;
14 #endif
15 #ifdef HAVE_EPOLL
16 extern struct _eventer_impl eventer_epoll_impl;
17 #endif
18
19 eventer_impl_t registered_eventers[] = {
20 #ifdef HAVE_KQUEUE
21   &eventer_kqueue_impl,
22 #endif
23 #ifdef HAVE_EPOLL
24   &eventer_epoll_impl,
25 #endif
26   NULL
27 };
28
29 eventer_impl_t __eventer = NULL;
30 noit_log_stream_t eventer_err = NULL;
31 noit_log_stream_t eventer_deb = NULL;
32
33 static int __default_queue_threads = 5;
34 static eventer_jobq_t __global_backq, __default_jobq;
35 static pthread_mutex_t recurrent_lock = PTHREAD_MUTEX_INITIALIZER;
36 struct recurrent_events {
37   eventer_t e;
38   struct recurrent_events *next;
39 } *recurrent_events = NULL;
40
41
42 int eventer_impl_propset(const char *key, const char *value) {
43   if(!strcasecmp(key, "default_queue_threads")) {
44     __default_queue_threads = atoi(value);
45     if(__default_queue_threads < 1) {
46       noitL(noit_error, "default_queue_threads must be >= 1\n");
47       return -1;
48     }
49     return 0;
50   }
51   return -1;
52 }
53
54 eventer_jobq_t *eventer_default_backq() {
55   return &__global_backq;
56 }
57
58 int eventer_impl_init() {
59   int i;
60   eventer_t e;
61
62   eventer_err = noit_log_stream_find("error/eventer");
63   eventer_deb = noit_log_stream_find("debug/eventer");
64   if(!eventer_err) eventer_err = noit_stderr;
65   if(!eventer_deb) eventer_deb = noit_debug;
66
67   eventer_ssl_init();
68   eventer_jobq_init(&__global_backq);
69   e = eventer_alloc();
70   e->mask = EVENTER_RECURRENT;
71   e->closure = &__global_backq;
72   e->callback = eventer_jobq_consume_available;
73
74   /* We call directly here as we may not be completely initialized */
75   eventer_add_recurrent(e);
76
77   eventer_jobq_init(&__default_jobq);
78   __default_jobq.backq = &__global_backq;
79   for(i=0; i<__default_queue_threads; i++)
80     eventer_jobq_increase_concurrency(&__default_jobq);
81   return 0;
82 }
83
84 void eventer_add_asynch(eventer_jobq_t *q, eventer_t e) {
85   eventer_job_t *job;
86   job = calloc(1, sizeof(*job));
87   job->fd_event = e;
88   gettimeofday(&job->create_time, NULL);
89   if(e->whence.tv_sec) {
90     job->timeout_event = eventer_alloc();
91     memcpy(&job->timeout_event->whence, &e->whence, sizeof(e->whence));
92     job->timeout_event->mask = EVENTER_TIMER;
93     job->timeout_event->closure = job;
94     job->timeout_event->callback = eventer_jobq_execute_timeout;
95     eventer_add(job->timeout_event);
96   }
97   eventer_jobq_enqueue(q ? q : &__default_jobq, job);
98 }
99
100 void eventer_dispatch_recurrent(struct timeval *now) {
101   struct recurrent_events *node;
102   struct timeval __now;
103   if(!now) {
104     gettimeofday(&__now, NULL);
105     now = &__now;
106   }
107   pthread_mutex_lock(&recurrent_lock);
108   for(node = recurrent_events; node; node = node->next) {
109     node->e->callback(node->e, EVENTER_RECURRENT, node->e->closure, now);
110   }
111   pthread_mutex_unlock(&recurrent_lock);
112 }
113 eventer_t eventer_remove_recurrent(eventer_t e) {
114   struct recurrent_events *node, *prev = NULL;
115   pthread_mutex_lock(&recurrent_lock);
116   for(node = recurrent_events; node; node = node->next) {
117     if(node->e == e) {
118       if(prev) prev->next = node->next;
119       else recurrent_events = node->next;
120       free(node);
121       pthread_mutex_unlock(&recurrent_lock);
122       return e;
123     }
124     prev = node;
125   }
126   pthread_mutex_unlock(&recurrent_lock);
127   return NULL;
128 }
129 void eventer_add_recurrent(eventer_t e) {
130   struct recurrent_events *node;
131   assert(e->mask & EVENTER_RECURRENT);
132   pthread_mutex_lock(&recurrent_lock);
133   for(node = recurrent_events; node; node = node->next)
134     if(node->e == e) {
135       pthread_mutex_unlock(&recurrent_lock);
136       return;
137     }
138   node = calloc(1, sizeof(*node));
139   node->e = e;
140   node->next = recurrent_events;
141   recurrent_events = node;
142   pthread_mutex_unlock(&recurrent_lock);
143 }
144
Note: See TracBrowser for help on using the browser.