Changeset 84d6f13ffc15b3f1bb50df2ac835b56f70179b3e

Show
Ignore:
Timestamp:
02/03/09 01:29:49 (6 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1233624589 +0000
git-parent:

[87de1cfcf05cd971deb05d13c0d1be988ac9d801]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1233624589 +0000
Message:

closes #78

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/eventer/eventer.h

    r04aaf95 r84d6f13  
    113113API_EXPORT(eventer_t) eventer_remove_recurrent(eventer_t e); 
    114114API_EXPORT(void) eventer_add_recurrent(eventer_t e); 
     115API_EXPORT(int) eventer_get_epoch(struct timeval *epoch); 
    115116 
    116117#endif 
  • src/eventer/eventer_impl.c

    r4c963d7 r84d6f13  
    99#include <pthread.h> 
    1010#include <assert.h> 
     11 
     12static struct timeval *eventer_impl_epoch = NULL; 
    1113 
    1214#ifdef HAVE_KQUEUE 
     
    6264} 
    6365 
     66int eventer_get_epoch(struct timeval *epoch) { 
     67  if(!eventer_impl_epoch) return -1; 
     68  memcpy(epoch, eventer_impl_epoch, sizeof(*epoch)); 
     69  return 0; 
     70} 
     71 
    6472int eventer_impl_init() { 
    6573  int i; 
    6674  eventer_t e; 
     75 
     76  eventer_impl_epoch = malloc(sizeof(struct timeval)); 
     77  gettimeofday(eventer_impl_epoch, NULL); 
    6778 
    6879  eventer_err = noit_log_stream_find("error/eventer"); 
     
    7283 
    7384  eventer_ssl_init(); 
    74   eventer_jobq_init(&__global_backq); 
     85  eventer_jobq_init(&__global_backq, "default_back_queue"); 
    7586  e = eventer_alloc(); 
    7687  e->mask = EVENTER_RECURRENT; 
     
    8192  eventer_add_recurrent(e); 
    8293 
    83   eventer_jobq_init(&__default_jobq); 
     94  eventer_jobq_init(&__default_jobq, "default_queue"); 
    8495  __default_jobq.backq = &__global_backq; 
    8596  for(i=0; i<__default_queue_threads; i++) 
  • src/eventer/eventer_jobq.c

    r28f06c3 r84d6f13  
    2020static pthread_key_t threads_jobq; 
    2121static sigset_t alarm_mask; 
     22static noit_hash_table all_queues = NOIT_HASH_EMPTY; 
     23pthread_mutex_t all_queues_lock; 
    2224 
    2325static void 
     
    3840 
    3941int 
    40 eventer_jobq_init(eventer_jobq_t *jobq) { 
     42eventer_jobq_init(eventer_jobq_t *jobq, const char *queue_name) { 
    4143  pthread_mutexattr_t mutexattr; 
    4244 
     
    6163      return -1; 
    6264    } 
     65    if(pthread_mutex_init(&all_queues_lock, NULL)) { 
     66      noitL(noit_error, "Cannot initialize all_queues mutex: %s\n", 
     67            strerror(errno)); 
     68      return -1; 
     69    } 
    6370  } 
    6471 
    6572  memset(jobq, 0, sizeof(*jobq)); 
     73  jobq->queue_name = strdup(queue_name); 
    6674  if(pthread_mutexattr_init(&mutexattr) != 0) { 
    6775    noitL(noit_error, "Cannot initialize lock attributes\n"); 
     
    8795    return -1; 
    8896  } 
     97  pthread_mutex_lock(&all_queues_lock); 
     98  if(noit_hash_store(&all_queues, jobq->queue_name, strlen(jobq->queue_name), 
     99                     jobq) == 0) { 
     100    noitL(noit_error, "Duplicate queue name!\n"); 
     101    pthread_mutex_unlock(&all_queues_lock); 
     102    return -1; 
     103  } 
     104  pthread_mutex_unlock(&all_queues_lock); 
    89105  return 0; 
    90106} 
     
    268284  eventer_jobq_enqueue(jobq, job); 
    269285} 
    270  
     286void eventer_jobq_process_each(void (*func)(eventer_jobq_t *, void *), 
     287                               void *closure) { 
     288  const char *key; 
     289  int klen; 
     290  eventer_jobq_t *jobq; 
     291  noit_hash_iter iter = NOIT_HASH_ITER_ZERO; 
     292 
     293  pthread_mutex_lock(&all_queues_lock); 
     294  while(noit_hash_next(&all_queues, &iter, &key, &klen, (void **)&jobq)) { 
     295    func(jobq, closure); 
     296  } 
     297  pthread_mutex_unlock(&all_queues_lock); 
     298
  • src/eventer/eventer_jobq.h

    r3b3b432 r84d6f13  
    3535 
    3636typedef struct _eventer_jobq_t { 
     37  const char             *queue_name; 
    3738  pthread_mutex_t         lock; 
    3839  sem_t                   semaphore; 
     
    4546} eventer_jobq_t; 
    4647 
    47 int eventer_jobq_init(eventer_jobq_t *jobq); 
     48int eventer_jobq_init(eventer_jobq_t *jobq, const char *queue_name); 
    4849void eventer_jobq_enqueue(eventer_jobq_t *jobq, eventer_job_t *job); 
    4950eventer_job_t *eventer_jobq_dequeue(eventer_jobq_t *jobq); 
     
    5758void eventer_jobq_decrease_concurrency(eventer_jobq_t *jobq); 
    5859void *eventer_jobq_consumer(eventer_jobq_t *jobq); 
     60void eventer_jobq_process_each(void (*func)(eventer_jobq_t *, void *), void *); 
    5961 
    6062#endif 
  • src/modules/Makefile.in

    rd5f3933 r84d6f13  
    2525 
    2626MODULES=ping_icmp.@MODULEEXT@ http.@MODULEEXT@ postgres.@MODULEEXT@ \ 
    27         lua.@MODULEEXT@ dns.@MODULEEXT@
     27        lua.@MODULEEXT@ dns.@MODULEEXT@ selfcheck.@MODULEEXT@
    2828        @BUILD_MODULES@ 
    2929 
     
    3232.xml.xmlh: 
    3333        @$(XML2H) `echo $< | sed -e 's/\.xml$$//;'`_xml_description < $< > $@ 
     34 
     35selfcheck.lo:   selfcheck.xmlh 
    3436 
    3537lua.@MODULEEXT@:        lua.lo lua_noit.lo 
  • src/noit_check.c

    r5d557e7 r84d6f13  
    344344  noit_poller_reload(NULL); 
    345345} 
     346 
     347int 
     348noit_poller_check_count() { 
     349  return polls_by_name.size; 
     350}; 
     351 
     352int 
     353noit_poller_transient_check_count() { 
     354  return watchlist.size; 
     355}; 
    346356 
    347357noit_check_t * 
  • src/noit_check.h

    r7180374 r84d6f13  
    122122 
    123123API_EXPORT(void) noit_poller_init(); 
     124API_EXPORT(int) noit_poller_check_count(); 
     125API_EXPORT(int) noit_poller_transient_check_count(); 
    124126API_EXPORT(void) noit_poller_reload(const char *xpath); /* NULL for all */ 
    125127API_EXPORT(void) noit_poller_process_checks(const char *xpath); 
  • src/noit_console.c

    rdf2e1eb r84d6f13  
    451451  noit_console_logio_write, 
    452452  noit_console_logio_close, 
     453  NULL 
    453454}; 
    454455 
  • src/noit_defines.h

    r5a9b91b r84d6f13  
    7474#define HAVE_IPv6 
    7575 
     76static inline int noit_build_version(char *buff, int len) { 
     77  const char *v = NOIT_HEADURL; 
     78  const char *start, *end, *ns; 
     79  if(NULL != (start = strstr(v, "reconnoiter/"))) { 
     80    start += strlen("reconnoiter/"); 
     81    if(NULL != (end = strstr(start, "/src"))) { 
     82      ns = strchr(start, '/'); /* necessarily non-NULL */ 
     83      ns++; 
     84      if(!strncmp(start, "trunk/", 6)) 
     85        return snprintf(buff, len, "trunk.%s", NOIT_SVNVERSION); 
     86      if(!strncmp(start, "branches/", 9)) 
     87        return snprintf(buff, len, "b_%.*s.%s", end - ns, ns, NOIT_SVNVERSION); 
     88      if(!strncmp(start, "tags/", 5)) 
     89        return snprintf(buff, len, "%.*s.%s", end - ns, ns, NOIT_SVNVERSION); 
     90    } 
     91  } 
     92  return snprintf(buff, len, "unknown.%s", NOIT_SVNVERSION); 
     93} 
     94 
    7695#endif 
  • src/noit_livestream_listener.c

    re7c26b4 r84d6f13  
    105105  noit_livestream_logio_write, 
    106106  noit_livestream_logio_close, 
     107  NULL 
    107108}; 
    108109 
  • src/utils/noit_log.c

    rc8af4b3 r84d6f13  
    6161  return close(fd); 
    6262} 
     63static size_t 
     64posix_logio_size(noit_log_stream_t ls) { 
     65  int fd; 
     66  struct stat sb; 
     67  fd = (int)ls->op_ctx; 
     68  if(fstat(fd, &sb) == 0) { 
     69    return (size_t)sb.st_size; 
     70  } 
     71  return -1; 
     72} 
    6373static logops_t posix_logio_ops = { 
    6474  posix_logio_open, 
     
    6676  posix_logio_write, 
    6777  posix_logio_close, 
     78  posix_logio_size 
    6879}; 
    6980 
     
    135146  return 0; 
    136147} 
     148static size_t 
     149jlog_logio_size(noit_log_stream_t ls) { 
     150  if(!ls->op_ctx) return -1; 
     151  return jlog_raw_size((jlog_ctx *)ls->op_ctx); 
     152} 
    137153static logops_t jlog_logio_ops = { 
    138154  jlog_logio_open, 
     
    140156  jlog_logio_write, 
    141157  jlog_logio_close, 
     158  jlog_logio_size 
    142159}; 
    143160 
     
    297314noit_log_stream_close(noit_log_stream_t ls) { 
    298315  if(ls->ops) ls->ops->closeop(ls); 
     316} 
     317 
     318size_t 
     319noit_log_stream_size(noit_log_stream_t ls) { 
     320  if(ls->ops && ls->ops->sizeop) return ls->ops->sizeop(ls); 
     321  return -1; 
    299322} 
    300323 
  • src/utils/noit_log.h

    rc8af4b3 r84d6f13  
    2222  int (*writeop)(struct _noit_log_stream *, const void *, size_t); 
    2323  int (*closeop)(struct _noit_log_stream *); 
     24  size_t (*sizeop)(struct _noit_log_stream *); 
    2425} logops_t; 
    2526 
     
    5859API_EXPORT(void) noit_log_stream_reopen(noit_log_stream_t ls); 
    5960API_EXPORT(void) noit_log_stream_close(noit_log_stream_t ls); 
     61API_EXPORT(size_t) noit_log_stream_size(noit_log_stream_t ls); 
    6062API_EXPORT(void) noit_log_stream_free(noit_log_stream_t ls); 
    6163API_EXPORT(int) noit_vlog(noit_log_stream_t ls, struct timeval *,