root/src/eventer/eventer_jobq.c

Revision 3a48d08926ff8a3ea95003402260d96f360cc064, 20.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 1 week ago)

update eventer callback probe.
Cover all callsites and pass eventer_t as first arg.

/*
 * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above
 *       copyright notice, this list of conditions and the following
 *       disclaimer in the documentation and/or other materials provided
 *       with the distribution.
 *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
 *       of its contributors may be used to endorse or promote products
 *       derived from this software without specific prior written
 *       permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "noit_defines.h"
#include "utils/noit_log.h"
#include "utils/noit_atomic.h"
#include "eventer/eventer.h"
#include "dtrace_probes.h"
#include <errno.h>
#include <setjmp.h>
#include <assert.h>
#include <signal.h>

#ifndef JOBQ_SIGNAL
#define JOBQ_SIGNAL SIGALRM
#endif

#define pthread_self_ptr() ((void *)(vpsized_int)pthread_self())

static noit_atomic32_t threads_jobq_inited = 0;
static pthread_key_t threads_jobq;
static sigset_t alarm_mask;
static noit_hash_table all_queues = NOIT_HASH_EMPTY;
pthread_mutex_t all_queues_lock;

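/* Record completion stats for a job: drop it from the inflight count,
 * count any timeout, and fold the job's queue-wait and run times into the
 * per-jobq exponentially weighted moving averages (80% old value, 20% new
 * sample) using CAS retry loops so concurrent finishers don't clobber
 * each other. */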
static void
eventer_jobq_finished_job(eventer_jobq_t *jobq, eventer_job_t *job) {
  eventer_hrtime_t wait_time = job->start_hrtime - job->create_hrtime;
  eventer_hrtime_t run_time = job->finish_hrtime - job->start_hrtime;
  noit_atomic_dec32(&jobq->inflight);
  if(job->timeout_triggered) noit_atomic_inc64(&jobq->timeouts);
  while(1) {
    eventer_hrtime_t newv = jobq->avg_wait_ns * 0.8 + wait_time * 0.2;
    if(noit_atomic_cas64(&jobq->avg_wait_ns, newv, jobq->avg_wait_ns) == jobq->avg_wait_ns)
      break;
  }
  while(1) {
    eventer_hrtime_t newv = jobq->avg_run_ns * 0.8 + run_time * 0.2;
    if(noit_atomic_cas64(&jobq->avg_run_ns, newv, jobq->avg_run_ns) == jobq->avg_run_ns)
      break;
  }
}

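/* JOBQ_SIGNAL handler.  If the thread's active job is marked
 * EVENTER_EVIL_BRUTAL, atomically clear its inflight flag; if it was
 * still inflight, siglongjmp back into the consumer loop, abandoning the
 * callback mid-run. */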
static void
eventer_jobq_handler(int signo)
{
  eventer_jobq_t *jobq;
  eventer_job_t *job;
  sigjmp_buf *env;

  jobq = pthread_getspecific(threads_jobq);
  assert(jobq);
  env = pthread_getspecific(jobq->threadenv);
  job = pthread_getspecific(jobq->activejob);
  if(env && job && job->fd_event->mask & EVENTER_EVIL_BRUTAL)
    if(noit_atomic_cas32(&job->inflight, 0, 1) == 1)
       siglongjmp(*env, 1);
}

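/* Initialize a jobq.  The first call also performs one-time process-wide
 * setup (guarded by a CAS on threads_jobq_inited): the JOBQ_SIGNAL
 * handler, the thread-specific key that maps a worker back to its jobq,
 * and the all_queues registry lock.  Each queue then gets its own lock,
 * counting semaphore, thread-specific keys, default backq, and an entry
 * in the all_queues hash. */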
int
eventer_jobq_init(eventer_jobq_t *jobq, const char *queue_name) {
  pthread_mutexattr_t mutexattr;

  if(noit_atomic_cas32(&threads_jobq_inited, 1, 0) == 0) {
    struct sigaction act;

    sigemptyset(&alarm_mask);
    sigaddset(&alarm_mask, JOBQ_SIGNAL);
    act.sa_handler = eventer_jobq_handler;
    act.sa_flags = 0;
    sigemptyset(&act.sa_mask);

    if(sigaction(JOBQ_SIGNAL, &act, NULL) < 0) {
      noitL(noit_error, "Cannot initialize signal handler: %s\n",
            strerror(errno));
      return -1;
    }

    if(pthread_key_create(&threads_jobq, NULL)) {
      noitL(noit_error, "Cannot initialize thread-specific jobq: %s\n",
            strerror(errno));
      return -1;
    }
    if(pthread_mutex_init(&all_queues_lock, NULL)) {
      noitL(noit_error, "Cannot initialize all_queues mutex: %s\n",
            strerror(errno));
      return -1;
    }
  }

  memset(jobq, 0, sizeof(*jobq));
  jobq->queue_name = strdup(queue_name);
  if(pthread_mutexattr_init(&mutexattr) != 0) {
    noitL(noit_error, "Cannot initialize lock attributes\n");
    return -1;
  }
  if(pthread_mutex_init(&jobq->lock, &mutexattr) != 0) {
    noitL(noit_error, "Cannot initialize lock\n");
    return -1;
  }
  if(sem_init(&jobq->semaphore, 0, 0) != 0) {
    noitL(noit_error, "Cannot initialize semaphore: %s\n",
          strerror(errno));
    return -1;
  }
  if(pthread_key_create(&jobq->activejob, NULL)) {
    noitL(noit_error, "Cannot initialize thread-specific activejob: %s\n",
          strerror(errno));
    return -1;
  }
  if(pthread_key_create(&jobq->threadenv, NULL)) {
    noitL(noit_error, "Cannot initialize thread-specific sigsetjmp env: %s\n",
          strerror(errno));
    return -1;
  }
  jobq->backq = eventer_default_backq();
  pthread_mutex_lock(&all_queues_lock);
  if(noit_hash_store(&all_queues, jobq->queue_name, strlen(jobq->queue_name),
                     jobq) == 0) {
    noitL(noit_error, "Duplicate queue name!\n");
    pthread_mutex_unlock(&all_queues_lock);
    return -1;
  }
  pthread_mutex_unlock(&all_queues_lock);
  return 0;
}

eventer_jobq_t *
eventer_jobq_retrieve(const char *name) {
  void *vjq = NULL;
  pthread_mutex_lock(&all_queues_lock);
  (void)noit_hash_retrieve(&all_queues, name, strlen(name), &vjq);
  pthread_mutex_unlock(&all_queues_lock);
  return vjq;
}

static void *
eventer_jobq_consumer_pthreadentry(void *vp) {
  return eventer_jobq_consumer((eventer_jobq_t *)vp);
}
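/* Spawn a detached consumer thread if the queue's current concurrency is
 * below the desired level.  The new thread increments the concurrency
 * counter itself and backs out if it overshoots, so over-spawning here is
 * tolerated.  If every worker has a cancel pending, the queue can make no
 * progress and we assert out. */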
static void
eventer_jobq_maybe_spawn(eventer_jobq_t *jobq) {
  int32_t current = jobq->concurrency;
  /* if we've no desired concurrency, this doesn't apply to us */
  if(jobq->desired_concurrency == 0) return;
  /* See if we need to launch one */
  if(jobq->desired_concurrency > current) {
    /* we need another thread, maybe... this is a race as we do the
     * increment in the new thread, but we check there and back it out
     * if we did something we weren't supposed to. */
    pthread_t tid;
    pthread_attr_t tattr;
    noitL(eventer_deb, "Starting queue[%s] thread now at %d\n",
          jobq->queue_name, jobq->concurrency);
    pthread_attr_init(&tattr);
    pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
    pthread_create(&tid, &tattr, eventer_jobq_consumer_pthreadentry, jobq);
  }
  noitL(eventer_deb, "jobq_queue[%s] pending cancels [%d/%d]\n",
        jobq->queue_name, jobq->pending_cancels,
        jobq->desired_concurrency);
  if(jobq->pending_cancels == jobq->desired_concurrency) {
    /* we're absolutely screwed at this point... it's time to just die */
    noitL(noit_error, "jobq_queue[%s] induced [%d/%d] game over.\n",
          jobq->queue_name, jobq->pending_cancels,
          jobq->desired_concurrency);
    assert(jobq->pending_cancels != jobq->desired_concurrency);
  }
}
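/* Append a job to the tail of the queue under the queue lock, bump the
 * backlog and total-job counters, and post the semaphore to wake a
 * consumer thread. */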
void
eventer_jobq_enqueue(eventer_jobq_t *jobq, eventer_job_t *job) {
  job->next = NULL;
  eventer_jobq_maybe_spawn(jobq);
  pthread_mutex_lock(&jobq->lock);
  if(jobq->tailq) {
    /* If there is a tail (queue has items), just push it on the end. */
    jobq->tailq->next = job;
    jobq->tailq = job;
  }
  else {
    /* Otherwise, this is the first and only item on the list. */
    jobq->headq = jobq->tailq = job;
  }
  pthread_mutex_unlock(&jobq->lock);
  noit_atomic_inc64(&jobq->total_jobs);
  noit_atomic_inc32(&jobq->backlog);

  /* Signal consumers */
  sem_post(&jobq->semaphore);
}

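/* Pop a job off the head of the queue.  Consumers first acquire the
 * counting semaphore (blocking or try-wait, per should_wait), then take
 * the queue lock to unlink the head; a successful dequeue moves the job
 * from the backlog count to the inflight count. */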
static eventer_job_t *
__eventer_jobq_dequeue(eventer_jobq_t *jobq, int should_wait) {
  eventer_job_t *job = NULL;

  /* Wait for a job */
  if(should_wait) while(sem_wait(&jobq->semaphore) && errno == EINTR);
  /* Or try-wait for a job */
  else if(sem_trywait(&jobq->semaphore)) return NULL;

  pthread_mutex_lock(&jobq->lock);
  if(jobq->headq) {
    /* If there are items, pop and advance the head pointer */
    job = jobq->headq;
    jobq->headq = jobq->headq->next;
    if(!jobq->headq) jobq->tailq = NULL;
  }
  pthread_mutex_unlock(&jobq->lock);

  if(job) {
    job->next = NULL; /* To reduce any confusion */
    noit_atomic_dec32(&jobq->backlog);
    noit_atomic_inc32(&jobq->inflight);
  }
  /* Our semaphores are counting semaphores, not locks. */
  /* coverity[missing_unlock] */
  return job;
}

eventer_job_t *
eventer_jobq_dequeue(eventer_jobq_t *jobq) {
  return __eventer_jobq_dequeue(jobq, 1);
}

eventer_job_t *
eventer_jobq_dequeue_nowait(eventer_jobq_t *jobq) {
  return __eventer_jobq_dequeue(jobq, 0);
}

void
eventer_jobq_destroy(eventer_jobq_t *jobq) {
  pthread_mutex_destroy(&jobq->lock);
  sem_destroy(&jobq->semaphore);
}
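/* Timeout callback, run in the main eventer thread when a job's timeout
 * event fires.  If the job allows cancellation (EVENTER_CANCEL), the
 * executor thread is pthread_cancel'd and the asynch cleanup is invoked
 * synchronously here before a copy of the job is handed to the backq;
 * otherwise the executor is poked with JOBQ_SIGNAL so EVIL_BRUTAL jobs
 * can longjmp out of their callback. */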
int
eventer_jobq_execute_timeout(eventer_t e, int mask, void *closure,
                             struct timeval *now) {
  eventer_job_t *job = closure;
  job->timeout_triggered = 1;
  job->timeout_event = NULL;
  noitL(eventer_deb, "%p jobq -> timeout job [%p]\n", pthread_self_ptr(), job);
  if(job->inflight) {
    eventer_job_t *jobcopy;
    if(job->fd_event->mask & (EVENTER_CANCEL)) {
      eventer_t my_precious = job->fd_event;
      /* we set this to null so we can't complete on it */
      job->fd_event = NULL;
      noitL(eventer_deb, "[inline] timeout cancelling job\n");
      noit_atomic_inc32(&job->jobq->pending_cancels);
      pthread_cancel(job->executor);
      /* complete on it ourselves */
      if(noit_atomic_cas32(&job->has_cleanedup, 1, 0) == 0) {
        /* We need to cleanup... we haven't done it yet. */
        noitL(eventer_deb, "[inline] %p jobq[%s] -> cleanup [%p]\n",
              pthread_self_ptr(), job->jobq->queue_name, job);
        /* This is the real question... asynch cleanup is supposed to
         * be called asynch -- we're going to call it synchronously.
         * I think this is a bad idea, but not cleaning up seems worse.
         * Because we're synchronous, if we hang, we'll be watchdogged.
         *
         * Uncooperative plugins/third-party libs can truly suck
         * one's soul out.
         */
        if(my_precious) {
          gettimeofday(&job->finish_time, NULL); /* We're done */
          EVENTER_CALLBACK_ENTRY((void *)my_precious, (void *)my_precious->callback, NULL,
                                 my_precious->fd, my_precious->mask,
                                 EVENTER_ASYNCH_CLEANUP);
          my_precious->callback(my_precious, EVENTER_ASYNCH_CLEANUP,
                                my_precious->closure, &job->finish_time);
          EVENTER_CALLBACK_RETURN((void *)my_precious, (void *)my_precious->callback, NULL, -1);
        }
      }
      jobcopy = malloc(sizeof(*jobcopy));
      memcpy(jobcopy, job, sizeof(*jobcopy));
      free(job);
      jobcopy->fd_event = my_precious;
      jobcopy->finish_hrtime = eventer_gethrtime(); /* job is freed; stamp the copy */
      eventer_jobq_maybe_spawn(jobcopy->jobq);
      eventer_jobq_finished_job(jobcopy->jobq, jobcopy);
      eventer_jobq_enqueue(jobcopy->jobq->backq, jobcopy);
      eventer_wakeup();
    }
    else
      pthread_kill(job->executor, JOBQ_SIGNAL);
  }
  return 0;
}
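/* Drain the completion (backq) queue from the main eventer thread: for
 * each finished job, invoke its callback with the event's mask, then
 * either free the event (callback returned 0) or re-add it to the eventer
 * with the returned mask. */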
int
eventer_jobq_consume_available(eventer_t e, int mask, void *closure,
                               struct timeval *now) {
  eventer_jobq_t *jobq = closure;
  eventer_job_t *job;
  /* This can only be called with a backq jobq
   * (a standalone queue with no backq itself)
   */
  assert(jobq && !jobq->backq);
  while((job = eventer_jobq_dequeue_nowait(jobq)) != NULL) {
    int newmask;
    EVENTER_CALLBACK_ENTRY((void *)job->fd_event,
                           (void *)job->fd_event->callback, NULL,
                           job->fd_event->fd, job->fd_event->mask,
                           job->fd_event->mask);
    newmask = job->fd_event->callback(job->fd_event, job->fd_event->mask,
                                      job->fd_event->closure, now);
    EVENTER_CALLBACK_RETURN((void *)job->fd_event,
                            (void *)job->fd_event->callback, NULL, newmask);
    if(!newmask) eventer_free(job->fd_event);
    else {
      job->fd_event->mask = newmask;
      eventer_add(job->fd_event);
    }
    job->fd_event = NULL;
    assert(job->timeout_event == NULL);
    noit_atomic_dec32(&jobq->inflight);
    free(job);
  }
  return EVENTER_RECURRENT;
}
static void
eventer_jobq_cancel_cleanup(void *vp) {
  eventer_jobq_t *jobq = vp;
  noit_atomic_dec32(&jobq->pending_cancels);
  noit_atomic_dec32(&jobq->concurrency);
}
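/* Main loop of a jobq worker thread.  Each iteration dequeues a job, runs
 * its EVENTER_ASYNCH_WORK phase under whatever cancellation policy the
 * event requests, then performs EVENTER_ASYNCH_CLEANUP (if a timeout
 * hasn't already done so) and pushes the job onto the backq for
 * completion in the main thread.  A job with no fd_event is a sentinel
 * telling the thread to exit. */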
void *
eventer_jobq_consumer(eventer_jobq_t *jobq) {
  eventer_job_t *job;
  int32_t current_count;
  sigjmp_buf env;

  assert(jobq->backq);
  current_count = noit_atomic_inc32(&jobq->concurrency);
  noitL(eventer_deb, "jobq[%s] -> %d\n", jobq->queue_name, current_count);
  if(current_count > jobq->desired_concurrency) {
    noitL(eventer_deb, "jobq[%s] over provisioned, backing out.\n",
          jobq->queue_name);
    noit_atomic_dec32(&jobq->concurrency);
    pthread_exit(NULL);
    return NULL;
  }
  /* Each thread can consume from only one queue */
  pthread_setspecific(threads_jobq, jobq);
  pthread_setspecific(jobq->threadenv, &env);
  pthread_cleanup_push(eventer_jobq_cancel_cleanup, jobq);

  while(1) {
    pthread_setspecific(jobq->activejob, NULL);
    job = eventer_jobq_dequeue(jobq);
    if(!job) continue;
    if(!job->fd_event) {
      free(job);
      break;
    }
    pthread_setspecific(jobq->activejob, job);
    noitL(eventer_deb, "%p jobq[%s] -> running job [%p]\n", pthread_self_ptr(),
          jobq->queue_name, job);

    /* Mark our commencement */
    job->start_hrtime = eventer_gethrtime();

    /* Safely check and handle if we've timed out while in queue */
    pthread_mutex_lock(&job->lock);
    if(job->timeout_triggered) {
      struct timeval diff, diff2;
      eventer_hrtime_t udiff2;
      /* This happens if the timeout occurred before we even had the chance
       * to pull the job off the queue.  We must be in bad shape here.
       */
      noitL(eventer_deb, "%p jobq[%s] -> timeout before start [%p]\n",
            pthread_self_ptr(), jobq->queue_name, job);
      gettimeofday(&job->finish_time, NULL); /* We're done */
      job->finish_hrtime = eventer_gethrtime();
      sub_timeval(job->finish_time, job->fd_event->whence, &diff);
      udiff2 = (job->finish_hrtime - job->create_hrtime)/1000;
      diff2.tv_sec = udiff2/1000000;
      diff2.tv_usec = udiff2%1000000;
      noitL(eventer_deb, "%p jobq[%s] -> timeout before start [%p] -%0.6f (%0.6f)\n",
            pthread_self_ptr(), jobq->queue_name, job,
            (float)diff.tv_sec + (float)diff.tv_usec/1000000.0,
            (float)diff2.tv_sec + (float)diff2.tv_usec/1000000.0);
      pthread_mutex_unlock(&job->lock);
      EVENTER_CALLBACK_ENTRY((void *)job->fd_event,
                             (void *)job->fd_event->callback, NULL,
                             job->fd_event->fd, job->fd_event->mask,
                             EVENTER_ASYNCH_CLEANUP);
      job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_CLEANUP,
                              job->fd_event->closure, &job->finish_time);
      EVENTER_CALLBACK_RETURN((void *)job->fd_event,
                              (void *)job->fd_event->callback, NULL, -1);
      eventer_jobq_finished_job(jobq, job);
      eventer_jobq_enqueue(jobq->backq, job);
      eventer_wakeup();
      continue;
    }
    pthread_mutex_unlock(&job->lock);

    /* Run the job.  If we time out, we will be signaled with JOBQ_SIGNAL
     * from the master thread.  We handle the alarm by longjmp'ing back
     * out here.
     */
    job->executor = pthread_self();
    if(0 == (job->fd_event->mask & EVENTER_EVIL_BRUTAL) ||
       sigsetjmp(env, 1) == 0) {
      /* We could get hit right here... (timeout and terminated from
       * another thread).  inflight isn't yet set (next line), so it
       * won't longjmp.  But timeout_triggered will be set... so we
       * should recheck that after we mark ourselves inflight.
       */
      if(noit_atomic_cas32(&job->inflight, 1, 0) == 0) {
        if(!job->timeout_triggered) {
          noitL(eventer_deb, "%p jobq[%s] -> executing [%p]\n",
                pthread_self_ptr(), jobq->queue_name, job);
          /* Choose the right cancellation policy (or none) */
          if(job->fd_event->mask & EVENTER_CANCEL_ASYNCH) {
            noitL(eventer_deb, "PTHREAD_CANCEL_ASYNCHRONOUS\n");
            pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
            pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
          }
          else if(job->fd_event->mask & EVENTER_CANCEL_DEFERRED) {
            noitL(eventer_deb, "PTHREAD_CANCEL_DEFERRED\n");
            pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
            pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
          }
          else {
            noitL(eventer_deb, "PTHREAD_CANCEL_DISABLE\n");
            pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
          }
          /* run the job */
          struct timeval start_time;
          gettimeofday(&start_time, NULL);
          noitL(eventer_deb, "jobq[%s] -> dispatch BEGIN\n", jobq->queue_name);
          EVENTER_CALLBACK_ENTRY((void *)job->fd_event,
                                 (void *)job->fd_event->callback, NULL,
                                 job->fd_event->fd, job->fd_event->mask,
                                 EVENTER_ASYNCH_WORK);
          job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_WORK,
                                  job->fd_event->closure, &start_time);
          EVENTER_CALLBACK_RETURN((void *)job->fd_event,
                                  (void *)job->fd_event->callback, NULL, -1);
          noitL(eventer_deb, "jobq[%s] -> dispatch END\n", jobq->queue_name);
          if(job->fd_event && job->fd_event->mask & EVENTER_CANCEL)
            pthread_testcancel();
          /* reset the cancellation policy */
          pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
          pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
        }
      }
    }

    job->inflight = 0;
    noitL(eventer_deb, "%p jobq[%s] -> finished [%p]\n", pthread_self_ptr(),
          jobq->queue_name, job);
    /* Now we know we won't have siglongjmp called on us */

    gettimeofday(&job->finish_time, NULL);
    if(job->timeout_event &&
       eventer_remove(job->timeout_event)) {
      eventer_free(job->timeout_event);
    }
    job->timeout_event = NULL;

    if(noit_atomic_cas32(&job->has_cleanedup, 1, 0) == 0) {
      /* We need to cleanup... we haven't done it yet. */
      noitL(eventer_deb, "%p jobq[%s] -> cleanup [%p]\n", pthread_self_ptr(),
            jobq->queue_name, job);
      /* threaded issue, need to recheck. */
      /* coverity[check_after_deref] */
      if(job->fd_event) {
        EVENTER_CALLBACK_ENTRY((void *)job->fd_event,
                               (void *)job->fd_event->callback, NULL,
                               job->fd_event->fd, job->fd_event->mask,
                               EVENTER_ASYNCH_CLEANUP);
        job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_CLEANUP,
                                job->fd_event->closure, &job->finish_time);
        EVENTER_CALLBACK_RETURN((void *)job->fd_event,
                                (void *)job->fd_event->callback, NULL, -1);
      }
    }
    job->finish_hrtime = eventer_gethrtime();
    eventer_jobq_finished_job(jobq, job);
    eventer_jobq_enqueue(jobq->backq, job);
    eventer_wakeup();
  }
  pthread_cleanup_pop(0);
  noit_atomic_dec32(&jobq->inflight);
  noit_atomic_dec32(&jobq->concurrency);
  pthread_exit(NULL);
  return NULL;
}

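/* Adjust the desired worker count for a queue.  Increasing simply raises
 * the target; the next enqueue will spawn threads as needed.  Decreasing
 * lowers the target and enqueues a sentinel job (no fd_event) that causes
 * one consumer thread to exit when it dequeues it. */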
void eventer_jobq_increase_concurrency(eventer_jobq_t *jobq) {
  noit_atomic_inc32(&jobq->desired_concurrency);
}
void eventer_jobq_decrease_concurrency(eventer_jobq_t *jobq) {
  eventer_job_t *job;
  noit_atomic_dec32(&jobq->desired_concurrency);
  job = calloc(1, sizeof(*job));
  eventer_jobq_enqueue(jobq, job);
}
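/* Invoke func on every registered jobq (with the supplied closure),
 * holding the global all_queues lock for the duration of the walk. */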
void eventer_jobq_process_each(void (*func)(eventer_jobq_t *, void *),
                               void *closure) {
  const char *key;
  int klen;
  void *vjobq;
  noit_hash_iter iter = NOIT_HASH_ITER_ZERO;

  pthread_mutex_lock(&all_queues_lock);
  while(noit_hash_next(&all_queues, &iter, &key, &klen, &vjobq)) {
    func((eventer_jobq_t *)vjobq, closure);
  }
  pthread_mutex_unlock(&all_queues_lock);
}