root/src/eventer/eventer_jobq.c

Revision a9077178423e39a94a9b624e44cd4b37899d6fd3, 9.7 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

refs #122, this stuff works well enough to pull dev back into trunk.
Closing the #122 branch.

flag-day(noitd,stratcond)

  • Property mode set to 100644
/*
 * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
 * All rights reserved.
 */

#include "noit_defines.h"
#include "utils/noit_log.h"
#include "utils/noit_atomic.h"
#include "eventer/eventer.h"
#include <errno.h>
#include <setjmp.h>
#include <assert.h>
#include <signal.h>

#ifndef JOBQ_SIGNAL
#define JOBQ_SIGNAL SIGALRM
#endif

static noit_atomic32_t threads_jobq_inited = 0;
static pthread_key_t threads_jobq;
static sigset_t alarm_mask;
static noit_hash_table all_queues = NOIT_HASH_EMPTY;
pthread_mutex_t all_queues_lock;

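/* Signal handler for JOBQ_SIGNAL.  If the calling thread has a job in
 * flight, atomically clear its inflight flag and siglongjmp back into the
 * consumer loop, aborting the job's work callback. */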
static void
eventer_jobq_handler(int signo)
{
  eventer_jobq_t *jobq;
  eventer_job_t *job;
  sigjmp_buf *env;

  jobq = pthread_getspecific(threads_jobq);
  assert(jobq);
  env = pthread_getspecific(jobq->threadenv);
  job = pthread_getspecific(jobq->activejob);
  if(env && job)
    if(noit_atomic_cas32(&job->inflight, 0, 1) == 1)
       siglongjmp(*env, 1);
}

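/* Initialize a jobq: on first use, install the process-wide signal handler
 * and thread-specific keys, then set up this queue's lock, semaphore,
 * per-thread keys, and its registration in the global all_queues hash. */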
int
eventer_jobq_init(eventer_jobq_t *jobq, const char *queue_name) {
  pthread_mutexattr_t mutexattr;

  if(noit_atomic_cas32(&threads_jobq_inited, 1, 0) == 0) {
    struct sigaction act;

    sigemptyset(&alarm_mask);
    sigaddset(&alarm_mask, JOBQ_SIGNAL);
    act.sa_handler = eventer_jobq_handler;
    act.sa_flags = 0;
    sigemptyset(&act.sa_mask);

    if(sigaction(JOBQ_SIGNAL, &act, NULL) < 0) {
      noitL(noit_error, "Cannot initialize signal handler: %s\n",
            strerror(errno));
      return -1;
    }

    if(pthread_key_create(&threads_jobq, NULL)) {
      noitL(noit_error, "Cannot initialize thread-specific jobq: %s\n",
            strerror(errno));
      return -1;
    }
    if(pthread_mutex_init(&all_queues_lock, NULL)) {
      noitL(noit_error, "Cannot initialize all_queues mutex: %s\n",
            strerror(errno));
      return -1;
    }
  }

  memset(jobq, 0, sizeof(*jobq));
  jobq->queue_name = strdup(queue_name);
  if(pthread_mutexattr_init(&mutexattr) != 0) {
    noitL(noit_error, "Cannot initialize lock attributes\n");
    return -1;
  }
  if(pthread_mutex_init(&jobq->lock, &mutexattr) != 0) {
    noitL(noit_error, "Cannot initialize lock\n");
    return -1;
  }
  if(sem_init(&jobq->semaphore, 0, 0) != 0) {
    noitL(noit_error, "Cannot initialize semaphore: %s\n",
          strerror(errno));
    return -1;
  }
  if(pthread_key_create(&jobq->activejob, NULL)) {
    noitL(noit_error, "Cannot initialize thread-specific activejob: %s\n",
          strerror(errno));
    return -1;
  }
  if(pthread_key_create(&jobq->threadenv, NULL)) {
    noitL(noit_error, "Cannot initialize thread-specific sigsetjmp env: %s\n",
          strerror(errno));
    return -1;
  }
  pthread_mutex_lock(&all_queues_lock);
  if(noit_hash_store(&all_queues, jobq->queue_name, strlen(jobq->queue_name),
                     jobq) == 0) {
    noitL(noit_error, "Duplicate queue name!\n");
    pthread_mutex_unlock(&all_queues_lock);
    return -1;
  }
  pthread_mutex_unlock(&all_queues_lock);
  return 0;
}

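/* Append a job to the tail of the queue and post the semaphore to wake one
 * waiting consumer. */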
void
eventer_jobq_enqueue(eventer_jobq_t *jobq, eventer_job_t *job) {
  job->next = NULL;

  pthread_mutex_lock(&jobq->lock);
  if(jobq->tailq) {
    /* If there is a tail (queue has items), just push it on the end. */
    jobq->tailq->next = job;
    jobq->tailq = job;
  }
  else {
    /* Otherwise, this is the first and only item on the list. */
    jobq->headq = jobq->tailq = job;
  }
  pthread_mutex_unlock(&jobq->lock);

  /* Signal consumers */
  sem_post(&jobq->semaphore);
}

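/* Pop one job off the head of the queue: either block on the semaphore, or
 * (when should_wait is zero) return NULL immediately if the queue is empty. */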
static eventer_job_t *
__eventer_jobq_dequeue(eventer_jobq_t *jobq, int should_wait) {
  eventer_job_t *job = NULL;

  /* Wait for a job */
  if(should_wait) while(sem_wait(&jobq->semaphore) && errno == EINTR);
  /* Or try-wait for a job */
  else if(sem_trywait(&jobq->semaphore)) return NULL;

  pthread_mutex_lock(&jobq->lock);
  if(jobq->headq) {
    /* If there are items, pop and advance the head pointer */
    job = jobq->headq;
    jobq->headq = jobq->headq->next;
    if(!jobq->headq) jobq->tailq = NULL;
  }
  pthread_mutex_unlock(&jobq->lock);

  if(job) job->next = NULL; /* To reduce any confusion */
  return job;
}

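/* Blocking and non-blocking wrappers around __eventer_jobq_dequeue(). */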
eventer_job_t *
eventer_jobq_dequeue(eventer_jobq_t *jobq) {
  return __eventer_jobq_dequeue(jobq, 1);
}

eventer_job_t *
eventer_jobq_dequeue_nowait(eventer_jobq_t *jobq) {
  return __eventer_jobq_dequeue(jobq, 0);
}

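/* Destroy the queue's lock and semaphore.  (The queue name and all_queues
 * registration are not released here.) */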
void
eventer_jobq_destroy(eventer_jobq_t *jobq) {
  pthread_mutex_destroy(&jobq->lock);
  sem_destroy(&jobq->semaphore);
}
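/* Timeout callback scheduled against a job: mark the job as timed out and,
 * if it is already running, hit the executing thread with JOBQ_SIGNAL so the
 * handler can abort it. */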
int
eventer_jobq_execute_timeout(eventer_t e, int mask, void *closure,
                             struct timeval *now) {
  eventer_job_t *job = closure;
  job->timeout_triggered = 1;
  job->timeout_event = NULL;
  noitL(eventer_deb, "%p jobq -> timeout job [%p]\n", pthread_self(), job);
  if(job->inflight) pthread_kill(job->executor, JOBQ_SIGNAL);
  return 0;
}
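/* Drain a backq from the event loop thread: invoke each completed job's
 * fd_event callback, then re-add or free the event and free the job. */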
int
eventer_jobq_consume_available(eventer_t e, int mask, void *closure,
                               struct timeval *now) {
  eventer_jobq_t *jobq = closure;
  eventer_job_t *job;
  /* This can only be called with a backq jobq
   * (a standalone queue with no backq itself)
   */
  assert(jobq && !jobq->backq);
  while((job = eventer_jobq_dequeue_nowait(jobq)) != NULL) {
    int newmask;
    newmask = job->fd_event->callback(job->fd_event, job->fd_event->mask,
                                      job->fd_event->closure, now);
    if(!newmask) eventer_free(job->fd_event);
    else eventer_add(job->fd_event);
    job->fd_event = NULL;
    assert(job->timeout_event == NULL);
    free(job);
  }
  return EVENTER_RECURRENT;
}
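/* pthread start routine: adapt the void * argument for eventer_jobq_consumer(). */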
static void *
eventer_jobq_consumer_pthreadentry(void *vp) {
  return eventer_jobq_consumer((eventer_jobq_t *)vp);
}
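/* Consumer thread main loop: dequeue jobs, run each one under sigsetjmp so a
 * JOBQ_SIGNAL-driven timeout can abort the work phase, run cleanup exactly
 * once, and push finished jobs onto the backq.  A job with a NULL fd_event
 * is a sentinel telling this thread to exit. */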
void *
eventer_jobq_consumer(eventer_jobq_t *jobq) {
  eventer_job_t *job;
  sigjmp_buf env;

  assert(jobq->backq);
  noit_atomic_inc32(&jobq->concurrency);
  /* Each thread can consume from only one queue */
  pthread_setspecific(threads_jobq, jobq);
  pthread_setspecific(jobq->threadenv, &env);

  while(1) {
    pthread_setspecific(jobq->activejob, NULL);
    job = eventer_jobq_dequeue(jobq);
    if(!job) continue;
    if(!job->fd_event) {
      free(job);
      break;
    }
    pthread_setspecific(jobq->activejob, job);
    noitL(eventer_deb, "%p jobq[%s] -> running job [%p]\n", pthread_self(), jobq->queue_name, job);

    /* Mark our commencement */
    gettimeofday(&job->start_time, NULL);

    /* Safely check and handle if we've timed out while in queue */
    pthread_mutex_lock(&job->lock);
    if(job->timeout_triggered) {
      struct timeval diff, diff2;
      /* This happens if the timeout occurred before we even had the chance
       * to pull the job off the queue.  We must be in bad shape here.
       */
      noitL(eventer_deb, "%p jobq[%s] -> timeout before start [%p]\n", pthread_self(), jobq->queue_name, job);
      gettimeofday(&job->finish_time, NULL); /* We're done */
      sub_timeval(job->finish_time, job->fd_event->whence, &diff);
      sub_timeval(job->finish_time, job->create_time, &diff2);
      noitL(eventer_deb, "%p jobq[%s] -> timeout before start [%p] -%0.6f (%0.6f)\n",
            pthread_self(), jobq->queue_name, job,
            (float)diff.tv_sec + (float)diff.tv_usec/1000000.0,
            (float)diff2.tv_sec + (float)diff2.tv_usec/1000000.0);
      pthread_mutex_unlock(&job->lock);
      job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_CLEANUP,
                              job->fd_event->closure, &job->finish_time);
      eventer_jobq_enqueue(jobq->backq, job);
      continue;
    }
    pthread_mutex_unlock(&job->lock);

    /* Run the job; if we time out, we will be hit with a JOBQ_SIGNAL from
     * the master thread.  We handle the alarm by longjmp'ing back out here.
     */
    job->executor = pthread_self();
    if(sigsetjmp(env, 1) == 0) {
      /* We could get hit right here... (timeout and terminated from
       * another thread).  inflight isn't yet set (next line), so it
       * won't longjmp.  But timeout_triggered will be set... so we
       * should recheck that after we mark ourselves inflight.
       */
      if(noit_atomic_cas32(&job->inflight, 1, 0) == 0) {
        if(!job->timeout_triggered) {
          noitL(eventer_deb, "%p jobq[%s] -> executing [%p]\n", pthread_self(), jobq->queue_name, job);
          job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_WORK,
                                  job->fd_event->closure, &job->start_time);
        }
      }
    }

    job->inflight = 0;
    noitL(eventer_deb, "%p jobq[%s] -> finished [%p]\n", pthread_self(), jobq->queue_name, job);
    /* Now we know we won't have siglongjmp called on us */

    gettimeofday(&job->finish_time, NULL);
    if(job->timeout_event &&
       eventer_remove(job->timeout_event)) {
      eventer_free(job->timeout_event);
    }
    job->timeout_event = NULL;

    if(noit_atomic_cas32(&job->has_cleanedup, 1, 0) == 0) {
      /* We need to clean up... we haven't done it yet. */
      noitL(eventer_deb, "%p jobq[%s] -> cleanup [%p]\n", pthread_self(), jobq->queue_name, job);
      job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_CLEANUP,
                              job->fd_event->closure, &job->finish_time);
    }
    eventer_jobq_enqueue(jobq->backq, job);
  }
  noit_atomic_dec32(&jobq->concurrency);
  pthread_exit(NULL);
  return NULL;
}

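/* Spawn one additional consumer thread for the queue. */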
void eventer_jobq_increase_concurrency(eventer_jobq_t *jobq) {
  pthread_t tid;
  pthread_create(&tid, NULL, eventer_jobq_consumer_pthreadentry, jobq);
}
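/* Ask one consumer thread to exit by enqueueing an empty (NULL fd_event)
 * sentinel job. */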
void eventer_jobq_decrease_concurrency(eventer_jobq_t *jobq) {
  eventer_job_t *job;
  job = calloc(1, sizeof(*job));
  eventer_jobq_enqueue(jobq, job);
}
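/* Invoke func(jobq, closure) for every registered queue, holding the
 * all_queues lock for the duration of the walk. */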
void eventer_jobq_process_each(void (*func)(eventer_jobq_t *, void *),
                               void *closure) {
  const char *key;
  int klen;
  void *vjobq;
  noit_hash_iter iter = NOIT_HASH_ITER_ZERO;

  pthread_mutex_lock(&all_queues_lock);
  while(noit_hash_next(&all_queues, &iter, &key, &klen, &vjobq)) {
    func((eventer_jobq_t *)vjobq, closure);
  }
  pthread_mutex_unlock(&all_queues_lock);
}
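
/* Usage sketch (illustrative only, not part of this file): a work queue is
 * paired with a backq that the event loop drains via
 * eventer_jobq_consume_available().  The variable and queue names below are
 * made up.
 *
 *   static eventer_jobq_t work_q, back_q;
 *
 *   eventer_jobq_init(&back_q, "back_queue");
 *   eventer_jobq_init(&work_q, "work_queue");
 *   work_q.backq = &back_q;
 *   eventer_jobq_increase_concurrency(&work_q);  // spawn one consumer thread
 *
 *   // Asynchronous events are wrapped in eventer_job_t structures by the
 *   // eventer implementation and handed in with eventer_jobq_enqueue().
 */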