root/src/eventer/eventer_impl.c

Revision b7ec8071a69b07dca745fa521b287c9f4ffec06e, 3.4 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 10 years ago)

SSL support that doesn't quite work -- so so so close.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "eventer/eventer_impl.h"
9 #include "eventer/eventer_jobq.h"
10 #include "utils/noit_log.h"
11 #include <pthread.h>
12 #include <assert.h>
13
14 #ifdef HAVE_KQUEUE
15 extern struct _eventer_impl eventer_kqueue_impl;
16 #endif
17
18 eventer_impl_t registered_eventers[] = {
19 #ifdef HAVE_KQUEUE
20   &eventer_kqueue_impl,
21 #endif
22   NULL
23 };
24
25 eventer_impl_t __eventer = NULL;
26
27
28 static int __default_queue_threads = 5;
29 static eventer_jobq_t __global_backq, __default_jobq;
30 static pthread_mutex_t recurrent_lock = PTHREAD_MUTEX_INITIALIZER;
31 struct recurrent_events {
32   eventer_t e;
33   struct recurrent_events *next;
34 } *recurrent_events = NULL;
35
36
37 int eventer_impl_propset(const char *key, const char *value) {
38   if(!strcasecmp(key, "default_queue_threads")) {
39     __default_queue_threads = atoi(value);
40     if(__default_queue_threads < 1) {
41       noitL(noit_error, "default_queue_threads must be >= 1\n");
42       return -1;
43     }
44     return 0;
45   }
46   return -1;
47 }
48
49 int eventer_impl_init() {
50   int i;
51   eventer_t e;
52   eventer_ssl_init();
53   eventer_jobq_init(&__global_backq);
54   e = eventer_alloc();
55   e->mask = EVENTER_RECURRENT;
56   e->closure = &__global_backq;
57   e->callback = eventer_jobq_consume_available;
58
59   /* We call directly here as we may not be completely initialized */
60   eventer_add_recurrent(e);
61
62   eventer_jobq_init(&__default_jobq);
63   __default_jobq.backq = &__global_backq;
64   for(i=0; i<__default_queue_threads; i++)
65     eventer_jobq_increase_concurrency(&__default_jobq);
66   return 0;
67 }
68
69 void eventer_add_asynch(eventer_jobq_t *q, eventer_t e) {
70   eventer_job_t *job;
71   job = calloc(1, sizeof(*job));
72   job->fd_event = e;
73   gettimeofday(&job->create_time, NULL);
74   if(e->whence.tv_sec) {
75     job->timeout_event = eventer_alloc();
76     memcpy(&job->timeout_event->whence, &e->whence, sizeof(e->whence));
77     job->timeout_event->mask = EVENTER_TIMER;
78     job->timeout_event->closure = job;
79     job->timeout_event->callback = eventer_jobq_execute_timeout;
80     eventer_add(job->timeout_event);
81   }
82   eventer_jobq_enqueue(q ? q : &__default_jobq, job);
83 }
84
85 void eventer_dispatch_recurrent(struct timeval *now) {
86   struct recurrent_events *node;
87   struct timeval __now;
88   if(!now) {
89     gettimeofday(&__now, NULL);
90     now = &__now;
91   }
92   pthread_mutex_lock(&recurrent_lock);
93   for(node = recurrent_events; node; node = node->next) {
94     node->e->callback(node->e, EVENTER_RECURRENT, node->e->closure, now);
95   }
96   pthread_mutex_unlock(&recurrent_lock);
97 }
98 eventer_t eventer_remove_recurrent(eventer_t e) {
99   struct recurrent_events *node, *prev = NULL;
100   pthread_mutex_lock(&recurrent_lock);
101   for(node = recurrent_events; node; node = node->next) {
102     if(node->e == e) {
103       if(prev) prev->next = node->next;
104       else recurrent_events = node->next;
105       free(node);
106       pthread_mutex_unlock(&recurrent_lock);
107       return e;
108     }
109     prev = node;
110   }
111   pthread_mutex_unlock(&recurrent_lock);
112   return NULL;
113 }
114 void eventer_add_recurrent(eventer_t e) {
115   struct recurrent_events *node;
116   assert(e->mask & EVENTER_RECURRENT);
117   pthread_mutex_lock(&recurrent_lock);
118   for(node = recurrent_events; node; node = node->next)
119     if(node->e == e) {
120       pthread_mutex_unlock(&recurrent_lock);
121       return;
122     }
123   node = calloc(1, sizeof(*node));
124   node->e = e;
125   node->next = recurrent_events;
126   recurrent_events = node;
127   pthread_mutex_unlock(&recurrent_lock);
128 }
129
Note: See TracBrowser for help on using the browser.