root/src/eventer/eventer_jobq.c

Revision f870be02daa5f1ef34211f67b8489564f94b0a88, 17.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 2 years ago)

fix types

/*
 * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 *       copyright notice, this list of conditions and the following
 *       disclaimer in the documentation and/or other materials provided
 *       with the distribution.
 *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
 *       of its contributors may be used to endorse or promote products
 *       derived from this software without specific prior written
 *       permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "noit_defines.h"
#include "utils/noit_log.h"
#include "utils/noit_atomic.h"
#include "eventer/eventer.h"
#include "eventer/dtrace_probes.h"
#include <errno.h>
#include <setjmp.h>
#include <assert.h>
#include <signal.h>

#ifndef JOBQ_SIGNAL
#define JOBQ_SIGNAL SIGALRM
#endif

#define pthread_self_ptr() ((void *)pthread_self())

static noit_atomic32_t threads_jobq_inited = 0;
static pthread_key_t threads_jobq;
static sigset_t alarm_mask;
static noit_hash_table all_queues = NOIT_HASH_EMPTY;
pthread_mutex_t all_queues_lock;

static void
eventer_jobq_handler(int signo)
{
  eventer_jobq_t *jobq;
  eventer_job_t *job;
  sigjmp_buf *env;

  jobq = pthread_getspecific(threads_jobq);
  assert(jobq);
  env = pthread_getspecific(jobq->threadenv);
  job = pthread_getspecific(jobq->activejob);
  if(env && job && job->fd_event->mask & EVENTER_EVIL_BRUTAL)
    if(noit_atomic_cas32(&job->inflight, 0, 1) == 1)
       siglongjmp(*env, 1);
}
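
/* Explanatory note: this handler is the "brutal" timeout path.  The
 * master thread's timeout event (eventer_jobq_execute_timeout below)
 * pthread_kill()s the executing thread with JOBQ_SIGNAL; if the active
 * job opted into EVENTER_EVIL_BRUTAL and is still inflight (the CAS
 * flips inflight from 1 to 0), we siglongjmp back to the sigsetjmp
 * point in eventer_jobq_consumer, abandoning the callback mid-flight.
 * Jobs that did not opt in are left to run to completion. */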

int
eventer_jobq_init(eventer_jobq_t *jobq, const char *queue_name) {
  pthread_mutexattr_t mutexattr;

  if(noit_atomic_cas32(&threads_jobq_inited, 1, 0) == 0) {
    struct sigaction act;

    sigemptyset(&alarm_mask);
    sigaddset(&alarm_mask, JOBQ_SIGNAL);
    act.sa_handler = eventer_jobq_handler;
    act.sa_flags = 0;
    sigemptyset(&act.sa_mask);

    if(sigaction(JOBQ_SIGNAL, &act, NULL) < 0) {
      noitL(noit_error, "Cannot initialize signal handler: %s\n",
            strerror(errno));
      return -1;
    }

    if(pthread_key_create(&threads_jobq, NULL)) {
      noitL(noit_error, "Cannot initialize thread-specific jobq: %s\n",
            strerror(errno));
      return -1;
    }
    if(pthread_mutex_init(&all_queues_lock, NULL)) {
      noitL(noit_error, "Cannot initialize all_queues mutex: %s\n",
            strerror(errno));
      return -1;
    }
  }

  memset(jobq, 0, sizeof(*jobq));
  jobq->queue_name = strdup(queue_name);
  if(pthread_mutexattr_init(&mutexattr) != 0) {
    noitL(noit_error, "Cannot initialize lock attributes\n");
    return -1;
  }
  if(pthread_mutex_init(&jobq->lock, &mutexattr) != 0) {
    noitL(noit_error, "Cannot initialize lock\n");
    return -1;
  }
  if(sem_init(&jobq->semaphore, 0, 0) != 0) {
    noitL(noit_error, "Cannot initialize semaphore: %s\n",
          strerror(errno));
    return -1;
  }
  if(pthread_key_create(&jobq->activejob, NULL)) {
    noitL(noit_error, "Cannot initialize thread-specific activejob: %s\n",
          strerror(errno));
    return -1;
  }
  if(pthread_key_create(&jobq->threadenv, NULL)) {
    noitL(noit_error, "Cannot initialize thread-specific sigsetjmp env: %s\n",
          strerror(errno));
    return -1;
  }
  pthread_mutex_lock(&all_queues_lock);
  if(noit_hash_store(&all_queues, jobq->queue_name, strlen(jobq->queue_name),
                     jobq) == 0) {
    noitL(noit_error, "Duplicate queue name!\n");
    pthread_mutex_unlock(&all_queues_lock);
    return -1;
  }
  pthread_mutex_unlock(&all_queues_lock);
  return 0;
}
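
/* Usage sketch: a worker queue is paired with a "backq" on which
 * completions are reported back to the event loop.  A minimal setup
 * might look like the following (illustrative only; the names
 * my_backq/my_jobq are hypothetical):
 *
 *   static eventer_jobq_t my_backq, my_jobq;
 *   if(eventer_jobq_init(&my_backq, "example_back") ||
 *      eventer_jobq_init(&my_jobq, "example_work"))
 *     abort();
 *   my_jobq.backq = &my_backq;
 *   eventer_jobq_increase_concurrency(&my_jobq); // thread spawns on enqueue
 *
 * Note that eventer_jobq_init() starts no threads; consumers are spawned
 * lazily by eventer_jobq_maybe_spawn() when work is enqueued and
 * desired_concurrency is non-zero. */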

eventer_jobq_t *
eventer_jobq_retrieve(const char *name) {
  void *vjq = NULL;
  pthread_mutex_lock(&all_queues_lock);
  noit_hash_retrieve(&all_queues, name, strlen(name), &vjq);
  pthread_mutex_unlock(&all_queues_lock);
  return vjq;
}

static void *
eventer_jobq_consumer_pthreadentry(void *vp) {
  return eventer_jobq_consumer((eventer_jobq_t *)vp);
}
static void
eventer_jobq_maybe_spawn(eventer_jobq_t *jobq) {
  int32_t current = jobq->concurrency;
  /* if we've no desired concurrency, this doesn't apply to us */
  if(jobq->desired_concurrency == 0) return;
  /* See if we need to launch one */
  if(jobq->desired_concurrency > current) {
    /* we need another thread, maybe... this is a race as we do the
     * increment in the new thread, but we check there and back it out
     * if we did something we weren't supposed to. */
    pthread_t tid;
    pthread_attr_t tattr;
    noitL(eventer_deb, "Starting queue[%s] thread now at %d\n",
          jobq->queue_name, jobq->concurrency);
    pthread_attr_init(&tattr);
    pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
    pthread_create(&tid, &tattr, eventer_jobq_consumer_pthreadentry, jobq);
  }
  noitL(eventer_deb, "jobq_queue[%s] pending cancels [%d/%d]\n",
        jobq->queue_name, jobq->pending_cancels,
        jobq->desired_concurrency);
  if(jobq->pending_cancels == jobq->desired_concurrency) {
    /* we're absolutely screwed at this point... it's time to just die */
    noitL(noit_error, "jobq_queue[%s] induced [%d/%d] game over.\n",
          jobq->queue_name, jobq->pending_cancels,
          jobq->desired_concurrency);
    assert(jobq->pending_cancels != jobq->desired_concurrency);
  }
}
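
/* On the back-out mentioned above: the new thread increments
 * jobq->concurrency itself in eventer_jobq_consumer; if that pushes the
 * count past desired_concurrency (two enqueuers raced to spawn), the
 * surplus thread decrements it and exits immediately. */
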
void
eventer_jobq_enqueue(eventer_jobq_t *jobq, eventer_job_t *job) {
  job->next = NULL;
  eventer_jobq_maybe_spawn(jobq);
  pthread_mutex_lock(&jobq->lock);
  if(jobq->tailq) {
    /* If there is a tail (queue has items), just push it on the end. */
    jobq->tailq->next = job;
    jobq->tailq = job;
  }
  else {
    /* Otherwise, this is the first and only item on the list. */
    jobq->headq = jobq->tailq = job;
  }
  pthread_mutex_unlock(&jobq->lock);

  /* Signal consumers */
  sem_post(&jobq->semaphore);
}
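
/* Queue shape: headq/tailq form a singly linked FIFO protected by
 * jobq->lock, and the semaphore's count mirrors the number of queued
 * jobs, so each sem_post() here releases at most one consumer. */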

static eventer_job_t *
__eventer_jobq_dequeue(eventer_jobq_t *jobq, int should_wait) {
  eventer_job_t *job = NULL;

  /* Wait for a job */
  if(should_wait) while(sem_wait(&jobq->semaphore) && errno == EINTR);
  /* Or try-wait for a job */
  else if(sem_trywait(&jobq->semaphore)) return NULL;

  pthread_mutex_lock(&jobq->lock);
  if(jobq->headq) {
    /* If there are items, pop and advance the head pointer */
    job = jobq->headq;
    jobq->headq = jobq->headq->next;
    if(!jobq->headq) jobq->tailq = NULL;
  }
  pthread_mutex_unlock(&jobq->lock);

  if(job) job->next = NULL; /* To reduce any confusion */
  return job;
}
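
/* The sem_wait() loop above restarts on EINTR because a blocked consumer
 * can be interrupted by signals (notably JOBQ_SIGNAL); sem_trywait()
 * provides the non-blocking variant the event loop uses to drain a
 * backq without stalling. */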

eventer_job_t *
eventer_jobq_dequeue(eventer_jobq_t *jobq) {
  return __eventer_jobq_dequeue(jobq, 1);
}

eventer_job_t *
eventer_jobq_dequeue_nowait(eventer_jobq_t *jobq) {
  return __eventer_jobq_dequeue(jobq, 0);
}

void
eventer_jobq_destroy(eventer_jobq_t *jobq) {
  pthread_mutex_destroy(&jobq->lock);
  sem_destroy(&jobq->semaphore);
}
int
eventer_jobq_execute_timeout(eventer_t e, int mask, void *closure,
                             struct timeval *now) {
  eventer_job_t *job = closure;
  job->timeout_triggered = 1;
  job->timeout_event = NULL;
  noitL(eventer_deb, "%p jobq -> timeout job [%p]\n", pthread_self_ptr(), job);
  if(job->inflight) {
    eventer_job_t *jobcopy;
    if(job->fd_event->mask & (EVENTER_CANCEL)) {
      eventer_t my_precious = job->fd_event;
      /* we set this to NULL so we can't complete on it */
      job->fd_event = NULL;
      noitL(eventer_deb, "[inline] timeout cancelling job\n");
      noit_atomic_inc32(&job->jobq->pending_cancels);
      pthread_cancel(job->executor);
      /* complete on it ourselves */
      if(noit_atomic_cas32(&job->has_cleanedup, 1, 0) == 0) {
        /* We need to cleanup... we haven't done it yet. */
        noitL(eventer_deb, "[inline] %p jobq[%s] -> cleanup [%p]\n",
              pthread_self_ptr(), job->jobq->queue_name, job);
        /* This is the real question... asynch cleanup is supposed to
         * be called asynchronously, but we're going to call it
         * synchronously.  I think this is a bad idea, but not cleaning
         * up seems worse.  Because we're synchronous, if we hang,
         * we'll be watchdogged.
         *
         * Uncooperative plugins/third-party libs can truly suck
         * one's soul out.
         */
        if(my_precious) {
          EVENTER_CALLBACK_ENTRY((void *)my_precious->callback, NULL,
                                 my_precious->fd, my_precious->mask,
                                 EVENTER_ASYNCH_CLEANUP);
          my_precious->callback(my_precious, EVENTER_ASYNCH_CLEANUP,
                                my_precious->closure, &job->finish_time);
          EVENTER_CALLBACK_RETURN((void *)my_precious->callback, NULL, -1);
        }
      }
      jobcopy = malloc(sizeof(*jobcopy));
      memcpy(jobcopy, job, sizeof(*jobcopy));
      free(job);
      jobcopy->fd_event = my_precious;
      eventer_jobq_maybe_spawn(jobcopy->jobq);
      eventer_jobq_enqueue(jobcopy->jobq->backq, jobcopy);
    }
    else
      pthread_kill(job->executor, JOBQ_SIGNAL);
  }
  return 0;
}
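
/* In short, there are two timeout strategies above: for EVENTER_CANCEL
 * jobs, the executor thread is pthread_cancel()ed and this thread runs
 * the asynch cleanup itself, then enqueues a copy of the job on the
 * backq so completion still flows through the event loop; for everything
 * else, the executor is merely signalled with JOBQ_SIGNAL, which only
 * has teeth for EVENTER_EVIL_BRUTAL jobs (see eventer_jobq_handler). */
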
int
eventer_jobq_consume_available(eventer_t e, int mask, void *closure,
                               struct timeval *now) {
  eventer_jobq_t *jobq = closure;
  eventer_job_t *job;
  /* This can only be called with a backq jobq
   * (a standalone queue with no backq itself)
   */
  assert(jobq && !jobq->backq);
  while((job = eventer_jobq_dequeue_nowait(jobq)) != NULL) {
    int newmask;
    EVENTER_CALLBACK_ENTRY((void *)job->fd_event->callback, NULL,
                           job->fd_event->fd, job->fd_event->mask,
                           job->fd_event->mask);
    newmask = job->fd_event->callback(job->fd_event, job->fd_event->mask,
                                      job->fd_event->closure, now);
    EVENTER_CALLBACK_RETURN((void *)job->fd_event->callback, NULL, newmask);
    if(!newmask) eventer_free(job->fd_event);
    else {
      job->fd_event->mask = newmask;
      eventer_add(job->fd_event);
    }
    job->fd_event = NULL;
    assert(job->timeout_event == NULL);
    free(job);
  }
  return EVENTER_RECURRENT;
}
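
/* This is the event-loop side of the pipeline: intended to be registered
 * as a recurrent event against a backq, it drains completed jobs without
 * blocking and fires each job's callback; a zero return frees the event,
 * a non-zero return re-adds it to the eventer with the new mask. */
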
static void
eventer_jobq_cancel_cleanup(void *vp) {
  eventer_jobq_t *jobq = vp;
  noit_atomic_dec32(&jobq->pending_cancels);
  noit_atomic_dec32(&jobq->concurrency);
}
void *
eventer_jobq_consumer(eventer_jobq_t *jobq) {
  eventer_job_t *job;
  int32_t current_count;
  sigjmp_buf env;

  assert(jobq->backq);
  current_count = noit_atomic_inc32(&jobq->concurrency);
  noitL(eventer_deb, "jobq[%s] -> %d\n", jobq->queue_name, current_count);
  if(current_count > jobq->desired_concurrency) {
    noitL(eventer_deb, "jobq[%s] over-provisioned, backing out.\n",
          jobq->queue_name);
    noit_atomic_dec32(&jobq->concurrency);
    pthread_exit(NULL);
    return NULL;
  }
  /* Each thread can consume from only one queue */
  pthread_setspecific(threads_jobq, jobq);
  pthread_setspecific(jobq->threadenv, &env);
  pthread_cleanup_push(eventer_jobq_cancel_cleanup, jobq);

  while(1) {
    pthread_setspecific(jobq->activejob, NULL);
    job = eventer_jobq_dequeue(jobq);
    if(!job) continue;
    if(!job->fd_event) {
      free(job);
      break;
    }
    pthread_setspecific(jobq->activejob, job);
    noitL(eventer_deb, "%p jobq[%s] -> running job [%p]\n", pthread_self_ptr(),
          jobq->queue_name, job);

    /* Mark our commencement */
    gettimeofday(&job->start_time, NULL);

    /* Safely check and handle if we've timed out while in queue */
    pthread_mutex_lock(&job->lock);
    if(job->timeout_triggered) {
      struct timeval diff, diff2;
      /* This happens if the timeout occurred before we even had the chance
       * to pull the job off the queue.  We must be in bad shape here.
       */
      noitL(eventer_deb, "%p jobq[%s] -> timeout before start [%p]\n",
            pthread_self_ptr(), jobq->queue_name, job);
      gettimeofday(&job->finish_time, NULL); /* We're done */
      sub_timeval(job->finish_time, job->fd_event->whence, &diff);
      sub_timeval(job->finish_time, job->create_time, &diff2);
      noitL(eventer_deb, "%p jobq[%s] -> timeout before start [%p] -%0.6f (%0.6f)\n",
            pthread_self_ptr(), jobq->queue_name, job,
            (float)diff.tv_sec + (float)diff.tv_usec/1000000.0,
            (float)diff2.tv_sec + (float)diff2.tv_usec/1000000.0);
      pthread_mutex_unlock(&job->lock);
      EVENTER_CALLBACK_ENTRY((void *)job->fd_event->callback, NULL,
                             job->fd_event->fd, job->fd_event->mask,
                             EVENTER_ASYNCH_CLEANUP);
      job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_CLEANUP,
                              job->fd_event->closure, &job->finish_time);
      EVENTER_CALLBACK_RETURN((void *)job->fd_event->callback, NULL, -1);
      eventer_jobq_enqueue(jobq->backq, job);
      continue;
    }
    pthread_mutex_unlock(&job->lock);

    /* Run the job; if we time out, we'll be hit with JOBQ_SIGNAL from
     * the master thread.  We handle the alarm by siglongjmp'ing back
     * out here.
     */
    job->executor = pthread_self();
    if(0 == (job->fd_event->mask & EVENTER_EVIL_BRUTAL) ||
       sigsetjmp(env, 1) == 0) {
      /* We could get hit right here... (timeout and terminated from
       * another thread).  inflight isn't yet set (next line), so it
       * won't longjmp.  But timeout_triggered will be set... so we
       * should recheck that after we mark ourselves inflight.
       */
      if(noit_atomic_cas32(&job->inflight, 1, 0) == 0) {
        if(!job->timeout_triggered) {
          noitL(eventer_deb, "%p jobq[%s] -> executing [%p]\n",
                pthread_self_ptr(), jobq->queue_name, job);
          /* Choose the right cancellation policy (or none) */
          if(job->fd_event->mask & EVENTER_CANCEL_ASYNCH) {
            noitL(eventer_deb, "PTHREAD_CANCEL_ASYNCHRONOUS\n");
            pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
            pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
          }
          else if(job->fd_event->mask & EVENTER_CANCEL_DEFERRED) {
            noitL(eventer_deb, "PTHREAD_CANCEL_DEFERRED\n");
            pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
            pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
          }
          else {
            noitL(eventer_deb, "PTHREAD_CANCEL_DISABLE\n");
            pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
          }
          /* run the job */
          noitL(eventer_deb, "jobq[%s] -> dispatch BEGIN\n", jobq->queue_name);
          job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_WORK,
                                  job->fd_event->closure, &job->start_time);
          noitL(eventer_deb, "jobq[%s] -> dispatch END\n", jobq->queue_name);
          if(job->fd_event && job->fd_event->mask & EVENTER_CANCEL)
            pthread_testcancel();
          /* reset the cancellation policy */
          pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
          pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
        }
      }
    }

    job->inflight = 0;
    noitL(eventer_deb, "%p jobq[%s] -> finished [%p]\n", pthread_self_ptr(),
          jobq->queue_name, job);
    /* Now we know we won't have siglongjmp called on us */

    gettimeofday(&job->finish_time, NULL);
    if(job->timeout_event &&
       eventer_remove(job->timeout_event)) {
      eventer_free(job->timeout_event);
    }
    job->timeout_event = NULL;

    if(noit_atomic_cas32(&job->has_cleanedup, 1, 0) == 0) {
      /* We need to cleanup... we haven't done it yet. */
      noitL(eventer_deb, "%p jobq[%s] -> cleanup [%p]\n", pthread_self_ptr(),
            jobq->queue_name, job);
      if(job->fd_event)
        job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_CLEANUP,
                                job->fd_event->closure, &job->finish_time);
    }
    eventer_jobq_enqueue(jobq->backq, job);
  }
  pthread_cleanup_pop(0);
  noit_atomic_dec32(&jobq->concurrency);
  pthread_exit(NULL);
  return NULL;
}
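
/* Consumer lifecycle, in brief: register thread-specific state so the
 * signal handler can locate the active job, then loop dequeue -> run ->
 * enqueue-on-backq.  A dequeued job whose fd_event is NULL is the
 * shutdown sentinel (see eventer_jobq_decrease_concurrency below) and
 * terminates the thread. */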

void eventer_jobq_increase_concurrency(eventer_jobq_t *jobq) {
  noit_atomic_inc32(&jobq->desired_concurrency);
}
void eventer_jobq_decrease_concurrency(eventer_jobq_t *jobq) {
  eventer_job_t *job;
  noit_atomic_dec32(&jobq->desired_concurrency);
  job = calloc(1, sizeof(*job));
  eventer_jobq_enqueue(jobq, job);
}
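
/* Shutdown-by-sentinel: the calloc()ed job above has fd_event == NULL,
 * which the consumer loop treats as "exit this thread" (it frees the job
 * and breaks), so each call retires exactly one consumer. */
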
void eventer_jobq_process_each(void (*func)(eventer_jobq_t *, void *),
                               void *closure) {
  const char *key;
  int klen;
  void *vjobq;
  noit_hash_iter iter = NOIT_HASH_ITER_ZERO;

  pthread_mutex_lock(&all_queues_lock);
  while(noit_hash_next(&all_queues, &iter, &key, &klen, &vjobq)) {
    func((eventer_jobq_t *)vjobq, closure);
  }
  pthread_mutex_unlock(&all_queues_lock);
}