root/src/eventer/eventer_jobq.c

Revision 3bf08fe9560305bad51697deb8cf72e0d2574262, 20.6 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 2 months ago)

the job could be entirely freed by the time we call wakeup (wakeup our own copy)
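
The fix copies the triggering event out of the job before the job is handed back to the back queue (where it may be freed or recycled immediately), and only then issues the wakeup against that local copy. The pattern, as it appears later in this file:

  struct _event wakeupcopy;
  memcpy(&wakeupcopy, job->fd_event, sizeof(wakeupcopy));
  eventer_jobq_enqueue(eventer_default_backq(job->fd_event), job); /* job may be freed after this */
  eventer_wakeup(&wakeupcopy);                                     /* wake up on our own copy */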

/*
 * Copyright (c) 2014, Circonus, Inc.
 * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 *       copyright notice, this list of conditions and the following
 *       disclaimer in the documentation and/or other materials provided
 *       with the distribution.
 *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
 *       of its contributors may be used to endorse or promote products
 *       derived from this software without specific prior written
 *       permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "noit_defines.h"
#include "utils/noit_memory.h"
#include "utils/noit_log.h"
#include "utils/noit_atomic.h"
#include "eventer/eventer.h"
#include "dtrace_probes.h"
#include <errno.h>
#include <setjmp.h>
#include <assert.h>
#include <signal.h>

#ifndef JOBQ_SIGNAL
#define JOBQ_SIGNAL SIGALRM
#endif

#define pthread_self_ptr() ((void *)(vpsized_int)pthread_self())

static noit_atomic32_t threads_jobq_inited = 0;
static pthread_key_t threads_jobq;
static sigset_t alarm_mask;
static noit_hash_table all_queues = NOIT_HASH_EMPTY;
pthread_mutex_t all_queues_lock;

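/* Fold a finished job's queue-wait and run times into the jobq's
 * exponentially weighted moving averages (new = 0.8*old + 0.2*sample).
 * The CAS retry loops publish the new averages without a lock; concurrent
 * finishers simply retry on collision. */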
static void
eventer_jobq_finished_job(eventer_jobq_t *jobq, eventer_job_t *job) {
  eventer_hrtime_t wait_time = job->start_hrtime - job->create_hrtime;
  eventer_hrtime_t run_time = job->finish_hrtime - job->start_hrtime;
  noit_atomic_dec32(&jobq->inflight);
  if(job->timeout_triggered) noit_atomic_inc64(&jobq->timeouts);
  while(1) {
    eventer_hrtime_t newv = jobq->avg_wait_ns * 0.8 + wait_time * 0.2;
    if(noit_atomic_cas64(&jobq->avg_wait_ns, newv, jobq->avg_wait_ns) == jobq->avg_wait_ns)
      break;
  }
  while(1) {
    eventer_hrtime_t newv = jobq->avg_run_ns * 0.8 + run_time * 0.2;
    if(noit_atomic_cas64(&jobq->avg_run_ns, newv, jobq->avg_run_ns) == jobq->avg_run_ns)
      break;
  }
}

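/* JOBQ_SIGNAL handler: delivered to a worker thread whose current job has
 * timed out.  If the active job was submitted with EVENTER_EVIL_BRUTAL and
 * is still marked inflight, we siglongjmp back to the sigsetjmp point in
 * eventer_jobq_consumer, abandoning the callback mid-flight. */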
static void
eventer_jobq_handler(int signo)
{
  eventer_jobq_t *jobq;
  eventer_job_t *job;
  sigjmp_buf *env;

  jobq = pthread_getspecific(threads_jobq);
  assert(jobq);
  env = pthread_getspecific(jobq->threadenv);
  job = pthread_getspecific(jobq->activejob);
  if(env && job && job->fd_event->mask & EVENTER_EVIL_BRUTAL)
    if(noit_atomic_cas32(&job->inflight, 0, 1) == 1)
       siglongjmp(*env, 1);
}

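/* Initialize a jobq.  The first call also performs one-time process-wide
 * setup (the JOBQ_SIGNAL handler, the per-thread jobq key, and the
 * all_queues lock), guarded by a CAS on threads_jobq_inited.  Each queue
 * is registered by name in all_queues; duplicate names are an error. */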
int
eventer_jobq_init(eventer_jobq_t *jobq, const char *queue_name) {
  pthread_mutexattr_t mutexattr;

  if(noit_atomic_cas32(&threads_jobq_inited, 1, 0) == 0) {
    struct sigaction act;

    sigemptyset(&alarm_mask);
    sigaddset(&alarm_mask, JOBQ_SIGNAL);
    act.sa_handler = eventer_jobq_handler;
    act.sa_flags = 0;
    sigemptyset(&act.sa_mask);

    if(sigaction(JOBQ_SIGNAL, &act, NULL) < 0) {
      noitL(noit_error, "Cannot initialize signal handler: %s\n",
            strerror(errno));
      return -1;
    }

    if(pthread_key_create(&threads_jobq, NULL)) {
      noitL(noit_error, "Cannot initialize thread-specific jobq: %s\n",
            strerror(errno));
      return -1;
    }
    if(pthread_mutex_init(&all_queues_lock, NULL)) {
      noitL(noit_error, "Cannot initialize all_queues mutex: %s\n",
            strerror(errno));
      return -1;
    }
  }

  memset(jobq, 0, sizeof(*jobq));
  jobq->queue_name = strdup(queue_name);
  if(pthread_mutexattr_init(&mutexattr) != 0) {
    noitL(noit_error, "Cannot initialize lock attributes\n");
    return -1;
  }
  if(pthread_mutex_init(&jobq->lock, &mutexattr) != 0) {
    noitL(noit_error, "Cannot initialize lock\n");
    return -1;
  }
  if(sem_init(&jobq->semaphore, 0, 0) != 0) {
    noitL(noit_error, "Cannot initialize semaphore: %s\n",
          strerror(errno));
    return -1;
  }
  if(pthread_key_create(&jobq->activejob, NULL)) {
    noitL(noit_error, "Cannot initialize thread-specific activejob: %s\n",
          strerror(errno));
    return -1;
  }
  if(pthread_key_create(&jobq->threadenv, NULL)) {
    noitL(noit_error, "Cannot initialize thread-specific sigsetjmp env: %s\n",
          strerror(errno));
    return -1;
  }
  pthread_mutex_lock(&all_queues_lock);
  if(noit_hash_store(&all_queues, jobq->queue_name, strlen(jobq->queue_name),
                     jobq) == 0) {
    noitL(noit_error, "Duplicate queue name!\n");
    pthread_mutex_unlock(&all_queues_lock);
    return -1;
  }
  pthread_mutex_unlock(&all_queues_lock);
  return 0;
}

eventer_jobq_t *
eventer_jobq_retrieve(const char *name) {
  void *vjq = NULL;
  pthread_mutex_lock(&all_queues_lock);
  (void)noit_hash_retrieve(&all_queues, name, strlen(name), &vjq);
  pthread_mutex_unlock(&all_queues_lock);
  return vjq;
}

static void *
eventer_jobq_consumer_pthreadentry(void *vp) {
  noit_memory_init_thread();
  return eventer_jobq_consumer((eventer_jobq_t *)vp);
}
static void
eventer_jobq_maybe_spawn(eventer_jobq_t *jobq) {
  int32_t current = jobq->concurrency;
  /* if we've no desired concurrency, this doesn't apply to us */
  if(jobq->desired_concurrency == 0) return;
  /* See if we need to launch one */
  if(jobq->desired_concurrency > current) {
    /* we need another thread, maybe... this is a race as we do the
     * increment in the new thread, but we check there and back it out
     * if we did something we weren't supposed to. */
    pthread_t tid;
    pthread_attr_t tattr;
    noitL(eventer_deb, "Starting queue[%s] thread now at %d\n",
          jobq->queue_name, jobq->concurrency);
    pthread_attr_init(&tattr);
    pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
    pthread_create(&tid, &tattr, eventer_jobq_consumer_pthreadentry, jobq);
  }
  noitL(eventer_deb, "jobq_queue[%s] pending cancels [%d/%d]\n",
        jobq->queue_name, jobq->pending_cancels,
        jobq->desired_concurrency);
  if(jobq->pending_cancels == jobq->desired_concurrency) {
    /* we're absolutely screwed at this point... it's time to just die */
    noitL(noit_error, "jobq_queue[%s] induced [%d/%d] game over.\n",
          jobq->queue_name, jobq->pending_cancels,
          jobq->desired_concurrency);
    assert(jobq->pending_cancels != jobq->desired_concurrency);
  }
}
void
eventer_jobq_enqueue(eventer_jobq_t *jobq, eventer_job_t *job) {
  job->next = NULL;
  eventer_jobq_maybe_spawn(jobq);
  pthread_mutex_lock(&jobq->lock);
  if(jobq->tailq) {
    /* If there is a tail (queue has items), just push it on the end. */
    jobq->tailq->next = job;
    jobq->tailq = job;
  }
  else {
    /* Otherwise, this is the first and only item on the list. */
    jobq->headq = jobq->tailq = job;
  }
  pthread_mutex_unlock(&jobq->lock);
  noit_atomic_inc64(&jobq->total_jobs);
  noit_atomic_inc32(&jobq->backlog);

  /* Signal consumers */
  sem_post(&jobq->semaphore);
}

static eventer_job_t *
__eventer_jobq_dequeue(eventer_jobq_t *jobq, int should_wait) {
  eventer_job_t *job = NULL;

  /* Wait for a job */
  if(should_wait) while(sem_wait(&jobq->semaphore) && errno == EINTR);
  /* Or try-wait for a job */
  else if(sem_trywait(&jobq->semaphore)) return NULL;

  pthread_mutex_lock(&jobq->lock);
  if(jobq->headq) {
    /* If there are items, pop and advance the head pointer */
    job = jobq->headq;
    jobq->headq = jobq->headq->next;
    if(!jobq->headq) jobq->tailq = NULL;
  }
  pthread_mutex_unlock(&jobq->lock);

  if(job) {
    job->next = NULL; /* To reduce any confusion */
    noit_atomic_dec32(&jobq->backlog);
    noit_atomic_inc32(&jobq->inflight);
  }
  /* Our semaphores are counting semaphores, not locks. */
  /* coverity[missing_unlock] */
  return job;
}

eventer_job_t *
eventer_jobq_dequeue(eventer_jobq_t *jobq) {
  return __eventer_jobq_dequeue(jobq, 1);
}

eventer_job_t *
eventer_jobq_dequeue_nowait(eventer_jobq_t *jobq) {
  return __eventer_jobq_dequeue(jobq, 0);
}

void
eventer_jobq_destroy(eventer_jobq_t *jobq) {
  pthread_mutex_destroy(&jobq->lock);
  sem_destroy(&jobq->semaphore);
}
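/* Fired by the event loop when a job's timeout event triggers while the
 * job is still inflight.  If the event opted into cancellation
 * (EVENTER_CANCEL), the executor thread is pthread_cancel()ed and we run
 * the asynch cleanup and completion inline here; otherwise we just signal
 * the executor with JOBQ_SIGNAL and let the brutal longjmp path handle it. */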
int
eventer_jobq_execute_timeout(eventer_t e, int mask, void *closure,
                             struct timeval *now) {
  eventer_job_t *job = closure;
  job->timeout_triggered = 1;
  job->timeout_event = NULL;
  noitL(eventer_deb, "%p jobq -> timeout job [%p]\n", pthread_self_ptr(), job);
  if(job->inflight) {
    eventer_job_t *jobcopy;
    if(job->fd_event->mask & (EVENTER_CANCEL)) {
      struct _event wakeupcopy;
      eventer_t my_precious = job->fd_event;
      /* we set this to null so we can't complete on it */
      job->fd_event = NULL;
      noitL(eventer_deb, "[inline] timeout cancelling job\n");
      noit_atomic_inc32(&job->jobq->pending_cancels);
      pthread_cancel(job->executor);
      /* complete on it ourselves */
      if(noit_atomic_cas32(&job->has_cleanedup, 1, 0) == 0) {
        /* We need to cleanup... we haven't done it yet. */
        noitL(eventer_deb, "[inline] %p jobq[%s] -> cleanup [%p]\n",
              pthread_self_ptr(), job->jobq->queue_name, job);
        /* This is the real question... asynch cleanup is supposed to
         * be called asynch -- we're going to call it synchronously.
         * I think this is a bad idea, but not cleaning up seems worse.
         * Because we're synchronous, if we hang, we'll be watchdogged.
         *
         * Uncooperative plugins/third-party libs can truly suck
         * one's soul out.
         */
        if(my_precious) {
          gettimeofday(&job->finish_time, NULL); /* We're done */
          EVENTER_CALLBACK_ENTRY((void *)my_precious, (void *)my_precious->callback, NULL,
                                 my_precious->fd, my_precious->mask,
                                 EVENTER_ASYNCH_CLEANUP);
          my_precious->callback(my_precious, EVENTER_ASYNCH_CLEANUP,
                                my_precious->closure, &job->finish_time);
          EVENTER_CALLBACK_RETURN((void *)my_precious, (void *)my_precious->callback, NULL, -1);
        }
      }
      jobcopy = malloc(sizeof(*jobcopy));
      memcpy(jobcopy, job, sizeof(*jobcopy));
      free(job);
      jobcopy->fd_event = my_precious;
      /* job was just freed; record the finish time on the surviving copy */
      jobcopy->finish_hrtime = eventer_gethrtime();
      eventer_jobq_maybe_spawn(jobcopy->jobq);
      eventer_jobq_finished_job(jobcopy->jobq, jobcopy);
      memcpy(&wakeupcopy, jobcopy->fd_event, sizeof(wakeupcopy));
      eventer_jobq_enqueue(eventer_default_backq(jobcopy->fd_event), jobcopy);
      eventer_wakeup(&wakeupcopy);
    }
    else
      pthread_kill(job->executor, JOBQ_SIGNAL);
  }
  return 0;
}
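/* Drain a back queue from the event loop thread: each completed job's
 * event callback is invoked (the completion phase), then the event is
 * either freed (mask 0) or re-added with the returned mask, and the job
 * itself is released. */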
int
eventer_jobq_consume_available(eventer_t e, int mask, void *closure,
                               struct timeval *now) {
  eventer_jobq_t *jobq = closure;
  eventer_job_t *job;
  /* This can only be called with a backq jobq
   * (a standalone queue with no backq itself)
   */
  assert(jobq);
  while((job = eventer_jobq_dequeue_nowait(jobq)) != NULL) {
    int newmask;
    noit_memory_begin();
    EVENTER_CALLBACK_ENTRY((void *)job->fd_event,
                           (void *)job->fd_event->callback, NULL,
                           job->fd_event->fd, job->fd_event->mask,
                           job->fd_event->mask);
    newmask = job->fd_event->callback(job->fd_event, job->fd_event->mask,
                                      job->fd_event->closure, now);
    EVENTER_CALLBACK_RETURN((void *)job->fd_event,
                            (void *)job->fd_event->callback, NULL, newmask);
    noit_memory_end();
    if(!newmask) eventer_free(job->fd_event);
    else {
      job->fd_event->mask = newmask;
      eventer_add(job->fd_event);
    }
    job->fd_event = NULL;
    assert(job->timeout_event == NULL);
    noit_atomic_dec32(&jobq->inflight);
    free(job);
  }
  return EVENTER_RECURRENT;
}
static void
eventer_jobq_cancel_cleanup(void *vp) {
  eventer_jobq_t *jobq = vp;
  noit_atomic_dec32(&jobq->pending_cancels);
  noit_atomic_dec32(&jobq->concurrency);
}
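/* Worker thread main loop.  Backs out immediately if the queue is already
 * at (or above) its desired concurrency.  Otherwise it blocks on the
 * queue's semaphore, runs each job's EVENTER_ASYNCH_WORK phase under the
 * requested cancellation policy, runs EVENTER_ASYNCH_CLEANUP if nobody
 * else has, and hands the job to the back queue for completion.  A job
 * with a NULL fd_event is a shutdown sentinel: the thread frees it and
 * exits. */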
void *
eventer_jobq_consumer(eventer_jobq_t *jobq) {
  eventer_job_t *job;
  int32_t current_count;
  sigjmp_buf env;

  current_count = noit_atomic_inc32(&jobq->concurrency);
  noitL(eventer_deb, "jobq[%s] -> %d\n", jobq->queue_name, current_count);
  if(current_count > jobq->desired_concurrency) {
    noitL(eventer_deb, "jobq[%s] over provisioned, backing out.\n",
          jobq->queue_name);
    noit_atomic_dec32(&jobq->concurrency);
    pthread_exit(NULL);
    return NULL;
  }
  /* Each thread can consume from only one queue */
  pthread_setspecific(threads_jobq, jobq);
  pthread_setspecific(jobq->threadenv, &env);
  pthread_cleanup_push(eventer_jobq_cancel_cleanup, jobq);

  noit_memory_begin();
  while(1) {
    struct _event wakeupcopy;
    pthread_setspecific(jobq->activejob, NULL);
    noit_memory_end();
    noit_memory_maintenance();
    job = eventer_jobq_dequeue(jobq);
    noit_memory_begin();
    if(!job) continue;
    if(!job->fd_event) {
      free(job);
      break;
    }
    pthread_setspecific(jobq->activejob, job);
    noitL(eventer_deb, "%p jobq[%s] -> running job [%p]\n", pthread_self_ptr(),
          jobq->queue_name, job);

    /* Mark our commencement */
    job->start_hrtime = eventer_gethrtime();

    /* Safely check and handle if we've timed out while in queue */
    pthread_mutex_lock(&job->lock);
    if(job->timeout_triggered) {
      struct timeval diff, diff2;
      eventer_hrtime_t udiff2;
      /* This happens if the timeout occurred before we even had the chance
       * to pull the job off the queue.  We must be in bad shape here.
       */
      noitL(eventer_deb, "%p jobq[%s] -> timeout before start [%p]\n",
            pthread_self_ptr(), jobq->queue_name, job);
      gettimeofday(&job->finish_time, NULL); /* We're done */
      job->finish_hrtime = eventer_gethrtime();
      sub_timeval(job->finish_time, job->fd_event->whence, &diff);
      udiff2 = (job->finish_hrtime - job->create_hrtime)/1000;
      diff2.tv_sec = udiff2/1000000;
      diff2.tv_usec = udiff2%1000000;
      noitL(eventer_deb, "%p jobq[%s] -> timeout before start [%p] -%0.6f (%0.6f)\n",
            pthread_self_ptr(), jobq->queue_name, job,
            (float)diff.tv_sec + (float)diff.tv_usec/1000000.0,
            (float)diff2.tv_sec + (float)diff2.tv_usec/1000000.0);
      pthread_mutex_unlock(&job->lock);
      EVENTER_CALLBACK_ENTRY((void *)job->fd_event,
                             (void *)job->fd_event->callback, NULL,
                             job->fd_event->fd, job->fd_event->mask,
                             EVENTER_ASYNCH_CLEANUP);
      job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_CLEANUP,
                              job->fd_event->closure, &job->finish_time);
      EVENTER_CALLBACK_RETURN((void *)job->fd_event,
                              (void *)job->fd_event->callback, NULL, -1);
      eventer_jobq_finished_job(jobq, job);
      memcpy(&wakeupcopy, job->fd_event, sizeof(wakeupcopy));
      eventer_jobq_enqueue(eventer_default_backq(job->fd_event), job);
      eventer_wakeup(&wakeupcopy);
      continue;
    }
    pthread_mutex_unlock(&job->lock);

    /* Run the job; if we time out, we will be signalled with JOBQ_SIGNAL
     * from the master thread.  We handle the alarm by longjmp'ing back
     * out here.
     */
    job->executor = pthread_self();
    if(0 == (job->fd_event->mask & EVENTER_EVIL_BRUTAL) ||
       sigsetjmp(env, 1) == 0) {
      /* We could get hit right here... (timeout and terminated from
       * another thread).  inflight isn't yet set (next line), so it
       * won't longjmp.  But timeout_triggered will be set... so we
       * should recheck that after we mark ourselves inflight.
       */
      if(noit_atomic_cas32(&job->inflight, 1, 0) == 0) {
        if(!job->timeout_triggered) {
          noitL(eventer_deb, "%p jobq[%s] -> executing [%p]\n",
                pthread_self_ptr(), jobq->queue_name, job);
          /* Choose the right cancellation policy (or none) */
          if(job->fd_event->mask & EVENTER_CANCEL_ASYNCH) {
            noitL(eventer_deb, "PTHREAD_CANCEL_ASYNCHRONOUS\n");
            pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
            pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
          }
          else if(job->fd_event->mask & EVENTER_CANCEL_DEFERRED) {
            noitL(eventer_deb, "PTHREAD_CANCEL_DEFERRED\n");
            pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
            pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
          }
          else {
            noitL(eventer_deb, "PTHREAD_CANCEL_DISABLE\n");
            pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
          }
          /* run the job */
          struct timeval start_time;
          gettimeofday(&start_time, NULL);
          noitL(eventer_deb, "jobq[%s] -> dispatch BEGIN\n", jobq->queue_name);
          EVENTER_CALLBACK_ENTRY((void *)job->fd_event,
                                 (void *)job->fd_event->callback, NULL,
                                 job->fd_event->fd, job->fd_event->mask,
                                 EVENTER_ASYNCH_WORK);
          job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_WORK,
                                  job->fd_event->closure, &start_time);
          EVENTER_CALLBACK_RETURN((void *)job->fd_event,
                                  (void *)job->fd_event->callback, NULL, -1);
          noitL(eventer_deb, "jobq[%s] -> dispatch END\n", jobq->queue_name);
          if(job->fd_event && job->fd_event->mask & EVENTER_CANCEL)
            pthread_testcancel();
          /* reset the cancellation policy */
          pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
          pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
        }
      }
    }

    job->inflight = 0;
    noitL(eventer_deb, "%p jobq[%s] -> finished [%p]\n", pthread_self_ptr(),
          jobq->queue_name, job);
    /* Now we know we won't have siglongjmp called on us */

    gettimeofday(&job->finish_time, NULL);
    if(job->timeout_event) {
      if(eventer_remove(job->timeout_event)) {
        eventer_free(job->timeout_event);
      }
    }
    job->timeout_event = NULL;

    if(noit_atomic_cas32(&job->has_cleanedup, 1, 0) == 0) {
      /* We need to cleanup... we haven't done it yet. */
      noitL(eventer_deb, "%p jobq[%s] -> cleanup [%p]\n", pthread_self_ptr(),
            jobq->queue_name, job);
      /* threaded issue, need to recheck. */
      /* coverity[check_after_deref] */
      if(job->fd_event) {
        EVENTER_CALLBACK_ENTRY((void *)job->fd_event,
                               (void *)job->fd_event->callback, NULL,
                               job->fd_event->fd, job->fd_event->mask,
                               EVENTER_ASYNCH_CLEANUP);
        job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_CLEANUP,
                                job->fd_event->closure, &job->finish_time);
        EVENTER_CALLBACK_RETURN((void *)job->fd_event,
                                (void *)job->fd_event->callback, NULL, -1);
      }
    }
    job->finish_hrtime = eventer_gethrtime();
    eventer_jobq_finished_job(jobq, job);
    memcpy(&wakeupcopy, job->fd_event, sizeof(wakeupcopy));
    eventer_jobq_enqueue(eventer_default_backq(job->fd_event), job);
    eventer_wakeup(&wakeupcopy);
  }
  noit_memory_end();
  noit_memory_maintenance();
  pthread_cleanup_pop(0);
  noit_atomic_dec32(&jobq->inflight);
  noit_atomic_dec32(&jobq->concurrency);
  pthread_exit(NULL);
  return NULL;
}

void eventer_jobq_increase_concurrency(eventer_jobq_t *jobq) {
  noit_atomic_inc32(&jobq->desired_concurrency);
}
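/* Lowering concurrency enqueues a sentinel job (fd_event == NULL); whichever
 * worker dequeues it frees the sentinel and exits its consumer loop. */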
void eventer_jobq_decrease_concurrency(eventer_jobq_t *jobq) {
  eventer_job_t *job;
  noit_atomic_dec32(&jobq->desired_concurrency);
  job = calloc(1, sizeof(*job));
  eventer_jobq_enqueue(jobq, job);
}
void eventer_jobq_process_each(void (*func)(eventer_jobq_t *, void *),
                               void *closure) {
  const char *key;
  int klen;
  void *vjobq;
  noit_hash_iter iter = NOIT_HASH_ITER_ZERO;

  pthread_mutex_lock(&all_queues_lock);
  while(noit_hash_next(&all_queues, &iter, &key, &klen, &vjobq)) {
    func((eventer_jobq_t *)vjobq, closure);
  }
  pthread_mutex_unlock(&all_queues_lock);
}