root/src/eventer/eventer_impl.c

Revision 4b96846179a35015ac0b22d5fe9e9f92480f06a5, 3.7 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 11 years ago)

check code consolidation. Allow modules to be more terse and add convenience functions to make writing completely asynch checkers much easier. (Adds a postgres proof-of-concept asynch checker.)

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
#include "noit_defines.h"
#include "eventer/eventer.h"
#include "utils/noit_log.h"

#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdlib.h>
#include <assert.h>
11
#ifdef HAVE_KQUEUE
extern struct _eventer_impl eventer_kqueue_impl;
#endif

/* NULL-terminated table of eventer implementations compiled into this
 * build; entries are gated on platform support detected at configure time. */
eventer_impl_t registered_eventers[] = {
#ifdef HAVE_KQUEUE
  &eventer_kqueue_impl,
#endif
  NULL
};

/* The implementation actually selected at startup (NULL until chosen). */
eventer_impl_t __eventer = NULL;
/* Log streams for eventer errors/debugging; resolved in eventer_impl_init(),
 * falling back to noit_stderr/noit_debug when the named streams don't exist. */
noit_log_stream_t eventer_err = NULL;
noit_log_stream_t eventer_deb = NULL;

/* Worker-thread count for the default jobq; tunable via
 * eventer_impl_propset("default_queue_threads", ...). */
static int __default_queue_threads = 5;
/* __global_backq: completion back-queue drained on the event loop.
 * __default_jobq: the jobq used when eventer_add_asynch gets a NULL queue. */
static eventer_jobq_t __global_backq, __default_jobq;
/* Guards the singly-linked recurrent_events list below. */
static pthread_mutex_t recurrent_lock = PTHREAD_MUTEX_INITIALIZER;
/* Intrusive singly-linked list of events fired on every loop pass. */
struct recurrent_events {
  eventer_t e;
  struct recurrent_events *next;
} *recurrent_events = NULL;
34
35
36 int eventer_impl_propset(const char *key, const char *value) {
37   if(!strcasecmp(key, "default_queue_threads")) {
38     __default_queue_threads = atoi(value);
39     if(__default_queue_threads < 1) {
40       noitL(noit_error, "default_queue_threads must be >= 1\n");
41       return -1;
42     }
43     return 0;
44   }
45   return -1;
46 }
47
48 eventer_jobq_t *eventer_default_backq() {
49   return &__global_backq;
50 }
51
/* One-time eventer bring-up: resolve log streams, initialize SSL support,
 * wire the global back-queue into the loop as a recurrent consumer, and
 * spin up the default jobq's worker threads.  Returns 0. */
int eventer_impl_init() {
  int i;
  eventer_t e;

  /* Prefer dedicated eventer log streams; fall back to stderr/debug. */
  eventer_err = noit_log_stream_find("error/eventer");
  eventer_deb = noit_log_stream_find("debug/eventer");
  if(!eventer_err) eventer_err = noit_stderr;
  if(!eventer_deb) eventer_deb = noit_debug;

  eventer_ssl_init();
  eventer_jobq_init(&__global_backq);
  /* Recurrent event that drains completed asynch jobs each loop pass. */
  e = eventer_alloc();
  e->mask = EVENTER_RECURRENT;
  e->closure = &__global_backq;
  e->callback = eventer_jobq_consume_available;

  /* We call directly here as we may not be completely initialized */
  eventer_add_recurrent(e);

  /* Default jobq feeds its completions back through the global back-queue. */
  eventer_jobq_init(&__default_jobq);
  __default_jobq.backq = &__global_backq;
  for(i=0; i<__default_queue_threads; i++)
    eventer_jobq_increase_concurrency(&__default_jobq);
  return 0;
}
77
78 void eventer_add_asynch(eventer_jobq_t *q, eventer_t e) {
79   eventer_job_t *job;
80   job = calloc(1, sizeof(*job));
81   job->fd_event = e;
82   gettimeofday(&job->create_time, NULL);
83   if(e->whence.tv_sec) {
84     job->timeout_event = eventer_alloc();
85     memcpy(&job->timeout_event->whence, &e->whence, sizeof(e->whence));
86     job->timeout_event->mask = EVENTER_TIMER;
87     job->timeout_event->closure = job;
88     job->timeout_event->callback = eventer_jobq_execute_timeout;
89     eventer_add(job->timeout_event);
90   }
91   eventer_jobq_enqueue(q ? q : &__default_jobq, job);
92 }
93
94 void eventer_dispatch_recurrent(struct timeval *now) {
95   struct recurrent_events *node;
96   struct timeval __now;
97   if(!now) {
98     gettimeofday(&__now, NULL);
99     now = &__now;
100   }
101   pthread_mutex_lock(&recurrent_lock);
102   for(node = recurrent_events; node; node = node->next) {
103     node->e->callback(node->e, EVENTER_RECURRENT, node->e->closure, now);
104   }
105   pthread_mutex_unlock(&recurrent_lock);
106 }
107 eventer_t eventer_remove_recurrent(eventer_t e) {
108   struct recurrent_events *node, *prev = NULL;
109   pthread_mutex_lock(&recurrent_lock);
110   for(node = recurrent_events; node; node = node->next) {
111     if(node->e == e) {
112       if(prev) prev->next = node->next;
113       else recurrent_events = node->next;
114       free(node);
115       pthread_mutex_unlock(&recurrent_lock);
116       return e;
117     }
118     prev = node;
119   }
120   pthread_mutex_unlock(&recurrent_lock);
121   return NULL;
122 }
123 void eventer_add_recurrent(eventer_t e) {
124   struct recurrent_events *node;
125   assert(e->mask & EVENTER_RECURRENT);
126   pthread_mutex_lock(&recurrent_lock);
127   for(node = recurrent_events; node; node = node->next)
128     if(node->e == e) {
129       pthread_mutex_unlock(&recurrent_lock);
130       return;
131     }
132   node = calloc(1, sizeof(*node));
133   node->e = e;
134   node->next = recurrent_events;
135   recurrent_events = node;
136   pthread_mutex_unlock(&recurrent_lock);
137 }
138
Note: See TracBrowser for help on using the browser.