root/src/eventer/eventer_jobq.c

Revision c47c1ab0dff1bf118be7458fd28017d9e160cb60, 18.9 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 7 months ago)

Make eventer_jobq_init set the backq to the default. This is just
for convenience so that callers don't have to set it except to
overwrite it.
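
A minimal caller-side sketch of what this means (the queue name below is hypothetical): after eventer_jobq_init returns, jobq->backq already points at the queue returned by eventer_default_backq(), so it only needs to be assigned when a non-default return queue is wanted.

    eventer_jobq_t jobq;
    if(eventer_jobq_init(&jobq, "example_queue") == 0) {
      /* jobq.backq is already eventer_default_backq(); assign it only to
       * override the default return queue. */
      eventer_jobq_increase_concurrency(&jobq);  /* raise desired concurrency
                                                    so a consumer thread can spawn */
    }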

/*
 * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 *       copyright notice, this list of conditions and the following
 *       disclaimer in the documentation and/or other materials provided
 *       with the distribution.
 *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
 *       of its contributors may be used to endorse or promote products
 *       derived from this software without specific prior written
 *       permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "noit_defines.h"
#include "utils/noit_log.h"
#include "utils/noit_atomic.h"
#include "eventer/eventer.h"
#include "dtrace_probes.h"
#include <errno.h>
#include <setjmp.h>
#include <assert.h>
#include <signal.h>

#ifndef JOBQ_SIGNAL
#define JOBQ_SIGNAL SIGALRM
#endif

#define pthread_self_ptr() ((void *)(vpsized_int)pthread_self())

static noit_atomic32_t threads_jobq_inited = 0;
static pthread_key_t threads_jobq;
static sigset_t alarm_mask;
static noit_hash_table all_queues = NOIT_HASH_EMPTY;
pthread_mutex_t all_queues_lock;

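/* Per-job accounting: drop the inflight count, count timeouts, and fold the
 * job's queue-wait and run times into exponentially weighted moving averages
 * (80% old / 20% new) using CAS retry loops so concurrent finishers don't
 * clobber each other. */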
static void
eventer_jobq_finished_job(eventer_jobq_t *jobq, eventer_job_t *job) {
  eventer_hrtime_t wait_time = job->start_hrtime - job->create_hrtime;
  eventer_hrtime_t run_time = job->finish_hrtime - job->start_hrtime;
  noit_atomic_dec32(&jobq->inflight);
  if(job->timeout_triggered) noit_atomic_inc64(&jobq->timeouts);
  while(1) {
    eventer_hrtime_t newv = jobq->avg_wait_ns * 0.8 + wait_time * 0.2;
    if(noit_atomic_cas64(&jobq->avg_wait_ns, newv, jobq->avg_wait_ns) == jobq->avg_wait_ns)
      break;
  }
  while(1) {
    eventer_hrtime_t newv = jobq->avg_run_ns * 0.8 + run_time * 0.2;
    if(noit_atomic_cas64(&jobq->avg_run_ns, newv, jobq->avg_run_ns) == jobq->avg_run_ns)
      break;
  }
}

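/* JOBQ_SIGNAL handler: if the interrupted thread is running a job that allows
 * brutal cancellation (EVENTER_EVIL_BRUTAL) and the job is still inflight,
 * siglongjmp back into eventer_jobq_consumer to abandon it. */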
static void
eventer_jobq_handler(int signo)
{
  eventer_jobq_t *jobq;
  eventer_job_t *job;
  sigjmp_buf *env;

  jobq = pthread_getspecific(threads_jobq);
  assert(jobq);
  env = pthread_getspecific(jobq->threadenv);
  job = pthread_getspecific(jobq->activejob);
  if(env && job && job->fd_event->mask & EVENTER_EVIL_BRUTAL)
    if(noit_atomic_cas32(&job->inflight, 0, 1) == 1)
       siglongjmp(*env, 1);
}

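/* One-time global setup (signal handler, thread-specific keys, all_queues
 * lock) guarded by a CAS on threads_jobq_inited, followed by per-queue setup:
 * lock, semaphore, thread-specific keys, the default backq, and registration
 * in the global queue table keyed by name. */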
int
eventer_jobq_init(eventer_jobq_t *jobq, const char *queue_name) {
  pthread_mutexattr_t mutexattr;

  if(noit_atomic_cas32(&threads_jobq_inited, 1, 0) == 0) {
    struct sigaction act;

    sigemptyset(&alarm_mask);
    sigaddset(&alarm_mask, JOBQ_SIGNAL);
    act.sa_handler = eventer_jobq_handler;
    act.sa_flags = 0;
    sigemptyset(&act.sa_mask);

    if(sigaction(JOBQ_SIGNAL, &act, NULL) < 0) {
      noitL(noit_error, "Cannot initialize signal handler: %s\n",
            strerror(errno));
      return -1;
    }

    if(pthread_key_create(&threads_jobq, NULL)) {
      noitL(noit_error, "Cannot initialize thread-specific jobq: %s\n",
            strerror(errno));
      return -1;
    }
    if(pthread_mutex_init(&all_queues_lock, NULL)) {
      noitL(noit_error, "Cannot initialize all_queues mutex: %s\n",
            strerror(errno));
      return -1;
    }
  }

  memset(jobq, 0, sizeof(*jobq));
  jobq->queue_name = strdup(queue_name);
  if(pthread_mutexattr_init(&mutexattr) != 0) {
    noitL(noit_error, "Cannot initialize lock attributes\n");
    return -1;
  }
  if(pthread_mutex_init(&jobq->lock, &mutexattr) != 0) {
    noitL(noit_error, "Cannot initialize lock\n");
    return -1;
  }
  if(sem_init(&jobq->semaphore, 0, 0) != 0) {
    noitL(noit_error, "Cannot initialize semaphore: %s\n",
          strerror(errno));
    return -1;
  }
  if(pthread_key_create(&jobq->activejob, NULL)) {
    noitL(noit_error, "Cannot initialize thread-specific activejob: %s\n",
          strerror(errno));
    return -1;
  }
  if(pthread_key_create(&jobq->threadenv, NULL)) {
    noitL(noit_error, "Cannot initialize thread-specific sigsetjmp env: %s\n",
          strerror(errno));
    return -1;
  }
  jobq->backq = eventer_default_backq();
  pthread_mutex_lock(&all_queues_lock);
  if(noit_hash_store(&all_queues, jobq->queue_name, strlen(jobq->queue_name),
                     jobq) == 0) {
    noitL(noit_error, "Duplicate queue name!\n");
    pthread_mutex_unlock(&all_queues_lock);
    return -1;
  }
  pthread_mutex_unlock(&all_queues_lock);
  return 0;
}

eventer_jobq_t *
eventer_jobq_retrieve(const char *name) {
  void *vjq = NULL;
  pthread_mutex_lock(&all_queues_lock);
  (void)noit_hash_retrieve(&all_queues, name, strlen(name), &vjq);
  pthread_mutex_unlock(&all_queues_lock);
  return vjq;
}

static void *
eventer_jobq_consumer_pthreadentry(void *vp) {
  return eventer_jobq_consumer((eventer_jobq_t *)vp);
}
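/* Spawn a detached consumer thread when observed concurrency is below the
 * desired concurrency.  The check is racy by design; the new thread re-checks
 * in eventer_jobq_consumer and backs out if the queue is over-provisioned.
 * If every desired thread has a cancel pending, the queue is wedged and we
 * assert. */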
static void
eventer_jobq_maybe_spawn(eventer_jobq_t *jobq) {
  int32_t current = jobq->concurrency;
  /* if we've no desired concurrency, this doesn't apply to us */
  if(jobq->desired_concurrency == 0) return;
  /* See if we need to launch one */
  if(jobq->desired_concurrency > current) {
    /* we need another thread, maybe... this is a race as we do the
     * increment in the new thread, but we check there and back it out
     * if we did something we weren't supposed to. */
    pthread_t tid;
    pthread_attr_t tattr;
    noitL(eventer_deb, "Starting queue[%s] thread now at %d\n",
          jobq->queue_name, jobq->concurrency);
    pthread_attr_init(&tattr);
    pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
    pthread_create(&tid, &tattr, eventer_jobq_consumer_pthreadentry, jobq);
  }
  noitL(eventer_deb, "jobq_queue[%s] pending cancels [%d/%d]\n",
        jobq->queue_name, jobq->pending_cancels,
        jobq->desired_concurrency);
  if(jobq->pending_cancels == jobq->desired_concurrency) {
    /* we're absolutely screwed at this point... it's time to just die */
    noitL(noit_error, "jobq_queue[%s] induced [%d/%d] game over.\n",
          jobq->queue_name, jobq->pending_cancels,
          jobq->desired_concurrency);
    assert(jobq->pending_cancels != jobq->desired_concurrency);
  }
}
void
eventer_jobq_enqueue(eventer_jobq_t *jobq, eventer_job_t *job) {
  job->next = NULL;
  eventer_jobq_maybe_spawn(jobq);
  pthread_mutex_lock(&jobq->lock);
  if(jobq->tailq) {
    /* If there is a tail (queue has items), just push it on the end. */
    jobq->tailq->next = job;
    jobq->tailq = job;
  }
  else {
    /* Otherwise, this is the first and only item on the list. */
    jobq->headq = jobq->tailq = job;
  }
  pthread_mutex_unlock(&jobq->lock);
  noit_atomic_inc64(&jobq->total_jobs);
  noit_atomic_inc32(&jobq->backlog);

  /* Signal consumers */
  sem_post(&jobq->semaphore);
}

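/* Pop the head of the singly linked job list.  With should_wait set, block on
 * the counting semaphore (retrying through EINTR); otherwise sem_trywait and
 * return NULL if nothing is queued. */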
static eventer_job_t *
__eventer_jobq_dequeue(eventer_jobq_t *jobq, int should_wait) {
  eventer_job_t *job = NULL;

  /* Wait for a job */
  if(should_wait) while(sem_wait(&jobq->semaphore) && errno == EINTR);
  /* Or Try-wait for a job */
  else if(sem_trywait(&jobq->semaphore)) return NULL;

  pthread_mutex_lock(&jobq->lock);
  if(jobq->headq) {
    /* If there are items, pop and advance the head pointer */
    job = jobq->headq;
    jobq->headq = jobq->headq->next;
    if(!jobq->headq) jobq->tailq = NULL;
  }
  pthread_mutex_unlock(&jobq->lock);

  if(job) {
    job->next = NULL; /* To reduce any confusion */
    noit_atomic_dec32(&jobq->backlog);
    noit_atomic_inc32(&jobq->inflight);
  }
  /* Our semaphores are counting semaphores, not locks. */
  /* coverity[missing_unlock] */
  return job;
}

eventer_job_t *
eventer_jobq_dequeue(eventer_jobq_t *jobq) {
  return __eventer_jobq_dequeue(jobq, 1);
}

eventer_job_t *
eventer_jobq_dequeue_nowait(eventer_jobq_t *jobq) {
  return __eventer_jobq_dequeue(jobq, 0);
}

void
eventer_jobq_destroy(eventer_jobq_t *jobq) {
  pthread_mutex_destroy(&jobq->lock);
  sem_destroy(&jobq->semaphore);
}
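/* Timeout callback armed for a job.  Mark the job timed out; if it is already
 * inflight and the event permits cancellation, cancel the executor thread,
 * run the asynch cleanup inline, and hand a copy of the job back to the
 * backq.  Otherwise signal the executor so brutal jobs can be longjmp'd
 * out of. */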
int
eventer_jobq_execute_timeout(eventer_t e, int mask, void *closure,
                             struct timeval *now) {
  eventer_job_t *job = closure;
  job->timeout_triggered = 1;
  job->timeout_event = NULL;
  noitL(eventer_deb, "%p jobq -> timeout job [%p]\n", pthread_self_ptr(), job);
  if(job->inflight) {
    eventer_job_t *jobcopy;
    if(job->fd_event->mask & (EVENTER_CANCEL)) {
      eventer_t my_precious = job->fd_event;
      /* we set this to null so we can't complete on it */
      job->fd_event = NULL;
      noitL(eventer_deb, "[inline] timeout cancelling job\n");
      noit_atomic_inc32(&job->jobq->pending_cancels);
      pthread_cancel(job->executor);
      /* complete on it ourselves */
      if(noit_atomic_cas32(&job->has_cleanedup, 1, 0) == 0) {
        /* We need to cleanup... we haven't done it yet. */
        noitL(eventer_deb, "[inline] %p jobq[%s] -> cleanup [%p]\n",
              pthread_self_ptr(), job->jobq->queue_name, job);
        /* This is the real question... asynch cleanup is supposed to
         * be called asynch -- we're going to call it synchronously.
         * I think this is a bad idea, but not cleaning up seems worse.
         * Because we're synchronous, if we hang, we'll be watchdogged.
         *
         * Uncooperative plugins/third-party libs can truly suck
         * one's soul out.
         */
        if(my_precious) {
          gettimeofday(&job->finish_time, NULL); /* We're done */
          EVENTER_CALLBACK_ENTRY((void *)my_precious->callback, NULL,
                                 my_precious->fd, my_precious->mask,
                                 EVENTER_ASYNCH_CLEANUP);
          my_precious->callback(my_precious, EVENTER_ASYNCH_CLEANUP,
                                my_precious->closure, &job->finish_time);
          EVENTER_CALLBACK_RETURN((void *)my_precious->callback, NULL, -1);
        }
      }
      jobcopy = malloc(sizeof(*jobcopy));
      memcpy(jobcopy, job, sizeof(*jobcopy));
      free(job);
      jobcopy->fd_event = my_precious;
      jobcopy->finish_hrtime = eventer_gethrtime(); /* on the copy; job was just freed */
      eventer_jobq_maybe_spawn(jobcopy->jobq);
      eventer_jobq_finished_job(jobcopy->jobq, jobcopy);
      eventer_jobq_enqueue(jobcopy->jobq->backq, jobcopy);
      eventer_wakeup();
    }
    else
      pthread_kill(job->executor, JOBQ_SIGNAL);
  }
  return 0;
}
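/* Drain a completed-job queue (a backq): run each finished job's callback
 * with its original mask, re-add or free the event based on the returned
 * mask, and release the job. */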
int
eventer_jobq_consume_available(eventer_t e, int mask, void *closure,
                               struct timeval *now) {
  eventer_jobq_t *jobq = closure;
  eventer_job_t *job;
  /* This can only be called with a backq jobq
   * (a standalone queue with no backq itself)
   */
  assert(jobq && !jobq->backq);
  while((job = eventer_jobq_dequeue_nowait(jobq)) != NULL) {
    int newmask;
    EVENTER_CALLBACK_ENTRY((void *)job->fd_event->callback, NULL,
                           job->fd_event->fd, job->fd_event->mask,
                           job->fd_event->mask);
    newmask = job->fd_event->callback(job->fd_event, job->fd_event->mask,
                                      job->fd_event->closure, now);
    EVENTER_CALLBACK_RETURN((void *)job->fd_event->callback, NULL, newmask);
    if(!newmask) eventer_free(job->fd_event);
    else {
      job->fd_event->mask = newmask;
      eventer_add(job->fd_event);
    }
    job->fd_event = NULL;
    assert(job->timeout_event == NULL);
    noit_atomic_dec32(&jobq->inflight);
    free(job);
  }
  return EVENTER_RECURRENT;
}
static void
eventer_jobq_cancel_cleanup(void *vp) {
  eventer_jobq_t *jobq = vp;
  noit_atomic_dec32(&jobq->pending_cancels);
  noit_atomic_dec32(&jobq->concurrency);
}
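/* Main loop for a consumer thread: claim a slot (backing out if the queue is
 * over-provisioned), then repeatedly dequeue jobs, handle pre-start timeouts,
 * run the asynch work callback under the requested cancellation policy, run
 * cleanup, and push the finished job onto the backq.  A job with a NULL
 * fd_event is a sentinel telling this thread to exit. */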
void *
eventer_jobq_consumer(eventer_jobq_t *jobq) {
  eventer_job_t *job;
  int32_t current_count;
  sigjmp_buf env;

  assert(jobq->backq);
  current_count = noit_atomic_inc32(&jobq->concurrency);
  noitL(eventer_deb, "jobq[%s] -> %d\n", jobq->queue_name, current_count);
  if(current_count > jobq->desired_concurrency) {
    noitL(eventer_deb, "jobq[%s] over provisioned, backing out.\n",
          jobq->queue_name);
    noit_atomic_dec32(&jobq->concurrency);
    pthread_exit(NULL);
    return NULL;
  }
  /* Each thread can consume from only one queue */
  pthread_setspecific(threads_jobq, jobq);
  pthread_setspecific(jobq->threadenv, &env);
  pthread_cleanup_push(eventer_jobq_cancel_cleanup, jobq);

  while(1) {
    pthread_setspecific(jobq->activejob, NULL);
    job = eventer_jobq_dequeue(jobq);
    if(!job) continue;
    if(!job->fd_event) {
      free(job);
      break;
    }
    pthread_setspecific(jobq->activejob, job);
    noitL(eventer_deb, "%p jobq[%s] -> running job [%p]\n", pthread_self_ptr(),
          jobq->queue_name, job);

    /* Mark our commencement */
    job->start_hrtime = eventer_gethrtime();

    /* Safely check and handle if we've timed out while in queue */
    pthread_mutex_lock(&job->lock);
    if(job->timeout_triggered) {
      struct timeval diff, diff2;
      eventer_hrtime_t udiff2;
      /* This happens if the timeout occurred before we even had the chance
       * to pull the job off the queue.  We must be in bad shape here.
       */
      noitL(eventer_deb, "%p jobq[%s] -> timeout before start [%p]\n",
            pthread_self_ptr(), jobq->queue_name, job);
      gettimeofday(&job->finish_time, NULL); /* We're done */
      job->finish_hrtime = eventer_gethrtime();
      sub_timeval(job->finish_time, job->fd_event->whence, &diff);
      udiff2 = (job->finish_hrtime - job->create_hrtime)/1000;
      diff2.tv_sec = udiff2/1000000;
      diff2.tv_usec = udiff2%1000000;
      noitL(eventer_deb, "%p jobq[%s] -> timeout before start [%p] -%0.6f (%0.6f)\n",
            pthread_self_ptr(), jobq->queue_name, job,
            (float)diff.tv_sec + (float)diff.tv_usec/1000000.0,
            (float)diff2.tv_sec + (float)diff2.tv_usec/1000000.0);
      pthread_mutex_unlock(&job->lock);
      EVENTER_CALLBACK_ENTRY((void *)job->fd_event->callback, NULL,
                             job->fd_event->fd, job->fd_event->mask,
                             EVENTER_ASYNCH_CLEANUP);
      job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_CLEANUP,
                              job->fd_event->closure, &job->finish_time);
      EVENTER_CALLBACK_RETURN((void *)job->fd_event->callback, NULL, -1);
      eventer_jobq_finished_job(jobq, job);
      eventer_jobq_enqueue(jobq->backq, job);
      eventer_wakeup();
      continue;
    }
    pthread_mutex_unlock(&job->lock);

    /* Run the job; if we time out, we will be hit with JOBQ_SIGNAL from
     * the master thread.  We handle the alarm by longjmp'ing back out here.
     */
    job->executor = pthread_self();
    if(0 == (job->fd_event->mask & EVENTER_EVIL_BRUTAL) ||
       sigsetjmp(env, 1) == 0) {
      /* We could get hit right here (timed out and terminated from
       * another thread).  inflight isn't yet set (next line), so it
       * won't longjmp.  But timeout_triggered will be set... so we
       * should recheck that after we mark ourselves inflight.
       */
      if(noit_atomic_cas32(&job->inflight, 1, 0) == 0) {
        if(!job->timeout_triggered) {
          noitL(eventer_deb, "%p jobq[%s] -> executing [%p]\n",
                pthread_self_ptr(), jobq->queue_name, job);
          /* Choose the right cancellation policy (or none) */
          if(job->fd_event->mask & EVENTER_CANCEL_ASYNCH) {
            noitL(eventer_deb, "PTHREAD_CANCEL_ASYNCHRONOUS\n");
            pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
            pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
          }
          else if(job->fd_event->mask & EVENTER_CANCEL_DEFERRED) {
            noitL(eventer_deb, "PTHREAD_CANCEL_DEFERRED\n");
            pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
            pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
          }
          else {
            noitL(eventer_deb, "PTHREAD_CANCEL_DISABLE\n");
            pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
          }
          /* run the job */
          struct timeval start_time;
          gettimeofday(&start_time, NULL);
          noitL(eventer_deb, "jobq[%s] -> dispatch BEGIN\n", jobq->queue_name);
          job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_WORK,
                                  job->fd_event->closure, &start_time);
          noitL(eventer_deb, "jobq[%s] -> dispatch END\n", jobq->queue_name);
          if(job->fd_event && job->fd_event->mask & EVENTER_CANCEL)
            pthread_testcancel();
          /* reset the cancellation policy */
          pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
          pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
        }
      }
    }

    job->inflight = 0;
    noitL(eventer_deb, "%p jobq[%s] -> finished [%p]\n", pthread_self_ptr(),
          jobq->queue_name, job);
    /* Now we know we won't have siglongjmp called on us */

    gettimeofday(&job->finish_time, NULL);
    if(job->timeout_event &&
       eventer_remove(job->timeout_event)) {
      eventer_free(job->timeout_event);
    }
    job->timeout_event = NULL;

    if(noit_atomic_cas32(&job->has_cleanedup, 1, 0) == 0) {
      /* We need to cleanup... we haven't done it yet. */
      noitL(eventer_deb, "%p jobq[%s] -> cleanup [%p]\n", pthread_self_ptr(),
            jobq->queue_name, job);
      /* threaded issue, need to recheck. */
      /* coverity[check_after_deref] */
      if(job->fd_event)
        job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_CLEANUP,
                                job->fd_event->closure, &job->finish_time);
    }
    job->finish_hrtime = eventer_gethrtime();
    eventer_jobq_finished_job(jobq, job);
    eventer_jobq_enqueue(jobq->backq, job);
    eventer_wakeup();
  }
  pthread_cleanup_pop(0);
  noit_atomic_dec32(&jobq->concurrency);
  pthread_exit(NULL);
  return NULL;
}

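/* Adjust desired concurrency.  Decreasing enqueues a sentinel job whose
 * fd_event is NULL, which causes exactly one consumer thread to exit. */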
void eventer_jobq_increase_concurrency(eventer_jobq_t *jobq) {
  noit_atomic_inc32(&jobq->desired_concurrency);
}
void eventer_jobq_decrease_concurrency(eventer_jobq_t *jobq) {
  eventer_job_t *job;
  noit_atomic_dec32(&jobq->desired_concurrency);
  job = calloc(1, sizeof(*job));
  eventer_jobq_enqueue(jobq, job);
}
void eventer_jobq_process_each(void (*func)(eventer_jobq_t *, void *),
                               void *closure) {
  const char *key;
  int klen;
  void *vjobq;
  noit_hash_iter iter = NOIT_HASH_ITER_ZERO;

  pthread_mutex_lock(&all_queues_lock);
  while(noit_hash_next(&all_queues, &iter, &key, &klen, &vjobq)) {
    func((eventer_jobq_t *)vjobq, closure);
  }
  pthread_mutex_unlock(&all_queues_lock);
}