root/src/eventer/eventer_impl.c

Revision 3b3b432b41dd3bfb80c144aa7ba28e75daa2337f, 3.4 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 11 years ago)

asynchronous job queues

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "eventer/eventer_impl.h"
9 #include "eventer/eventer_jobq.h"
10 #include "utils/noit_log.h"
11 #include <pthread.h>
12 #include <assert.h>
13
14 #ifdef HAVE_KQUEUE
15 extern struct _eventer_impl eventer_kqueue_impl;
16 #endif
17
18 eventer_impl_t registered_eventers[] = {
19 #ifdef HAVE_KQUEUE
20   &eventer_kqueue_impl,
21 #endif
22   NULL
23 };
24
25 eventer_impl_t __eventer = NULL;
26
27
28 static int __default_queue_threads = 5;
29 static eventer_jobq_t __global_backq, __default_jobq;
30 static pthread_mutex_t recurrent_lock = PTHREAD_MUTEX_INITIALIZER;
31 struct recurrent_events {
32   eventer_t e;
33   struct recurrent_events *next;
34 } *recurrent_events = NULL;
35
36
37 int eventer_impl_propset(const char *key, const char *value) {
38   if(!strcasecmp(key, "default_queue_threads")) {
39     __default_queue_threads = atoi(value);
40     if(__default_queue_threads < 1) {
41       noitL(noit_error, "default_queue_threads must be >= 1\n");
42       return -1;
43     }
44     return 0;
45   }
46   return -1;
47 }
48
49 int eventer_impl_init() {
50   int i;
51   eventer_t e;
52   eventer_jobq_init(&__global_backq);
53   e = eventer_alloc();
54   e->mask = EVENTER_RECURRENT;
55   e->closure = &__global_backq;
56   e->callback = eventer_jobq_consume_available;
57
58   /* We call directly here as we may not be completely initialized */
59   eventer_add_recurrent(e);
60
61   eventer_jobq_init(&__default_jobq);
62   __default_jobq.backq = &__global_backq;
63   for(i=0; i<__default_queue_threads; i++)
64     eventer_jobq_increase_concurrency(&__default_jobq);
65   return 0;
66 }
67
68 void eventer_add_asynch(eventer_jobq_t *q, eventer_t e) {
69   eventer_job_t *job;
70   job = calloc(1, sizeof(*job));
71   job->fd_event = e;
72   gettimeofday(&job->create_time, NULL);
73   if(e->whence.tv_sec) {
74     job->timeout_event = eventer_alloc();
75     memcpy(&job->timeout_event->whence, &e->whence, sizeof(e->whence));
76     job->timeout_event->mask = EVENTER_TIMER;
77     job->timeout_event->closure = job;
78     job->timeout_event->callback = eventer_jobq_execute_timeout;
79     eventer_add(job->timeout_event);
80   }
81   eventer_jobq_enqueue(q ? q : &__default_jobq, job);
82 }
83
84 void eventer_dispatch_recurrent(struct timeval *now) {
85   struct recurrent_events *node;
86   struct timeval __now;
87   if(!now) {
88     gettimeofday(&__now, NULL);
89     now = &__now;
90   }
91   pthread_mutex_lock(&recurrent_lock);
92   for(node = recurrent_events; node; node = node->next) {
93     node->e->callback(node->e, EVENTER_RECURRENT, node->e->closure, now);
94   }
95   pthread_mutex_unlock(&recurrent_lock);
96 }
97 eventer_t eventer_remove_recurrent(eventer_t e) {
98   struct recurrent_events *node, *prev = NULL;
99   pthread_mutex_lock(&recurrent_lock);
100   for(node = recurrent_events; node; node = node->next) {
101     if(node->e == e) {
102       if(prev) prev->next = node->next;
103       else recurrent_events = node->next;
104       free(node);
105       pthread_mutex_unlock(&recurrent_lock);
106       return e;
107     }
108     prev = node;
109   }
110   pthread_mutex_unlock(&recurrent_lock);
111   return NULL;
112 }
113 void eventer_add_recurrent(eventer_t e) {
114   struct recurrent_events *node;
115   assert(e->mask & EVENTER_RECURRENT);
116   pthread_mutex_lock(&recurrent_lock);
117   for(node = recurrent_events; node; node = node->next)
118     if(node->e == e) {
119       pthread_mutex_unlock(&recurrent_lock);
120       return;
121     }
122   node = calloc(1, sizeof(*node));
123   node->e = e;
124   node->next = recurrent_events;
125   recurrent_events = node;
126   pthread_mutex_unlock(&recurrent_lock);
127 }
128
Note: See TracBrowser for help on using the browser.