root/src/utils/noit_log.c

Revision f1d1c50a1a17f53c3b042e1a7cdf8b6b62629ee4, 26.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

make asynch jlog thread start/stop a bit clearer

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2005-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *    * Redistributions of source code must retain the above copyright
10  *      notice, this list of conditions and the following disclaimer.
11  *    * Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials provided
14  *      with the distribution.
15  *    * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *      of its contributors may be used to endorse or promote products
17  *      derived from this software without specific prior written
18  *      permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #define DEFAULT_JLOG_SUBSCRIBER "stratcon"
34
35 #include "noit_defines.h"
36 #include <stdio.h>
37 #include <fcntl.h>
38 #include <unistd.h>
39 #include <sys/uio.h>
40 #include <sys/time.h>
41 #include <assert.h>
42 #if HAVE_ERRNO_H
43 #include <errno.h>
44 #endif
45 #if HAVE_DIRENT_H
46 #include <dirent.h>
47 #endif
48
49 #define noit_log_impl
50 #include "utils/noit_log.h"
51 #include "utils/noit_hash.h"
52 #include "utils/noit_atomic.h"
53 #include "jlog/jlog.h"
54 #include "jlog/jlog_private.h"
55 #ifdef DTRACE_ENABLED
56 #include "utils/dtrace_probes.h"
57 #else
58 #define NOIT_LOG_LOG(a,b,c,d)
59 #define NOIT_LOG_LOG_ENABLED() 0
60 #endif
61
62 static int DEBUG_LOG_ENABLED() {
63   static int enabled = -1;
64   if(enabled == -1) {
65     char *env = getenv("NOIT_LOG_DEBUG");
66     enabled = env ? atoi(env) : 0;
67   }
68   return enabled;
69 }
70 #define debug_printf(a...) do { \
71   if(DEBUG_LOG_ENABLED()) fprintf(stderr, a); \
72 } while(0)
73
74 struct _noit_log_stream {
75   unsigned enabled:1;
76   unsigned debug:1;
77   unsigned timestamps:1;
78   /* Above is exposed... */
79   char *type;
80   char *name;
81   int mode;
82   char *path;
83   logops_t *ops;
84   void *op_ctx;
85   noit_hash_table *config;
86   struct _noit_log_stream_outlet_list *outlets;
87   pthread_rwlock_t *lock;
88   noit_atomic32_t written;
89   unsigned deps_materialized:1;
90   unsigned debug_below:1;
91   unsigned timestamps_below:1;
92 };
93
94 static noit_hash_table noit_loggers = NOIT_HASH_EMPTY;
95 static noit_hash_table noit_logops = NOIT_HASH_EMPTY;
96 noit_log_stream_t noit_stderr = NULL;
97 noit_log_stream_t noit_error = NULL;
98 noit_log_stream_t noit_debug = NULL;
99
100 int noit_log_global_enabled() {
101   return NOIT_LOG_LOG_ENABLED();
102 }
103
104 #define MATERIALIZE_DEPS(ls) do { \
105   if(!(ls)->deps_materialized) materialize_deps(ls); \
106 } while(0)
107
108 static void materialize_deps(noit_log_stream_t ls) {
109   if(ls->deps_materialized) return;
110   if(ls->debug) ls->debug_below = 1;
111   if(ls->timestamps) ls->timestamps_below = 1;
112   if(ls->debug_below == 0 || ls->timestamps_below == 0) {
113     /* we might have children than need these */
114     struct _noit_log_stream_outlet_list *node;
115     for(node = ls->outlets; node; node = node->next) {
116       MATERIALIZE_DEPS(node->outlet);
117       if(!ls->debug) ls->debug_below = node->outlet->debug_below;
118       if(!ls->timestamps) ls->timestamps_below = node->outlet->timestamps_below;
119     }
120   }
121   ls->deps_materialized = 1;
122 }
123 static int
124 posix_logio_open(noit_log_stream_t ls) {
125   int fd;
126   struct stat sb;
127   ls->mode = 0664;
128   fd = open(ls->path, O_CREAT|O_WRONLY|O_APPEND, ls->mode);
129   debug_printf("opened '%s' => %d\n", ls->path, fd);
130   if(fd < 0) {
131     ls->op_ctx = NULL;
132     return -1;
133   }
134   if(fstat(fd, &sb) == 0) ls->written = (int32_t)sb.st_size;
135   ls->op_ctx = (void *)(vpsized_int)fd;
136   return 0;
137 }
138 static int
139 posix_logio_reopen(noit_log_stream_t ls) {
140   if(ls->path) {
141     pthread_rwlock_t *lock = ls->lock;
142     int newfd, oldfd, rv = -1;
143     if(lock) pthread_rwlock_wrlock(lock);
144     oldfd = (int)(vpsized_int)ls->op_ctx;
145     newfd = open(ls->path, O_CREAT|O_WRONLY|O_APPEND, ls->mode);
146     ls->written = 0;
147     if(newfd >= 0) {
148       struct stat sb;
149       ls->op_ctx = (void *)(vpsized_int)newfd;
150       if(oldfd >= 0) close(oldfd);
151       rv = 0;
152       if(fstat(newfd, &sb) == 0) ls->written = (int32_t)sb.st_size;
153     }
154     if(lock) pthread_rwlock_unlock(lock);
155     return rv;
156   }
157   return -1;
158 }
159 static int
160 posix_logio_write(noit_log_stream_t ls, const void *buf, size_t len) {
161   int fd, rv = -1;
162   pthread_rwlock_t *lock = ls->lock;
163   if(lock) pthread_rwlock_rdlock(lock);
164   fd = (int)(vpsized_int)ls->op_ctx;
165   debug_printf("writing to %d\n", fd);
166   if(fd >= 0) rv = write(fd, buf, len);
167   if(lock) pthread_rwlock_unlock(lock);
168   if(rv > 0) noit_atomic_add32(&ls->written, rv);
169   return rv;
170 }
171 static int
172 posix_logio_writev(noit_log_stream_t ls, const struct iovec *iov, int iovcnt) {
173   int fd, rv = -1;
174   pthread_rwlock_t *lock = ls->lock;
175   if(lock) pthread_rwlock_rdlock(lock);
176   fd = (int)(vpsized_int)ls->op_ctx;
177   debug_printf("writ(v)ing to %d\n", fd);
178   if(fd >= 0) rv = writev(fd, iov, iovcnt);
179   if(lock) pthread_rwlock_unlock(lock);
180   if(rv > 0) noit_atomic_add32(&ls->written, rv);
181   return rv;
182 }
183 static int
184 posix_logio_close(noit_log_stream_t ls) {
185   int fd, rv;
186   pthread_rwlock_t *lock = ls->lock;
187   if(lock) pthread_rwlock_wrlock(lock);
188   fd = (int)(vpsized_int)ls->op_ctx;
189   rv = close(fd);
190   if(lock) pthread_rwlock_unlock(lock);
191   return rv;
192 }
193 static size_t
194 posix_logio_size(noit_log_stream_t ls) {
195   int fd;
196   size_t s = (size_t)-1;
197   struct stat sb;
198   pthread_rwlock_t *lock = ls->lock;
199   if(lock) pthread_rwlock_rdlock(lock);
200   fd = (int)(vpsized_int)ls->op_ctx;
201   if(fstat(fd, &sb) == 0) {
202     s = (size_t)sb.st_size;
203   }
204   if(lock) pthread_rwlock_unlock(lock);
205   return s;
206 }
207 static int
208 posix_logio_rename(noit_log_stream_t ls, const char *name) {
209   int rv = 0;
210   char autoname[PATH_MAX];
211   pthread_rwlock_t *lock = ls->lock;
212   if(name == NOIT_LOG_RENAME_AUTOTIME) {
213     time_t now = time(NULL);
214     snprintf(autoname, sizeof(autoname), "%s.%llu",
215              ls->path, (unsigned long long)now);
216     name = autoname;
217   }
218   if(!strcmp(name, ls->path)) return 0; /* noop */
219   if(lock) pthread_rwlock_rdlock(lock);
220   rv = rename(ls->path, name);
221   if(lock) pthread_rwlock_unlock(lock);
222   return -1;
223 }
224 static logops_t posix_logio_ops = {
225   posix_logio_open,
226   posix_logio_reopen,
227   posix_logio_write,
228   posix_logio_writev,
229   posix_logio_close,
230   posix_logio_size,
231   posix_logio_rename
232 };
233
234 typedef struct jlog_line {
235   char *buf;
236   char buf_static[512];
237   char *buf_dynamic;
238   int len;
239   void *next;
240 } jlog_line;
241
242 typedef struct {
243   jlog_ctx *log;
244   pthread_t writer;
245   void *head;
246   int gen;  /* generation */
247 } jlog_asynch_ctx;
248
249 static int
250 jlog_lspath_to_fspath(noit_log_stream_t ls, char *buff, int len,
251                       char **subout) {
252   char *sub;
253   if(subout) *subout = NULL;
254   if(!ls->path) return -1;
255   strlcpy(buff, ls->path, len);
256   sub = strchr(buff, '(');
257   if(sub) {
258     char *esub = strchr(sub, ')');
259     if(esub) {
260       *esub = '\0';
261       *sub = '\0';
262       sub += 1;
263       if(subout) *subout = sub;
264     }
265   }
266   return strlen(buff);
267 }
268
269 /* These next functions arr basically cribbed from jlogctl.c */
270 static int
271 is_datafile(const char *f, u_int32_t *logid) {
272   int i;
273   u_int32_t l = 0;
274   for(i=0; i<8; i++) {
275     if((f[i] >= '0' && f[i] <= '9') ||
276        (f[i] >= 'a' && f[i] <= 'f')) {
277       l <<= 4;
278       l |= (f[i] < 'a') ? (f[i] - '0') : (f[i] - 'a' + 10);
279     }
280     else
281       return 0;
282   }
283   if(f[i] != '\0') return 0;
284   if(logid) *logid = l;
285   return 1;
286 }
287
288 static int
289 jlog_logio_cleanse(noit_log_stream_t ls) {
290   jlog_asynch_ctx *actx;
291   jlog_ctx *log;
292   DIR *d;
293   struct dirent *de, *entry;
294   int cnt = 0;
295   char path[PATH_MAX];
296   int size = 0;
297
298   actx = (jlog_asynch_ctx *)ls->op_ctx;
299   if(!actx) return -1;
300   log = actx->log;
301   if(!log) return -1;
302   if(jlog_lspath_to_fspath(ls, path, sizeof(path), NULL) <= 0) return -1;
303   d = opendir(path);
304
305 #ifdef _PC_NAME_MAX
306   size = pathconf(path, _PC_NAME_MAX);
307 #endif
308   size = MIN(size, PATH_MAX + 128);
309   de = alloca(size);
310
311   if(!d) return -1;
312   while(portable_readdir_r(d, de, &entry) == 0 && entry != NULL) {
313     u_int32_t logid;
314     if(is_datafile(entry->d_name, &logid)) {
315       int rv;
316       struct stat st;
317       char fullfile[PATH_MAX];
318       char fullidx[PATH_MAX];
319
320       snprintf(fullfile, sizeof(fullfile), "%s/%s", path, entry->d_name);
321       snprintf(fullidx, sizeof(fullidx), "%s/%s" INDEX_EXT,
322                path, entry->d_name);
323       while((rv = stat(fullfile, &st)) != 0 && errno == EINTR);
324       if(rv == 0) {
325         int readers;
326         readers = __jlog_pending_readers(log, logid);
327         if(readers == 0) {
328           unlink(fullfile);
329           unlink(fullidx);
330         }
331       }
332     }
333   }
334   closedir(d);
335   return cnt;
336 }
337
338 static jlog_line *
339 jlog_asynch_pop(jlog_asynch_ctx *actx, jlog_line **iter) {
340   jlog_line *h = NULL, *rev = NULL;
341
342   if(*iter) { /* we have more on the previous list */
343     h = *iter;
344     *iter = h->next;
345     return h;
346   }
347
348   while(1) {
349     h = (void *)(volatile void *)actx->head;
350     if(noit_atomic_casptr((volatile void **)&actx->head, NULL, h) == h) break;
351     /* TODO: load-load */
352   }
353   while(h) {
354     /* which unshifted things into the queue -- it's backwards, reverse it */
355     jlog_line *tmp = h;
356     h = h->next;
357     tmp->next = rev;
358     rev = tmp;
359   }
360   if(rev) *iter = rev->next;
361   else *iter = NULL;
362   return rev;
363 }
364 void
365 jlog_asynch_push(jlog_asynch_ctx *actx, jlog_line *n) {
366   while(1) {
367     n->next = (void *)(volatile void *)actx->head;
368     if(noit_atomic_casptr((volatile void **)&actx->head, n, n->next) == n->next) return;
369     /* TODO: load-load */
370   }
371 }
372 static void *
373 jlog_logio_asynch_writer(void *vls) {
374   noit_log_stream_t ls = vls;
375   jlog_asynch_ctx *actx = ls->op_ctx;
376   jlog_line *iter = NULL;
377   int gen = actx->gen;
378   noitL(noit_error, "starting asynchronous jlog writer[%d/%p]\n",
379         (int)getpid(), (void *)pthread_self());
380   while(gen == actx->gen) {
381     pthread_rwlock_t *lock;
382     int fast = 0, max = 1000;
383     jlog_line *line;
384     lock = ls->lock;
385     if(lock) pthread_rwlock_rdlock(lock);
386     while(max > 0 && NULL != (line = jlog_asynch_pop(actx, &iter))) {
387       jlog_ctx_write(actx->log, line->buf_dynamic ?
388                                   line->buf_dynamic :
389                                   line->buf_static,
390                      line->len);
391       if(line->buf_dynamic != NULL) free(line->buf_dynamic);
392       free(line);
393       fast = 1;
394       max--;
395     }
396     if(lock) pthread_rwlock_unlock(lock);
397     if(max > 0) {
398       /* we didn't hit our limit... so we ran the queue dry */
399       /* 200ms if there was nothing, 10ms otherwise */
400       usleep(fast ? 10000 : 200000);
401     }
402   }
403   noitL(noit_error, "stopping asynchronous jlog writer[%d/%p]\n",
404         (int)getpid(), (void *)pthread_self());
405   pthread_exit((void *)0);
406 }
407 static int
408 jlog_logio_reopen(noit_log_stream_t ls) {
409   void *unused;
410   char **subs;
411   jlog_asynch_ctx *actx = ls->op_ctx;
412   pthread_rwlock_t *lock = ls->lock;
413   int i;
414   /* reopening only has the effect of removing temporary subscriptions */
415   /* (they start with ~ in our hair-brained model */
416
417   if(lock) pthread_rwlock_wrlock(lock);
418   if(jlog_ctx_list_subscribers(actx->log, &subs) == -1)
419     goto bail;
420
421   for(i=0;subs[i];i++)
422     if(subs[i][0] == '~')
423       jlog_ctx_remove_subscriber(actx->log, subs[i]);
424
425   jlog_ctx_list_subscribers_dispose(actx->log, subs);
426   jlog_logio_cleanse(ls);
427  bail:
428   if(lock) pthread_rwlock_unlock(lock);
429
430   actx->gen++;
431   pthread_join(actx->writer, &unused);
432   if(pthread_create(&actx->writer, NULL, jlog_logio_asynch_writer, ls) != 0)
433     return -1;
434  
435   return 0;
436 }
437 static int
438 jlog_logio_open(noit_log_stream_t ls) {
439   char path[PATH_MAX], *sub, **subs, *p;
440   jlog_asynch_ctx *actx;
441   jlog_ctx *log = NULL;
442   int i, listed, found;
443
444   if(jlog_lspath_to_fspath(ls, path, sizeof(path), &sub) <= 0) return -1;
445   log = jlog_new(path);
446   if(!log) return -1;
447   /* Open the writer. */
448   if(jlog_ctx_open_writer(log)) {
449     /* If that fails, we'll give one attempt at initiailizing it. */
450     /* But, since we attempted to open it as a writer, it is tainted. */
451     /* path: close, new, init, close, new, writer, add subscriber */
452     jlog_ctx_close(log);
453     log = jlog_new(path);
454     if(jlog_ctx_init(log)) {
455       noitL(noit_error, "Cannot init jlog writer: %s\n",
456             jlog_ctx_err_string(log));
457       jlog_ctx_close(log);
458       return -1;
459     }
460     /* After it is initialized, we can try to reopen it as a writer. */
461     jlog_ctx_close(log);
462     log = jlog_new(path);
463     if(jlog_ctx_open_writer(log)) {
464       noitL(noit_error, "Cannot open jlog writer: %s\n",
465             jlog_ctx_err_string(log));
466       jlog_ctx_close(log);
467       return -1;
468     }
469   }
470
471   /* Add or remove subscribers according to the current configuration. */
472   listed = jlog_ctx_list_subscribers(log, &subs);
473   if(listed == -1) {
474     noitL(noit_error, "Cannot list jlog subscribers: %s\n",
475           jlog_ctx_err_string(log));
476     return -1;
477   }
478
479   if(sub) {
480     /* Match all configured subscribers against jlog's list. */
481     for(p=strtok(sub, ",");p;p=strtok(NULL, ",")) {
482       for(i=0;i<listed;i++) {
483         if((subs[i]) && (strcmp(p, subs[i]) == 0)) {
484           free(subs[i]);
485           subs[i] = NULL;
486           break;
487         }
488       }
489       if(i == listed)
490         jlog_ctx_add_subscriber(log, p, JLOG_BEGIN);
491     }
492
493     /* Remove all unmatched subscribers. */
494     for(i=0;i<listed;i++) {
495       if(subs[i]) {
496         jlog_ctx_remove_subscriber(log, subs[i]);
497         free(subs[i]);
498         subs[i] = NULL;
499       }
500     }
501
502     free(subs);
503     subs = NULL;
504   } else {
505     /* Remove all subscribers other than DEFAULT_JLOG_SUBSCRIBER. */
506     found = 0;
507     for(i=0;i<listed;i++) {
508       if((subs[i]) && (strcmp(DEFAULT_JLOG_SUBSCRIBER, subs[i]) == 0)) {
509         found = 1;
510         continue;
511       }
512       jlog_ctx_remove_subscriber(log, subs[i]);
513     }
514
515     /* Add DEFAULT_JLOG_SUBSCRIBER if it wasn't already on the jlog's list. */
516     if(!found)
517       jlog_ctx_add_subscriber(log, DEFAULT_JLOG_SUBSCRIBER, JLOG_BEGIN);
518
519     jlog_ctx_list_subscribers_dispose(log, subs);
520   }
521
522   actx = calloc(1, sizeof(*actx));
523   actx->log = log;
524   ls->op_ctx = actx;
525
526   if(pthread_create(&actx->writer, NULL, jlog_logio_asynch_writer, ls) != 0)
527     return -1;
528
529   /* We do this to clean things up */
530   jlog_logio_reopen(ls);
531   return 0;
532 }
533 static int
534 jlog_logio_write(noit_log_stream_t ls, const void *buf, size_t len) {
535   int rv = -1;
536   jlog_asynch_ctx *actx;
537   jlog_line *line;
538   if(!ls->op_ctx) return -1;
539   actx = ls->op_ctx;
540   line = calloc(1, sizeof(*line));
541   if(len > sizeof(line->buf_static)) {
542     line->buf_dynamic = malloc(len);
543     memcpy(line->buf_dynamic, buf, len);
544   }
545   else {
546     memcpy(line->buf_static, buf, len);
547   }
548   line->len = len;
549   jlog_asynch_push(actx, line);
550   return rv;
551 }
552 static int
553 jlog_logio_close(noit_log_stream_t ls) {
554   if(ls->op_ctx) {
555     jlog_asynch_ctx *actx = ls->op_ctx;
556     jlog_ctx_close(actx->log);
557     ls->op_ctx = NULL;
558   }
559   return 0;
560 }
561 static size_t
562 jlog_logio_size(noit_log_stream_t ls) {
563   size_t size;
564   jlog_asynch_ctx *actx;
565   pthread_rwlock_t *lock = ls->lock;
566   if(!ls->op_ctx) return -1;
567   actx = ls->op_ctx;
568   if(lock) pthread_rwlock_rdlock(lock);
569   size = jlog_raw_size(actx->log);
570   if(lock) pthread_rwlock_unlock(lock);
571   return size;
572 }
573 static int
574 jlog_logio_rename(noit_log_stream_t ls, const char *newname) {
575   /* Not supported (and makes no sense) */
576   return -1;
577 }
578 static logops_t jlog_logio_ops = {
579   jlog_logio_open,
580   jlog_logio_reopen,
581   jlog_logio_write,
582   NULL,
583   jlog_logio_close,
584   jlog_logio_size,
585   jlog_logio_rename
586 };
587
588 void
589 noit_log_init() {
590   noit_hash_init(&noit_loggers);
591   noit_hash_init(&noit_logops);
592   noit_register_logops("file", &posix_logio_ops);
593   noit_register_logops("jlog", &jlog_logio_ops);
594   noit_stderr = noit_log_stream_new_on_fd("stderr", 2, NULL);
595   noit_stderr->timestamps = 1;
596   noit_error = noit_log_stream_new("error", NULL, NULL, NULL, NULL);
597   noit_debug = noit_log_stream_new("debug", NULL, NULL, NULL, NULL);
598 }
599
600 void
601 noit_register_logops(const char *name, logops_t *ops) {
602   noit_hash_store(&noit_logops, strdup(name), strlen(name), ops);
603 }
604
605 void *
606 noit_log_stream_get_ctx(noit_log_stream_t ls) {
607   return ls->op_ctx;
608 }
609
610 void
611 noit_log_stream_set_ctx(noit_log_stream_t ls, void *nctx) {
612   ls->op_ctx = nctx;
613 }
614
615 const char *
616 noit_log_stream_get_type(noit_log_stream_t ls) {
617   return ls->type;
618 }
619
620 const char *
621 noit_log_stream_get_name(noit_log_stream_t ls) {
622   return ls->name;
623 }
624
625 const char *
626 noit_log_stream_get_path(noit_log_stream_t ls) {
627   return ls->path;
628 }
629
630 const char *
631 noit_log_stream_get_property(noit_log_stream_t ls,
632                              const char *prop) {
633   const char *v;
634   if(ls && ls->config &&
635      noit_hash_retr_str(ls->config, prop, strlen(prop), &v))
636     return v;
637   return NULL;
638 }
639
640 void
641 noit_log_stream_set_property(noit_log_stream_t ls,
642                              const char *prop, const char *v) {
643   if(!ls) return;
644   if(!ls->config) {
645     ls->config = calloc(1, sizeof(*ls->config));
646     noit_hash_init(ls->config);
647   }
648   noit_hash_replace(ls->config, prop, strlen(prop), (void *)v, free, free);
649 }
650
651 noit_log_stream_t
652 noit_log_stream_new_on_fd(const char *name, int fd, noit_hash_table *config) {
653   noit_log_stream_t ls;
654   ls = calloc(1, sizeof(*ls));
655   ls->name = strdup(name);
656   ls->ops = &posix_logio_ops;
657   ls->op_ctx = (void *)(vpsized_int)fd;
658   ls->enabled = 1;
659   ls->config = config;
660   ls->lock = calloc(1, sizeof(*ls->lock));
661   pthread_rwlock_init(ls->lock, NULL);
662   /* This double strdup of ls->name is needed, look for the next one
663    * for an explanation.
664    */
665   if(noit_hash_store(&noit_loggers,
666                      strdup(ls->name), strlen(ls->name), ls) == 0) {
667     free(ls->name);
668     free(ls);
669     return NULL;
670   }
671   return ls;
672 }
673
674 noit_log_stream_t
675 noit_log_stream_new_on_file(const char *path, noit_hash_table *config) {
676   return noit_log_stream_new(path, "file", path, NULL, config);
677 }
678
679 noit_log_stream_t
680 noit_log_stream_new(const char *name, const char *type, const char *path,
681                     void *ctx, noit_hash_table *config) {
682   noit_log_stream_t ls, saved;
683   struct _noit_log_stream tmpbuf;
684   void *vops = NULL;
685
686   ls = calloc(1, sizeof(*ls));
687   ls->name = strdup(name);
688   ls->path = path ? strdup(path) : NULL;
689   ls->type = type ? strdup(type) : NULL;
690   ls->enabled = 1;
691   ls->config = config;
692   if(!type)
693     ls->ops = NULL;
694   else if(noit_hash_retrieve(&noit_logops, type, strlen(type),
695                              &vops))
696     ls->ops = vops;
697   else
698     goto freebail;
699  
700   if(ls->ops && ls->ops->openop(ls)) goto freebail;
701
702   saved = noit_log_stream_find(name);
703   if(saved) {
704     pthread_rwlock_t *lock = saved->lock;
705     memcpy(&tmpbuf, saved, sizeof(*saved));
706     memcpy(saved, ls, sizeof(*saved));
707     memcpy(ls, &tmpbuf, sizeof(*saved));
708     saved->lock = lock;
709
710     ls->lock = NULL;
711     noit_log_stream_free(ls);
712     ls = saved;
713   }
714   else {
715     /* We strdup the name *again*.  We'going to kansas city shuffle the
716      * ls later (see memcpy above).  However, if don't strdup, then the
717      * noit_log_stream_free up there will sweep our key right our from
718      * under us.
719      */
720     if(noit_hash_store(&noit_loggers,
721                        strdup(ls->name), strlen(ls->name), ls) == 0)
722       goto freebail;
723     ls->lock = calloc(1, sizeof(*ls->lock));
724     pthread_rwlock_init(ls->lock, NULL);
725   }
726   /* This is for things that don't open on paths */
727   if(ctx) ls->op_ctx = ctx;
728   return ls;
729
730  freebail:
731   fprintf(stderr, "Failed to instantiate logger(%s,%s,%s)\n",
732           name, type ? type : "[null]", path ? path : "[null]");
733   free(ls->name);
734   if(ls->path) free(ls->path);
735   if(ls->type) free(ls->type);
736   free(ls);
737   return NULL;
738 }
739
740 noit_log_stream_t
741 noit_log_stream_find(const char *name) {
742   void *vls;
743   if(noit_hash_retrieve(&noit_loggers, name, strlen(name), &vls)) {
744     return (noit_log_stream_t)vls;
745   }
746   return NULL;
747 }
748
749 void
750 noit_log_stream_remove(const char *name) {
751   noit_hash_delete(&noit_loggers, name, strlen(name), NULL, NULL);
752 }
753
754 void
755 noit_log_stream_add_stream(noit_log_stream_t ls, noit_log_stream_t outlet) {
756   struct _noit_log_stream_outlet_list *newnode;
757   newnode = calloc(1, sizeof(*newnode));
758   newnode->outlet = outlet;
759   newnode->next = ls->outlets;
760   ls->outlets = newnode;
761 }
762
763 noit_log_stream_t
764 noit_log_stream_remove_stream(noit_log_stream_t ls, const char *name) {
765   noit_log_stream_t outlet;
766   struct _noit_log_stream_outlet_list *node, *tmp;
767   if(!ls->outlets) return NULL;
768   if(!strcmp(ls->outlets->outlet->name, name)) {
769     node = ls->outlets;
770     ls->outlets = node->next;
771     outlet = node->outlet;
772     free(node);
773     return outlet;
774   }
775   for(node = ls->outlets; node->next; node = node->next) {
776     if(!strcmp(node->next->outlet->name, name)) {
777       /* splice */
778       tmp = node->next;
779       node->next = tmp->next;
780       /* pluck */
781       outlet = tmp->outlet;
782       /* shed */
783       free(tmp);
784       /* return */
785       return outlet;
786     }
787   }
788   return NULL;
789 }
790
791 void noit_log_stream_reopen(noit_log_stream_t ls) {
792   struct _noit_log_stream_outlet_list *node;
793   if(ls->ops) ls->ops->reopenop(ls);
794   for(node = ls->outlets; node; node = node->next) {
795     noit_log_stream_reopen(node->outlet);
796   }
797 }
798
799 int noit_log_stream_rename(noit_log_stream_t ls, const char *newname) {
800   return (ls->ops && ls->ops->renameop) ? ls->ops->renameop(ls, newname) : -1;
801 }
802
803 void
804 noit_log_stream_close(noit_log_stream_t ls) {
805   if(ls->ops) ls->ops->closeop(ls);
806 }
807
808 size_t
809 noit_log_stream_size(noit_log_stream_t ls) {
810   if(ls->ops && ls->ops->sizeop) return ls->ops->sizeop(ls);
811   return -1;
812 }
813
814 size_t
815 noit_log_stream_written(noit_log_stream_t ls) {
816   return ls->written;
817 }
818
819 void
820 noit_log_stream_free(noit_log_stream_t ls) {
821   if(ls) {
822     struct _noit_log_stream_outlet_list *node;
823     if(ls->name) free(ls->name);
824     if(ls->path) free(ls->path);
825     while(ls->outlets) {
826       node = ls->outlets->next;
827       free(ls->outlets);
828       ls->outlets = node;
829     }
830     if(ls->config) {
831       noit_hash_destroy(ls->config, free, free);
832       free(ls->config);
833     }
834     if(ls->lock) {
835       pthread_rwlock_destroy(ls->lock);
836       free(ls->lock);
837     }
838     free(ls);
839   }
840 }
841
842 static int
843 noit_log_writev(noit_log_stream_t ls, const struct iovec *iov, int iovcnt) {
844   /* This emulates writev into a buffer for ops that don't support it */
845   char stackbuff[4096], *tofree = NULL, *buff = NULL;
846   int i, s = 0, ins = 0;
847
848   if(!ls->ops) return -1;
849   if(ls->ops->writevop) return ls->ops->writevop(ls, iov, iovcnt);
850   if(!ls->ops->writeop) return -1;
851   if(iovcnt == 1) return ls->ops->writeop(ls, iov[0].iov_base, iov[0].iov_len);
852
853   for(i=0;i<iovcnt;i++) s+=iov[i].iov_len;
854   if(s > sizeof(stackbuff)) {
855     tofree = buff = malloc(s);
856     if(tofree == NULL) return -1;
857   }
858   else buff = stackbuff;
859   for(i=0;i<iovcnt;i++) {
860     memcpy(buff + ins, iov[i].iov_base, iov[i].iov_len);
861     ins += iov[i].iov_len;
862   }
863   i = ls->ops->writeop(ls, buff, s);
864   if(tofree) free(tofree);
865   return i;
866 }
867
868 static int
869 noit_log_line(noit_log_stream_t ls,
870               const char *timebuf, int timebuflen,
871               const char *debugbuf, int debugbuflen,
872               const char *buffer, size_t len) {
873   int rv = 0;
874   struct _noit_log_stream_outlet_list *node;
875   if(ls->ops) {
876     int iovcnt = 0;
877     struct iovec iov[3];
878     if(ls->timestamps) {
879       iov[iovcnt].iov_base = (void *)timebuf;
880       iov[iovcnt].iov_len = timebuflen;
881       iovcnt++;
882     }
883     if(ls->debug) {
884       iov[iovcnt].iov_base = (void *)debugbuf;
885       iov[iovcnt].iov_len = debugbuflen;
886       iovcnt++;
887     }
888     iov[iovcnt].iov_base = (void *)buffer;
889     iov[iovcnt].iov_len = len;
890     iovcnt++;
891     rv = noit_log_writev(ls, iov, iovcnt);
892   }
893   for(node = ls->outlets; node; node = node->next) {
894     int srv = 0;
895     debug_printf(" %s -> %s\n", ls->name, node->outlet->name);
896     srv = noit_log_line(node->outlet, timebuf, timebuflen, debugbuf, debugbuflen, buffer, len);
897     if(srv) rv = srv;
898   }
899   return rv;
900 }
901 int
902 noit_vlog(noit_log_stream_t ls, struct timeval *now,
903           const char *file, int line,
904           const char *format, va_list arg) {
905   int rv = 0, allocd = 0;
906   char buffer[4096], *dynbuff = NULL;
907 #ifdef va_copy
908   va_list copy;
909 #endif
910
911   if(ls->enabled || NOIT_LOG_LOG_ENABLED()) {
912     int len;
913     char tbuf[48], dbuf[80];
914     int tbuflen = 0, dbuflen = 0;
915     MATERIALIZE_DEPS(ls);
916     if(ls->timestamps_below) {
917       struct tm _tm, *tm;
918       char tempbuf[32];
919       time_t s = (time_t)now->tv_sec;
920       tm = localtime_r(&s, &_tm);
921       strftime(tempbuf, sizeof(tempbuf), "%Y-%m-%d %H:%M:%S", tm);
922       snprintf(tbuf, sizeof(tbuf), "[%s.%06d] ", tempbuf, (int)now->tv_usec);
923       tbuflen = strlen(tbuf);
924     }
925     else tbuf[0] = '\0';
926     if(ls->debug_below) {
927       snprintf(dbuf, sizeof(dbuf), "[%s:%d] ", file, line);
928       dbuflen = strlen(dbuf);
929     }
930     else dbuf[0] = '\0';
931 #ifdef va_copy
932     va_copy(copy, arg);
933     len = vsnprintf(buffer, sizeof(buffer), format, copy);
934     va_end(copy);
935 #else
936     len = vsnprintf(buffer, sizeof(buffer), format, arg);
937 #endif
938     if(len > sizeof(buffer)) {
939       allocd = sizeof(buffer);
940       while(len > allocd) { /* guaranteed true the first time */
941         while(len > allocd) allocd <<= 2;
942         if(dynbuff) free(dynbuff);
943         dynbuff = malloc(allocd);
944         assert(dynbuff);
945 #ifdef va_copy
946         va_copy(copy, arg);
947         len = vsnprintf(dynbuff, allocd, format, copy);
948         va_end(copy);
949 #else
950         len = vsnprintf(dynbuff, allocd, format, arg);
951 #endif
952       }
953       NOIT_LOG_LOG(ls->name, (char *)file, line, dynbuff);
954       if(ls->enabled)
955         rv = noit_log_line(ls, tbuf, tbuflen, dbuf, dbuflen, dynbuff, len);
956       free(dynbuff);
957     }
958     else {
959       NOIT_LOG_LOG(ls->name, (char *)file, line, buffer);
960       if(ls->enabled)
961         rv = noit_log_line(ls, tbuf, tbuflen, dbuf, dbuflen, buffer, len);
962     }
963     if(rv == len) return 0;
964     return -1;
965   }
966   return 0;
967 }
968
969 int
970 noit_log(noit_log_stream_t ls, struct timeval *now,
971          const char *file, int line, const char *format, ...) {
972   int rv;
973   va_list arg;
974   va_start(arg, format);
975   rv = noit_vlog(ls, now, file, line, format, arg);
976   va_end(arg);
977   return rv;
978 }
979
980 int
981 noit_log_reopen_all() {
982   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
983   const char *k;
984   int klen, rv = 0;
985   void *data;
986   noit_log_stream_t ls;
987
988   while(noit_hash_next(&noit_loggers, &iter, &k, &klen, &data)) {
989     ls = data;
990     if(ls->ops) if(ls->ops->reopenop(ls) < 0) rv = -1;
991   }
992   return rv;
993 }
994
Note: See TracBrowser for help on using the browser.