root/src/eventer/eventer_impl.c

Revision 84d6f13ffc15b3f1bb50df2ac835b56f70179b3e, 4.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

closes #78

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_log.h"
9 #include <pthread.h>
10 #include <assert.h>
11
/* Time-of-day captured once by eventer_impl_init(); remains NULL until
 * that function has run.  eventer_get_epoch() hands callers a copy. */
12 static struct timeval *eventer_impl_epoch = NULL;
13
/* Backend implementations are defined in other translation units; each
 * one is only declared when its HAVE_* feature macro is defined. */
14 #ifdef HAVE_KQUEUE
15 extern struct _eventer_impl eventer_kqueue_impl;
16 #endif
17 #ifdef HAVE_EPOLL
18 extern struct _eventer_impl eventer_epoll_impl;
19 #endif
20 #ifdef HAVE_PORTS
21 extern struct _eventer_impl eventer_ports_impl;
22 #endif
23
/* NULL-terminated table of the backends that were compiled in, listed in
 * the order kqueue, epoll, ports.  With none of the HAVE_* macros defined
 * the table holds only the terminating NULL. */
24 eventer_impl_t registered_eventers[] = {
25 #ifdef HAVE_KQUEUE
26   &eventer_kqueue_impl,
27 #endif
28 #ifdef HAVE_EPOLL
29   &eventer_epoll_impl,
30 #endif
31 #ifdef HAVE_PORTS
32   &eventer_ports_impl,
33 #endif
34   NULL
35 };
36
/* Starts out NULL and is not assigned anywhere in the code visible here;
 * presumably the backend selected from registered_eventers -- TODO confirm
 * against the code that sets it. */
37 eventer_impl_t __eventer = NULL;
/* Eventer log streams.  eventer_impl_init() looks up "error/eventer" and
 * "debug/eventer" and falls back to noit_stderr / noit_debug when a
 * stream is not found. */
38 noit_log_stream_t eventer_err = NULL;
39 noit_log_stream_t eventer_deb = NULL;
40
/* How many times eventer_impl_init() raises the concurrency of the
 * default job queue; adjustable through eventer_impl_propset(). */
41 static int __default_queue_threads = 5;
/* __global_backq is what eventer_default_backq() returns and is installed
 * as the backq of __default_jobq; __default_jobq is the queue used by
 * eventer_add_asynch() when the caller passes no queue. */
42 static eventer_jobq_t __global_backq, __default_jobq;
/* Held by every function below that walks or edits the recurrent list. */
43 static pthread_mutex_t recurrent_lock = PTHREAD_MUTEX_INITIALIZER;
/* Singly linked list of registered recurrent events; new entries are
 * pushed at the head by eventer_add_recurrent(). */
44 struct recurrent_events {
45   eventer_t e;
46   struct recurrent_events *next;
47 } *recurrent_events = NULL;
48
49
50 int eventer_impl_propset(const char *key, const char *value) {
51   if(!strcasecmp(key, "default_queue_threads")) {
52     __default_queue_threads = atoi(value);
53     if(__default_queue_threads < 1) {
54       noitL(noit_error, "default_queue_threads must be >= 1\n");
55       return -1;
56     }
57     return 0;
58   }
59   return -1;
60 }
61
62 eventer_jobq_t *eventer_default_backq() {
63   return &__global_backq;
64 }
65
66 int eventer_get_epoch(struct timeval *epoch) {
67   if(!eventer_impl_epoch) return -1;
68   memcpy(epoch, eventer_impl_epoch, sizeof(*epoch));
69   return 0;
70 }
71
72 int eventer_impl_init() {
73   int i;
74   eventer_t e;
75
76   eventer_impl_epoch = malloc(sizeof(struct timeval));
77   gettimeofday(eventer_impl_epoch, NULL);
78
79   eventer_err = noit_log_stream_find("error/eventer");
80   eventer_deb = noit_log_stream_find("debug/eventer");
81   if(!eventer_err) eventer_err = noit_stderr;
82   if(!eventer_deb) eventer_deb = noit_debug;
83
84   eventer_ssl_init();
85   eventer_jobq_init(&__global_backq, "default_back_queue");
86   e = eventer_alloc();
87   e->mask = EVENTER_RECURRENT;
88   e->closure = &__global_backq;
89   e->callback = eventer_jobq_consume_available;
90
91   /* We call directly here as we may not be completely initialized */
92   eventer_add_recurrent(e);
93
94   eventer_jobq_init(&__default_jobq, "default_queue");
95   __default_jobq.backq = &__global_backq;
96   for(i=0; i<__default_queue_threads; i++)
97     eventer_jobq_increase_concurrency(&__default_jobq);
98   return 0;
99 }
100
101 void eventer_add_asynch(eventer_jobq_t *q, eventer_t e) {
102   eventer_job_t *job;
103   job = calloc(1, sizeof(*job));
104   job->fd_event = e;
105   gettimeofday(&job->create_time, NULL);
106   if(e->whence.tv_sec) {
107     job->timeout_event = eventer_alloc();
108     memcpy(&job->timeout_event->whence, &e->whence, sizeof(e->whence));
109     job->timeout_event->mask = EVENTER_TIMER;
110     job->timeout_event->closure = job;
111     job->timeout_event->callback = eventer_jobq_execute_timeout;
112     eventer_add(job->timeout_event);
113   }
114   eventer_jobq_enqueue(q ? q : &__default_jobq, job);
115 }
116
117 void eventer_dispatch_recurrent(struct timeval *now) {
118   struct recurrent_events *node;
119   struct timeval __now;
120   if(!now) {
121     gettimeofday(&__now, NULL);
122     now = &__now;
123   }
124   pthread_mutex_lock(&recurrent_lock);
125   for(node = recurrent_events; node; node = node->next) {
126     node->e->callback(node->e, EVENTER_RECURRENT, node->e->closure, now);
127   }
128   pthread_mutex_unlock(&recurrent_lock);
129 }
130 eventer_t eventer_remove_recurrent(eventer_t e) {
131   struct recurrent_events *node, *prev = NULL;
132   pthread_mutex_lock(&recurrent_lock);
133   for(node = recurrent_events; node; node = node->next) {
134     if(node->e == e) {
135       if(prev) prev->next = node->next;
136       else recurrent_events = node->next;
137       free(node);
138       pthread_mutex_unlock(&recurrent_lock);
139       return e;
140     }
141     prev = node;
142   }
143   pthread_mutex_unlock(&recurrent_lock);
144   return NULL;
145 }
146 void eventer_add_recurrent(eventer_t e) {
147   struct recurrent_events *node;
148   assert(e->mask & EVENTER_RECURRENT);
149   pthread_mutex_lock(&recurrent_lock);
150   for(node = recurrent_events; node; node = node->next)
151     if(node->e == e) {
152       pthread_mutex_unlock(&recurrent_lock);
153       return;
154     }
155   node = calloc(1, sizeof(*node));
156   node->e = e;
157   node->next = recurrent_events;
158   recurrent_events = node;
159   pthread_mutex_unlock(&recurrent_lock);
160 }
161
Note: See TracBrowser for help on using the browser.