root/src/eventer/eventer_impl.c

Revision b5f532b42a354e0463bc43a0aac27f591b9090fb, 3.4 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

provide an API to get the global backq (default backq) so that others can create their own job queues that get completed by the default backq

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_log.h"
9 #include <pthread.h>
10 #include <assert.h>
11
12 #ifdef HAVE_KQUEUE
13 extern struct _eventer_impl eventer_kqueue_impl;
14 #endif
15
16 eventer_impl_t registered_eventers[] = {
17 #ifdef HAVE_KQUEUE
18   &eventer_kqueue_impl,
19 #endif
20   NULL
21 };
22
23 eventer_impl_t __eventer = NULL;
24
25
26 static int __default_queue_threads = 5;
27 static eventer_jobq_t __global_backq, __default_jobq;
28 static pthread_mutex_t recurrent_lock = PTHREAD_MUTEX_INITIALIZER;
29 struct recurrent_events {
30   eventer_t e;
31   struct recurrent_events *next;
32 } *recurrent_events = NULL;
33
34
35 int eventer_impl_propset(const char *key, const char *value) {
36   if(!strcasecmp(key, "default_queue_threads")) {
37     __default_queue_threads = atoi(value);
38     if(__default_queue_threads < 1) {
39       noitL(noit_error, "default_queue_threads must be >= 1\n");
40       return -1;
41     }
42     return 0;
43   }
44   return -1;
45 }
46
47 eventer_jobq_t *eventer_default_backq() {
48   return &__global_backq;
49 }
50
51 int eventer_impl_init() {
52   int i;
53   eventer_t e;
54   eventer_ssl_init();
55   eventer_jobq_init(&__global_backq);
56   e = eventer_alloc();
57   e->mask = EVENTER_RECURRENT;
58   e->closure = &__global_backq;
59   e->callback = eventer_jobq_consume_available;
60
61   /* We call directly here as we may not be completely initialized */
62   eventer_add_recurrent(e);
63
64   eventer_jobq_init(&__default_jobq);
65   __default_jobq.backq = &__global_backq;
66   for(i=0; i<__default_queue_threads; i++)
67     eventer_jobq_increase_concurrency(&__default_jobq);
68   return 0;
69 }
70
71 void eventer_add_asynch(eventer_jobq_t *q, eventer_t e) {
72   eventer_job_t *job;
73   job = calloc(1, sizeof(*job));
74   job->fd_event = e;
75   gettimeofday(&job->create_time, NULL);
76   if(e->whence.tv_sec) {
77     job->timeout_event = eventer_alloc();
78     memcpy(&job->timeout_event->whence, &e->whence, sizeof(e->whence));
79     job->timeout_event->mask = EVENTER_TIMER;
80     job->timeout_event->closure = job;
81     job->timeout_event->callback = eventer_jobq_execute_timeout;
82     eventer_add(job->timeout_event);
83   }
84   eventer_jobq_enqueue(q ? q : &__default_jobq, job);
85 }
86
87 void eventer_dispatch_recurrent(struct timeval *now) {
88   struct recurrent_events *node;
89   struct timeval __now;
90   if(!now) {
91     gettimeofday(&__now, NULL);
92     now = &__now;
93   }
94   pthread_mutex_lock(&recurrent_lock);
95   for(node = recurrent_events; node; node = node->next) {
96     node->e->callback(node->e, EVENTER_RECURRENT, node->e->closure, now);
97   }
98   pthread_mutex_unlock(&recurrent_lock);
99 }
100 eventer_t eventer_remove_recurrent(eventer_t e) {
101   struct recurrent_events *node, *prev = NULL;
102   pthread_mutex_lock(&recurrent_lock);
103   for(node = recurrent_events; node; node = node->next) {
104     if(node->e == e) {
105       if(prev) prev->next = node->next;
106       else recurrent_events = node->next;
107       free(node);
108       pthread_mutex_unlock(&recurrent_lock);
109       return e;
110     }
111     prev = node;
112   }
113   pthread_mutex_unlock(&recurrent_lock);
114   return NULL;
115 }
116 void eventer_add_recurrent(eventer_t e) {
117   struct recurrent_events *node;
118   assert(e->mask & EVENTER_RECURRENT);
119   pthread_mutex_lock(&recurrent_lock);
120   for(node = recurrent_events; node; node = node->next)
121     if(node->e == e) {
122       pthread_mutex_unlock(&recurrent_lock);
123       return;
124     }
125   node = calloc(1, sizeof(*node));
126   node->e = e;
127   node->next = recurrent_events;
128   recurrent_events = node;
129   pthread_mutex_unlock(&recurrent_lock);
130 }
131
Note: See TracBrowser for help on using the browser.