root/src/eventer/eventer_jobq.c

Revision 171d8ed2238b758bd302a0bb23cd0aeeabc50b33, 7.3 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 7 years ago)

make eventer.h one-stop-shopping

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "utils/noit_log.h"
8 #include "utils/noit_atomic.h"
9 #include "eventer/eventer.h"
10 #include <errno.h>
11 #include <setjmp.h>
12 #include <assert.h>
13 #include <signal.h>
14
15 static noit_atomic32_t threads_jobq_inited = 0;
16 static pthread_key_t threads_jobq;
17 static sigset_t alarm_mask;
18
19 static void
20 eventer_jobq_handler(int signo)
21 {
22   eventer_jobq_t *jobq;
23   eventer_job_t *job;
24   sigjmp_buf *env;
25
26   jobq = pthread_getspecific(threads_jobq);
27   assert(jobq);
28   env = pthread_getspecific(jobq->threadenv);
29   job = pthread_getspecific(jobq->activejob);
30   if(env && job && job->inflight) siglongjmp(*env, 1);
31 }
32
33 int
34 eventer_jobq_init(eventer_jobq_t *jobq) {
35   pthread_mutexattr_t mutexattr;
36
37   if(noit_atomic_cas32(&threads_jobq_inited, 1, 0) == 0) {
38     struct sigaction act;
39
40     sigemptyset(&alarm_mask);
41     sigaddset(&alarm_mask, SIGALRM);
42     act.sa_handler = eventer_jobq_handler;
43     act.sa_flags = 0;
44     sigemptyset(&act.sa_mask);
45
46     if(sigaction(SIGALRM, &act, NULL) < 0) {
47       noitL(noit_error, "Cannot initialize signal handler: %s\n",
48             strerror(errno));
49       return -1;
50     }
51
52     if(pthread_key_create(&threads_jobq, NULL)) {
53       noitL(noit_error, "Cannot initialize thread-specific jmp environment: %s\n",
54             strerror(errno));
55       return -1;
56     }
57   }
58
59   memset(jobq, 0, sizeof(*jobq));
60   if(pthread_mutexattr_init(&mutexattr) != 0) {
61     noitL(noit_error, "Cannot initialize lock attributes\n");
62     return -1;
63   }
64   if(pthread_mutex_init(&jobq->lock, &mutexattr) != 0) {
65     noitL(noit_error, "Cannot initialize lock\n");
66     return -1;
67   }
68   if(sem_init(&jobq->semaphore, 0, 0) != 0) {
69     noitL(noit_error, "Cannot initialize semaphore: %s\n",
70           strerror(errno));
71     return -1;
72   }
73   return 0;
74 }
75
76 void
77 eventer_jobq_enqueue(eventer_jobq_t *jobq, eventer_job_t *job) {
78   job->next = NULL;
79
80   pthread_mutex_lock(&jobq->lock);
81   if(jobq->tailq) {
82     /* If there is a tail (queue has items), just push it on the end. */
83     jobq->tailq->next = job;
84     jobq->tailq = job;
85   }
86   else {
87     /* Otherwise, this is the first and only item on the list. */
88     jobq->headq = jobq->tailq = job;
89   }
90   pthread_mutex_unlock(&jobq->lock);
91
92   /* Signal consumers */
93   sem_post(&jobq->semaphore);
94 }
95
96 static eventer_job_t *
97 __eventer_jobq_dequeue(eventer_jobq_t *jobq, int should_wait) {
98   eventer_job_t *job = NULL;
99
100   /* Wait for a job */
101   if(should_wait) while(sem_wait(&jobq->semaphore) && errno == EINTR);
102   /* Or Try-wait for a job */
103   else if(sem_trywait(&jobq->semaphore)) return NULL;
104
105   pthread_mutex_lock(&jobq->lock);
106   if(jobq->headq) {
107     /* If there are items, pop and advance the header pointer */
108     job = jobq->headq;
109     jobq->headq = jobq->headq->next;
110     if(!jobq->headq) jobq->tailq = NULL;
111   }
112   pthread_mutex_unlock(&jobq->lock);
113
114   job->next = NULL; /* To reduce any confusion */
115   return job;
116 }
117
118 eventer_job_t *
119 eventer_jobq_dequeue(eventer_jobq_t *jobq) {
120   return __eventer_jobq_dequeue(jobq, 1);
121 }
122
123 eventer_job_t *
124 eventer_jobq_dequeue_nowait(eventer_jobq_t *jobq) {
125   return __eventer_jobq_dequeue(jobq, 0);
126 }
127
128 void
129 eventer_jobq_destroy(eventer_jobq_t *jobq) {
130   pthread_mutex_destroy(&jobq->lock);
131   sem_destroy(&jobq->semaphore);
132 }
133 int
134 eventer_jobq_execute_timeout(eventer_t e, int mask, void *closure,
135                              struct timeval *now) {
136   eventer_job_t *job = closure;
137   job->timeout_triggered = 1;
138   job->timeout_event = NULL;
139   if(job->inflight) pthread_kill(job->executor, SIGALRM);
140   return 0;
141 }
142 int
143 eventer_jobq_consume_available(eventer_t e, int mask, void *closure,
144                                struct timeval *now) {
145   eventer_jobq_t *jobq = closure;
146   eventer_job_t *job;
147   /* This can only be called with a backq jobq
148    * (a standalone queue with no backq itself)
149    */
150   assert(jobq && !jobq->backq);
151   while((job = eventer_jobq_dequeue_nowait(jobq)) != NULL) {
152     int newmask;
153     newmask = job->fd_event->callback(job->fd_event, job->fd_event->mask,
154                                       job->fd_event->closure, now);
155     if(!newmask) eventer_free(job->fd_event);
156     else eventer_add(job->fd_event);
157     job->fd_event = NULL;
158     free(job);
159   }
160   return EVENTER_RECURRENT;
161 }
162 static void *
163 eventer_jobq_consumer_pthreadentry(void *vp) {
164   return eventer_jobq_consumer((eventer_jobq_t *)vp);
165 }
166 void *
167 eventer_jobq_consumer(eventer_jobq_t *jobq) {
168   eventer_job_t *job;
169   sigjmp_buf env;
170
171   assert(jobq->backq);
172   noit_atomic_inc32(&jobq->concurrency);
173   /* Each thread can consume from only one queue */
174   pthread_setspecific(threads_jobq, jobq);
175   pthread_setspecific(jobq->threadenv, &env);
176
177   while(1) {
178     pthread_setspecific(jobq->activejob, NULL);
179     job = eventer_jobq_dequeue(jobq);
180     if(!job) continue;
181     if(!job->fd_event) {
182       free(job);
183       break;
184     }
185     pthread_setspecific(jobq->activejob, job);
186
187     /* Mark our commencement */
188     gettimeofday(&job->start_time, NULL);
189
190     /* Safely check and handle if we've timed out while in queue */
191     pthread_mutex_lock(&job->lock);
192     if(job->timeout_triggered) {
193       /* This happens if the timeout occurred before we even had the change
194        * to pull the job off the queue.  We must be in bad shape here.
195        */
196       gettimeofday(&job->finish_time, NULL); /* We're done */
197       pthread_mutex_unlock(&job->lock);
198       job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_CLEANUP,
199                               job->fd_event->closure, &job->finish_time);
200       eventer_jobq_enqueue(jobq->backq, job);
201     }
202     pthread_mutex_unlock(&job->lock);
203
204     /* Run the job, if we timeout, will be killed with an ALRM from the
205      * master thread.  We handle the alarm by longjmp'd out back here.
206      */
207     job->executor = pthread_self();
208     if(sigsetjmp(env, 1) == 0) {
209       /* We could get hit right here... (timeout and terminated from
210        * another thread.  inflight isn't yet set (next line), so it
211        * won't longjmp.  But timeout_triggered will be set... so we
212        * should recheck that after we mark ourselves inflight.
213        */
214       if(noit_atomic_cas32(&job->inflight, 1, 0) == 0) {
215         if(!job->timeout_triggered) {
216           job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_WORK,
217                                   job->fd_event->closure, &job->start_time);
218         }
219       }
220     }
221     if(noit_atomic_cas32(&job->inflight, 0, 1) != 1) {
222       /* We were alredy terminated?!  Wicked race.  That's fine, just means
223        * that we longjmp'd here.
224        */
225       gettimeofday(&job->finish_time, NULL);
226       if(eventer_remove(job->timeout_event)) {
227         eventer_free(job->timeout_event);
228         job->timeout_event = NULL;
229       }
230     }
231     if(noit_atomic_cas32(&job->has_cleanedup, 1, 0) == 0) {
232       /* We need to cleanup... we haven't done it yet. */
233       job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_CLEANUP,
234                               job->fd_event->closure, &job->finish_time);
235     }
236     eventer_jobq_enqueue(jobq->backq, job);
237   }
238   noit_atomic_dec32(&jobq->concurrency);
239   pthread_exit(NULL);
240   return NULL;
241 }
242
243 void eventer_jobq_increase_concurrency(eventer_jobq_t *jobq) {
244   pthread_t tid;
245   pthread_create(&tid, NULL, eventer_jobq_consumer_pthreadentry, jobq);
246 }
247 void eventer_jobq_decrease_concurrency(eventer_jobq_t *jobq) {
248   eventer_job_t *job;
249   job = calloc(1, sizeof(*job));
250   eventer_jobq_enqueue(jobq, job);
251 }
252
Note: See TracBrowser for help on using the browser.