root/src/utils/noit_log.c

Revision b8d1fb45af0974b7c419ec104d6936b3215118dc, 27.3 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

use pshared locks as there is a rare race if the parent is logging in-flight during the fork.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2005-2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *    * Redistributions of source code must retain the above copyright
10  *      notice, this list of conditions and the following disclaimer.
11  *    * Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials provided
14  *      with the distribution.
15  *    * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *      of its contributors may be used to endorse or promote products
17  *      derived from this software without specific prior written
18  *      permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #define DEFAULT_JLOG_SUBSCRIBER "stratcon"
34
35 #include "noit_defines.h"
36 #include <stdio.h>
37 #include <fcntl.h>
38 #include <unistd.h>
39 #include <sys/uio.h>
40 #include <sys/time.h>
41 #include <assert.h>
42 #if HAVE_ERRNO_H
43 #include <errno.h>
44 #endif
45 #if HAVE_DIRENT_H
46 #include <dirent.h>
47 #endif
48
49 #define noit_log_impl
50 #include "utils/noit_log.h"
51 #include "utils/noit_hash.h"
52 #include "utils/noit_atomic.h"
53 #include "jlog/jlog.h"
54 #include "jlog/jlog_private.h"
55 #ifdef DTRACE_ENABLED
56 #include "utils/dtrace_probes.h"
57 #else
58 #define NOIT_LOG_LOG(a,b,c,d)
59 #define NOIT_LOG_LOG_ENABLED() 0
60 #endif
61
/*
 * Tell whether the logger's own debug tracing is switched on.
 *
 * The NOIT_LOG_DEBUG environment variable is parsed with atoi() and the
 * result is cached in a function-local static; an unset variable yields 0.
 * While the cached value is -1 the environment is consulted again.
 */
static int DEBUG_LOG_ENABLED() {
  static int enabled = -1;
  const char *setting;

  if(enabled != -1) return enabled;
  setting = getenv("NOIT_LOG_DEBUG");
  if(setting)
    enabled = atoi(setting);
  else
    enabled = 0;
  return enabled;
}
/* printf-style trace to stderr, emitted only when DEBUG_LOG_ENABLED() is non-zero. */
#define debug_printf(...) do { \
  if(DEBUG_LOG_ENABLED()) { \
    fprintf(stderr, __VA_ARGS__); \
  } \
} while(0)
73
/*
 * A single log stream: an optional backend (ops/op_ctx) plus a list of
 * outlet streams that every line is forwarded to.
 */
struct _noit_log_stream {
  unsigned enabled:1;     /* noit_vlog only hands lines to the stream when set */
  unsigned debug:1;       /* prefix lines written by this stream with [file:line] */
  unsigned timestamps:1;  /* prefix lines written by this stream with a timestamp */
  /* Above is exposed... */
  char *type;             /* strdup'd backend type name used to look up ops; may be NULL */
  char *name;             /* strdup'd stream name */
  int mode;               /* file creation mode (set by posix_logio_open) */
  char *path;             /* strdup'd backend path; may be NULL */
  logops_t *ops;          /* backend operations; NULL for pure fan-out streams */
  void *op_ctx;           /* backend state: an fd cast to a pointer, or a jlog_asynch_ctx */
  noit_hash_table *config; /* optional property table */
  struct _noit_log_stream_outlet_list *outlets; /* singly linked list of downstream streams */
  pthread_rwlock_t *lock; /* guards op_ctx use by the backends; may be NULL */
  noit_atomic32_t written; /* byte counter maintained by the posix backend */
  unsigned deps_materialized:1; /* the two *_below flags have been computed */
  unsigned debug_below:1;       /* this stream or an outlet wants the debug prefix */
  unsigned timestamps_below:1;  /* this stream or an outlet wants the timestamp prefix */
};
93
/* Registry of log streams, keyed by stream name. */
static noit_hash_table noit_loggers = NOIT_HASH_EMPTY;
/* Registry of backend operation tables, keyed by type name ("file", "jlog", ...). */
static noit_hash_table noit_logops = NOIT_HASH_EMPTY;
/* Well-known streams; all three are created by noit_log_init(). */
noit_log_stream_t noit_stderr = NULL;
noit_log_stream_t noit_error = NULL;
noit_log_stream_t noit_debug = NULL;
99
/*
 * Report the value of NOIT_LOG_LOG_ENABLED(): the DTrace probe state when
 * DTRACE_ENABLED is defined, otherwise the constant 0.
 */
int noit_log_global_enabled() {
  int probe_active = NOIT_LOG_LOG_ENABLED();
  return probe_active;
}
103
/* Compute the *_below flags of a stream unless that has already been done. */
#define MATERIALIZE_DEPS(ls) do { \
  if((ls)->deps_materialized == 0) { \
    materialize_deps(ls); \
  } \
} while(0)
107
108 static void materialize_deps(noit_log_stream_t ls) {
109   if(ls->deps_materialized) return;
110   if(ls->debug) ls->debug_below = 1;
111   if(ls->timestamps) ls->timestamps_below = 1;
112   if(ls->debug_below == 0 || ls->timestamps_below == 0) {
113     /* we might have children than need these */
114     struct _noit_log_stream_outlet_list *node;
115     for(node = ls->outlets; node; node = node->next) {
116       MATERIALIZE_DEPS(node->outlet);
117       if(!ls->debug) ls->debug_below = node->outlet->debug_below;
118       if(!ls->timestamps) ls->timestamps_below = node->outlet->timestamps_below;
119     }
120   }
121   ls->deps_materialized = 1;
122 }
123 static int
124 posix_logio_open(noit_log_stream_t ls) {
125   int fd;
126   struct stat sb;
127   ls->mode = 0664;
128   fd = open(ls->path, O_CREAT|O_WRONLY|O_APPEND, ls->mode);
129   debug_printf("opened '%s' => %d\n", ls->path, fd);
130   if(fd < 0) {
131     ls->op_ctx = NULL;
132     return -1;
133   }
134   if(fstat(fd, &sb) == 0) ls->written = (int32_t)sb.st_size;
135   ls->op_ctx = (void *)(vpsized_int)fd;
136   return 0;
137 }
138 static int
139 posix_logio_reopen(noit_log_stream_t ls) {
140   if(ls->path) {
141     pthread_rwlock_t *lock = ls->lock;
142     int newfd, oldfd, rv = -1;
143     if(lock) pthread_rwlock_wrlock(lock);
144     oldfd = (int)(vpsized_int)ls->op_ctx;
145     newfd = open(ls->path, O_CREAT|O_WRONLY|O_APPEND, ls->mode);
146     ls->written = 0;
147     if(newfd >= 0) {
148       struct stat sb;
149       ls->op_ctx = (void *)(vpsized_int)newfd;
150       if(oldfd >= 0) close(oldfd);
151       rv = 0;
152       if(fstat(newfd, &sb) == 0) ls->written = (int32_t)sb.st_size;
153     }
154     if(lock) pthread_rwlock_unlock(lock);
155     return rv;
156   }
157   return -1;
158 }
159 static int
160 posix_logio_write(noit_log_stream_t ls, const void *buf, size_t len) {
161   int fd, rv = -1;
162   pthread_rwlock_t *lock = ls->lock;
163   if(lock) pthread_rwlock_rdlock(lock);
164   fd = (int)(vpsized_int)ls->op_ctx;
165   debug_printf("writing to %d\n", fd);
166   if(fd >= 0) rv = write(fd, buf, len);
167   if(lock) pthread_rwlock_unlock(lock);
168   if(rv > 0) noit_atomic_add32(&ls->written, rv);
169   return rv;
170 }
171 static int
172 posix_logio_writev(noit_log_stream_t ls, const struct iovec *iov, int iovcnt) {
173   int fd, rv = -1;
174   pthread_rwlock_t *lock = ls->lock;
175   if(lock) pthread_rwlock_rdlock(lock);
176   fd = (int)(vpsized_int)ls->op_ctx;
177   debug_printf("writ(v)ing to %d\n", fd);
178   if(fd >= 0) rv = writev(fd, iov, iovcnt);
179   if(lock) pthread_rwlock_unlock(lock);
180   if(rv > 0) noit_atomic_add32(&ls->written, rv);
181   return rv;
182 }
183 static int
184 posix_logio_close(noit_log_stream_t ls) {
185   int fd, rv;
186   pthread_rwlock_t *lock = ls->lock;
187   if(lock) pthread_rwlock_wrlock(lock);
188   fd = (int)(vpsized_int)ls->op_ctx;
189   rv = close(fd);
190   if(lock) pthread_rwlock_unlock(lock);
191   return rv;
192 }
193 static size_t
194 posix_logio_size(noit_log_stream_t ls) {
195   int fd;
196   size_t s = (size_t)-1;
197   struct stat sb;
198   pthread_rwlock_t *lock = ls->lock;
199   if(lock) pthread_rwlock_rdlock(lock);
200   fd = (int)(vpsized_int)ls->op_ctx;
201   if(fstat(fd, &sb) == 0) {
202     s = (size_t)sb.st_size;
203   }
204   if(lock) pthread_rwlock_unlock(lock);
205   return s;
206 }
207 static int
208 posix_logio_rename(noit_log_stream_t ls, const char *name) {
209   int rv = 0;
210   char autoname[PATH_MAX];
211   pthread_rwlock_t *lock = ls->lock;
212   if(name == NOIT_LOG_RENAME_AUTOTIME) {
213     time_t now = time(NULL);
214     snprintf(autoname, sizeof(autoname), "%s.%llu",
215              ls->path, (unsigned long long)now);
216     name = autoname;
217   }
218   if(!strcmp(name, ls->path)) return 0; /* noop */
219   if(lock) pthread_rwlock_rdlock(lock);
220   rv = rename(ls->path, name);
221   if(lock) pthread_rwlock_unlock(lock);
222   return -1;
223 }
224 static logops_t posix_logio_ops = {
225   posix_logio_open,
226   posix_logio_reopen,
227   posix_logio_write,
228   posix_logio_writev,
229   posix_logio_close,
230   posix_logio_size,
231   posix_logio_rename
232 };
233
/* One queued log line waiting to be written to a jlog by the writer thread. */
typedef struct jlog_line {
  char *buf;             /* not referenced by the code visible in this file */
  char buf_static[512];  /* inline storage, used when the line fits */
  char *buf_dynamic;     /* malloc'd storage for longer lines, else NULL */
  int len;               /* number of payload bytes */
  void *next;            /* link to the next jlog_line in the queue */
} jlog_line;
241
/* Per-stream state of the asynchronous jlog backend (stored in op_ctx). */
typedef struct {
  jlog_ctx *log;        /* the jlog opened for writing */
  pthread_t writer;     /* id of the most recently started writer thread */
  void *head;           /* head of the pending jlog_line list, updated with CAS */
  noit_atomic32_t gen;  /* generation */
} jlog_asynch_ctx;
248
249 static int
250 jlog_lspath_to_fspath(noit_log_stream_t ls, char *buff, int len,
251                       char **subout) {
252   char *sub;
253   if(subout) *subout = NULL;
254   if(!ls->path) return -1;
255   strlcpy(buff, ls->path, len);
256   sub = strchr(buff, '(');
257   if(sub) {
258     char *esub = strchr(sub, ')');
259     if(esub) {
260       *esub = '\0';
261       *sub = '\0';
262       sub += 1;
263       if(subout) *subout = sub;
264     }
265   }
266   return strlen(buff);
267 }
268
269 /* These next functions are basically cribbed from jlogctl.c */
/*
 * Decide whether f names a jlog data file: exactly eight lowercase
 * hexadecimal digits and nothing else.
 *
 * On a match the digits are decoded into *logid (when logid is non-NULL)
 * and 1 is returned; otherwise 0 is returned and *logid is left alone.
 */
static int
is_datafile(const char *f, u_int32_t *logid) {
  u_int32_t value = 0;
  int pos = 0;

  while(pos < 8) {
    char c = f[pos];
    u_int32_t nibble;
    if(c >= '0' && c <= '9') nibble = c - '0';
    else if(c >= 'a' && c <= 'f') nibble = c - 'a' + 10;
    else return 0;
    value = (value << 4) | nibble;
    pos++;
  }
  if(f[8] != '\0') return 0;
  if(logid) *logid = value;
  return 1;
}
287
288 static int
289 jlog_logio_cleanse(noit_log_stream_t ls) {
290   jlog_asynch_ctx *actx;
291   jlog_ctx *log;
292   DIR *d;
293   struct dirent *de, *entry;
294   int cnt = 0;
295   char path[PATH_MAX];
296   int size = 0;
297
298   actx = (jlog_asynch_ctx *)ls->op_ctx;
299   if(!actx) return -1;
300   log = actx->log;
301   if(!log) return -1;
302   if(jlog_lspath_to_fspath(ls, path, sizeof(path), NULL) <= 0) return -1;
303   d = opendir(path);
304
305 #ifdef _PC_NAME_MAX
306   size = pathconf(path, _PC_NAME_MAX);
307 #endif
308   size = MIN(size, PATH_MAX + 128);
309   de = alloca(size);
310
311   if(!d) return -1;
312   while(portable_readdir_r(d, de, &entry) == 0 && entry != NULL) {
313     u_int32_t logid;
314     if(is_datafile(entry->d_name, &logid)) {
315       int rv;
316       struct stat st;
317       char fullfile[PATH_MAX];
318       char fullidx[PATH_MAX];
319
320       snprintf(fullfile, sizeof(fullfile), "%s/%s", path, entry->d_name);
321       snprintf(fullidx, sizeof(fullidx), "%s/%s" INDEX_EXT,
322                path, entry->d_name);
323       while((rv = stat(fullfile, &st)) != 0 && errno == EINTR);
324       if(rv == 0) {
325         int readers;
326         readers = __jlog_pending_readers(log, logid);
327         if(readers == 0) {
328           unlink(fullfile);
329           unlink(fullidx);
330         }
331       }
332     }
333   }
334   closedir(d);
335   return cnt;
336 }
337
/*
 * Hand the caller the next queued line, or NULL if nothing is pending.
 *
 * iter is the caller's cursor into a batch detached by an earlier call:
 * while *iter is non-NULL, lines are served from that batch.  Once it is
 * exhausted the whole shared list is detached from actx->head in one
 * compare-and-swap, reversed, and becomes the new batch.
 */
static jlog_line *
jlog_asynch_pop(jlog_asynch_ctx *actx, jlog_line **iter) {
  jlog_line *h = NULL, *rev = NULL;

  if(*iter) { /* we have more on the previous list */
    h = *iter;
    *iter = h->next;
    return h;
  }

  /* Swap the shared head for NULL; retry if a producer changed it meanwhile.
   * NOTE(review): noit_atomic_casptr appears to take (target, new, expected)
   * and return the previous value -- confirm against noit_atomic.h. */
  while(1) {
    h = (void *)(volatile void *)actx->head;
    if(noit_atomic_casptr((volatile void **)&actx->head, NULL, h) == h) break;
    /* TODO: load-load */
  }
  while(h) {
    /* which unshifted things into the queue -- it's backwards, reverse it */
    jlog_line *tmp = h;
    h = h->next;
    tmp->next = rev;
    rev = tmp;
  }
  /* return the first line of the reversed batch and remember the rest */
  if(rev) *iter = rev->next;
  else *iter = NULL;
  return rev;
}
/*
 * Prepend line n to the shared pending list of actx.
 *
 * The new node is linked in front of the current head and installed with a
 * compare-and-swap; the loop retries whenever the head changed in between.
 */
void
jlog_asynch_push(jlog_asynch_ctx *actx, jlog_line *n) {
  while(1) {
    n->next = (void *)(volatile void *)actx->head;
    if(noit_atomic_casptr((volatile void **)&actx->head, n, n->next) == n->next) return;
    /* TODO: load-load */
  }
}
372 static void *
373 jlog_logio_asynch_writer(void *vls) {
374   noit_log_stream_t ls = vls;
375   jlog_asynch_ctx *actx = ls->op_ctx;
376   jlog_line *iter = NULL;
377   int gen;
378   gen = noit_atomic_inc32(&actx->gen);
379   noitL(noit_error, "starting asynchronous jlog writer[%d/%p]\n",
380         (int)getpid(), (void *)pthread_self());
381   while(gen == actx->gen) {
382     pthread_rwlock_t *lock;
383     int fast = 0, max = 1000;
384     jlog_line *line;
385     lock = ls->lock;
386     if(lock) pthread_rwlock_rdlock(lock);
387     while(max > 0 && NULL != (line = jlog_asynch_pop(actx, &iter))) {
388       jlog_ctx_write(actx->log, line->buf_dynamic ?
389                                   line->buf_dynamic :
390                                   line->buf_static,
391                      line->len);
392       if(line->buf_dynamic != NULL) free(line->buf_dynamic);
393       free(line);
394       fast = 1;
395       max--;
396     }
397     if(lock) pthread_rwlock_unlock(lock);
398     if(max > 0) {
399       /* we didn't hit our limit... so we ran the queue dry */
400       /* 200ms if there was nothing, 10ms otherwise */
401       usleep(fast ? 10000 : 200000);
402     }
403   }
404   noitL(noit_error, "stopping asynchronous jlog writer[%d/%p]\n",
405         (int)getpid(), (void *)pthread_self());
406   pthread_exit((void *)0);
407 }
408 static int
409 jlog_logio_reopen(noit_log_stream_t ls) {
410   char **subs;
411   jlog_asynch_ctx *actx = ls->op_ctx;
412   pthread_rwlock_t *lock = ls->lock;
413   pthread_attr_t tattr;
414   int i;
415   /* reopening only has the effect of removing temporary subscriptions */
416   /* (they start with ~ in our hair-brained model */
417
418   if(lock) pthread_rwlock_wrlock(lock);
419   if(jlog_ctx_list_subscribers(actx->log, &subs) == -1)
420     goto bail;
421
422   for(i=0;subs[i];i++)
423     if(subs[i][0] == '~')
424       jlog_ctx_remove_subscriber(actx->log, subs[i]);
425
426   jlog_ctx_list_subscribers_dispose(actx->log, subs);
427   jlog_logio_cleanse(ls);
428  bail:
429   if(lock) pthread_rwlock_unlock(lock);
430
431   pthread_attr_init(&tattr);
432   pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
433   if(pthread_create(&actx->writer, NULL, jlog_logio_asynch_writer, ls) != 0)
434     return -1;
435  
436   return 0;
437 }
438 static void
439 noit_log_jlog_err(void *ctx, const char *format, ...) {
440   int rv;
441   struct timeval now;
442   va_list arg;
443   va_start(arg, format);
444   gettimeofday(&now, NULL);
445   rv = noit_vlog(noit_error, &now, "jlog.c", 0, format, arg);
446   va_end(arg);
447 }
448 static int
449 jlog_logio_open(noit_log_stream_t ls) {
450   char path[PATH_MAX], *sub, **subs, *p;
451   jlog_asynch_ctx *actx;
452   jlog_ctx *log = NULL;
453   pthread_attr_t tattr;
454   int i, listed, found;
455
456   if(jlog_lspath_to_fspath(ls, path, sizeof(path), &sub) <= 0) return -1;
457   log = jlog_new(path);
458   if(!log) return -1;
459   jlog_set_error_func(log, noit_log_jlog_err, ls);
460   /* Open the writer. */
461   if(jlog_ctx_open_writer(log)) {
462     /* If that fails, we'll give one attempt at initiailizing it. */
463     /* But, since we attempted to open it as a writer, it is tainted. */
464     /* path: close, new, init, close, new, writer, add subscriber */
465     jlog_ctx_close(log);
466     log = jlog_new(path);
467     jlog_set_error_func(log, noit_log_jlog_err, ls);
468     if(jlog_ctx_init(log)) {
469       noitL(noit_error, "Cannot init jlog writer: %s\n",
470             jlog_ctx_err_string(log));
471       jlog_ctx_close(log);
472       return -1;
473     }
474     /* After it is initialized, we can try to reopen it as a writer. */
475     jlog_ctx_close(log);
476     log = jlog_new(path);
477     jlog_set_error_func(log, noit_log_jlog_err, ls);
478     if(jlog_ctx_open_writer(log)) {
479       noitL(noit_error, "Cannot open jlog writer: %s\n",
480             jlog_ctx_err_string(log));
481       jlog_ctx_close(log);
482       return -1;
483     }
484   }
485
486   /* Add or remove subscribers according to the current configuration. */
487   listed = jlog_ctx_list_subscribers(log, &subs);
488   if(listed == -1) {
489     noitL(noit_error, "Cannot list jlog subscribers: %s\n",
490           jlog_ctx_err_string(log));
491     return -1;
492   }
493
494   if(sub) {
495     /* Match all configured subscribers against jlog's list. */
496     for(p=strtok(sub, ",");p;p=strtok(NULL, ",")) {
497       for(i=0;i<listed;i++) {
498         if((subs[i]) && (strcmp(p, subs[i]) == 0)) {
499           free(subs[i]);
500           subs[i] = NULL;
501           break;
502         }
503       }
504       if(i == listed)
505         jlog_ctx_add_subscriber(log, p, JLOG_BEGIN);
506     }
507
508     /* Remove all unmatched subscribers. */
509     for(i=0;i<listed;i++) {
510       if(subs[i]) {
511         jlog_ctx_remove_subscriber(log, subs[i]);
512         free(subs[i]);
513         subs[i] = NULL;
514       }
515     }
516
517     free(subs);
518     subs = NULL;
519   } else {
520     /* Remove all subscribers other than DEFAULT_JLOG_SUBSCRIBER. */
521     found = 0;
522     for(i=0;i<listed;i++) {
523       if((subs[i]) && (strcmp(DEFAULT_JLOG_SUBSCRIBER, subs[i]) == 0)) {
524         found = 1;
525         continue;
526       }
527       jlog_ctx_remove_subscriber(log, subs[i]);
528     }
529
530     /* Add DEFAULT_JLOG_SUBSCRIBER if it wasn't already on the jlog's list. */
531     if(!found)
532       jlog_ctx_add_subscriber(log, DEFAULT_JLOG_SUBSCRIBER, JLOG_BEGIN);
533
534     jlog_ctx_list_subscribers_dispose(log, subs);
535   }
536
537   actx = calloc(1, sizeof(*actx));
538   actx->log = log;
539   ls->op_ctx = actx;
540
541   pthread_attr_init(&tattr);
542   pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
543   if(pthread_create(&actx->writer, NULL, jlog_logio_asynch_writer, ls) != 0)
544     return -1;
545
546   /* We do this to clean things up */
547   jlog_logio_reopen(ls);
548   return 0;
549 }
550 static int
551 jlog_logio_write(noit_log_stream_t ls, const void *buf, size_t len) {
552   int rv = -1;
553   jlog_asynch_ctx *actx;
554   jlog_line *line;
555   if(!ls->op_ctx) return -1;
556   actx = ls->op_ctx;
557   line = calloc(1, sizeof(*line));
558   if(len > sizeof(line->buf_static)) {
559     line->buf_dynamic = malloc(len);
560     memcpy(line->buf_dynamic, buf, len);
561   }
562   else {
563     memcpy(line->buf_static, buf, len);
564   }
565   line->len = len;
566   jlog_asynch_push(actx, line);
567   return rv;
568 }
569 static int
570 jlog_logio_close(noit_log_stream_t ls) {
571   if(ls->op_ctx) {
572     jlog_asynch_ctx *actx = ls->op_ctx;
573     jlog_ctx_close(actx->log);
574     ls->op_ctx = NULL;
575   }
576   return 0;
577 }
578 static size_t
579 jlog_logio_size(noit_log_stream_t ls) {
580   size_t size;
581   jlog_asynch_ctx *actx;
582   pthread_rwlock_t *lock = ls->lock;
583   if(!ls->op_ctx) return -1;
584   actx = ls->op_ctx;
585   if(lock) pthread_rwlock_rdlock(lock);
586   size = jlog_raw_size(actx->log);
587   if(lock) pthread_rwlock_unlock(lock);
588   return size;
589 }
590 static int
591 jlog_logio_rename(noit_log_stream_t ls, const char *newname) {
592   /* Not supported (and makes no sense) */
593   return -1;
594 }
595 static logops_t jlog_logio_ops = {
596   jlog_logio_open,
597   jlog_logio_reopen,
598   jlog_logio_write,
599   NULL,
600   jlog_logio_close,
601   jlog_logio_size,
602   jlog_logio_rename
603 };
604
605 void
606 noit_log_init() {
607   noit_hash_init(&noit_loggers);
608   noit_hash_init(&noit_logops);
609   noit_register_logops("file", &posix_logio_ops);
610   noit_register_logops("jlog", &jlog_logio_ops);
611   noit_stderr = noit_log_stream_new_on_fd("stderr", 2, NULL);
612   noit_stderr->timestamps = 1;
613   noit_error = noit_log_stream_new("error", NULL, NULL, NULL, NULL);
614   noit_debug = noit_log_stream_new("debug", NULL, NULL, NULL, NULL);
615 }
616
617 void
618 noit_register_logops(const char *name, logops_t *ops) {
619   noit_hash_store(&noit_logops, strdup(name), strlen(name), ops);
620 }
621
622 void *
623 noit_log_stream_get_ctx(noit_log_stream_t ls) {
624   return ls->op_ctx;
625 }
626
627 void
628 noit_log_stream_set_ctx(noit_log_stream_t ls, void *nctx) {
629   ls->op_ctx = nctx;
630 }
631
632 const char *
633 noit_log_stream_get_type(noit_log_stream_t ls) {
634   return ls->type;
635 }
636
637 const char *
638 noit_log_stream_get_name(noit_log_stream_t ls) {
639   return ls->name;
640 }
641
642 const char *
643 noit_log_stream_get_path(noit_log_stream_t ls) {
644   return ls->path;
645 }
646
647 const char *
648 noit_log_stream_get_property(noit_log_stream_t ls,
649                              const char *prop) {
650   const char *v;
651   if(ls && ls->config &&
652      noit_hash_retr_str(ls->config, prop, strlen(prop), &v))
653     return v;
654   return NULL;
655 }
656
657 void
658 noit_log_stream_set_property(noit_log_stream_t ls,
659                              const char *prop, const char *v) {
660   if(!ls) return;
661   if(!ls->config) {
662     ls->config = calloc(1, sizeof(*ls->config));
663     noit_hash_init(ls->config);
664   }
665   noit_hash_replace(ls->config, prop, strlen(prop), (void *)v, free, free);
666 }
667
668 static void
669 noit_log_init_rwlock(noit_log_stream_t ls) {
670   pthread_rwlockattr_t attr;
671   pthread_rwlockattr_init(&attr);
672   pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
673   pthread_rwlock_init(ls->lock, &attr);
674 }
675
676 noit_log_stream_t
677 noit_log_stream_new_on_fd(const char *name, int fd, noit_hash_table *config) {
678   noit_log_stream_t ls;
679   ls = calloc(1, sizeof(*ls));
680   ls->name = strdup(name);
681   ls->ops = &posix_logio_ops;
682   ls->op_ctx = (void *)(vpsized_int)fd;
683   ls->enabled = 1;
684   ls->config = config;
685   ls->lock = calloc(1, sizeof(*ls->lock));
686   noit_log_init_rwlock(ls);
687   /* This double strdup of ls->name is needed, look for the next one
688    * for an explanation.
689    */
690   if(noit_hash_store(&noit_loggers,
691                      strdup(ls->name), strlen(ls->name), ls) == 0) {
692     free(ls->name);
693     free(ls);
694     return NULL;
695   }
696   return ls;
697 }
698
699 noit_log_stream_t
700 noit_log_stream_new_on_file(const char *path, noit_hash_table *config) {
701   return noit_log_stream_new(path, "file", path, NULL, config);
702 }
703
704 noit_log_stream_t
705 noit_log_stream_new(const char *name, const char *type, const char *path,
706                     void *ctx, noit_hash_table *config) {
707   noit_log_stream_t ls, saved;
708   struct _noit_log_stream tmpbuf;
709   void *vops = NULL;
710
711   ls = calloc(1, sizeof(*ls));
712   ls->name = strdup(name);
713   ls->path = path ? strdup(path) : NULL;
714   ls->type = type ? strdup(type) : NULL;
715   ls->enabled = 1;
716   ls->config = config;
717   if(!type)
718     ls->ops = NULL;
719   else if(noit_hash_retrieve(&noit_logops, type, strlen(type),
720                              &vops))
721     ls->ops = vops;
722   else
723     goto freebail;
724  
725   if(ls->ops && ls->ops->openop(ls)) goto freebail;
726
727   saved = noit_log_stream_find(name);
728   if(saved) {
729     pthread_rwlock_t *lock = saved->lock;
730     memcpy(&tmpbuf, saved, sizeof(*saved));
731     memcpy(saved, ls, sizeof(*saved));
732     memcpy(ls, &tmpbuf, sizeof(*saved));
733     saved->lock = lock;
734
735     ls->lock = NULL;
736     noit_log_stream_free(ls);
737     ls = saved;
738   }
739   else {
740     /* We strdup the name *again*.  We'going to kansas city shuffle the
741      * ls later (see memcpy above).  However, if don't strdup, then the
742      * noit_log_stream_free up there will sweep our key right our from
743      * under us.
744      */
745     if(noit_hash_store(&noit_loggers,
746                        strdup(ls->name), strlen(ls->name), ls) == 0)
747       goto freebail;
748     ls->lock = calloc(1, sizeof(*ls->lock));
749     noit_log_init_rwlock(ls);
750   }
751   /* This is for things that don't open on paths */
752   if(ctx) ls->op_ctx = ctx;
753   return ls;
754
755  freebail:
756   fprintf(stderr, "Failed to instantiate logger(%s,%s,%s)\n",
757           name, type ? type : "[null]", path ? path : "[null]");
758   free(ls->name);
759   if(ls->path) free(ls->path);
760   if(ls->type) free(ls->type);
761   free(ls);
762   return NULL;
763 }
764
765 noit_log_stream_t
766 noit_log_stream_find(const char *name) {
767   void *vls;
768   if(noit_hash_retrieve(&noit_loggers, name, strlen(name), &vls)) {
769     return (noit_log_stream_t)vls;
770   }
771   return NULL;
772 }
773
774 void
775 noit_log_stream_remove(const char *name) {
776   noit_hash_delete(&noit_loggers, name, strlen(name), NULL, NULL);
777 }
778
779 void
780 noit_log_stream_add_stream(noit_log_stream_t ls, noit_log_stream_t outlet) {
781   struct _noit_log_stream_outlet_list *newnode;
782   newnode = calloc(1, sizeof(*newnode));
783   newnode->outlet = outlet;
784   newnode->next = ls->outlets;
785   ls->outlets = newnode;
786 }
787
788 noit_log_stream_t
789 noit_log_stream_remove_stream(noit_log_stream_t ls, const char *name) {
790   noit_log_stream_t outlet;
791   struct _noit_log_stream_outlet_list *node, *tmp;
792   if(!ls->outlets) return NULL;
793   if(!strcmp(ls->outlets->outlet->name, name)) {
794     node = ls->outlets;
795     ls->outlets = node->next;
796     outlet = node->outlet;
797     free(node);
798     return outlet;
799   }
800   for(node = ls->outlets; node->next; node = node->next) {
801     if(!strcmp(node->next->outlet->name, name)) {
802       /* splice */
803       tmp = node->next;
804       node->next = tmp->next;
805       /* pluck */
806       outlet = tmp->outlet;
807       /* shed */
808       free(tmp);
809       /* return */
810       return outlet;
811     }
812   }
813   return NULL;
814 }
815
816 void noit_log_stream_reopen(noit_log_stream_t ls) {
817   struct _noit_log_stream_outlet_list *node;
818   if(ls->ops) ls->ops->reopenop(ls);
819   for(node = ls->outlets; node; node = node->next) {
820     noit_log_stream_reopen(node->outlet);
821   }
822 }
823
824 int noit_log_stream_rename(noit_log_stream_t ls, const char *newname) {
825   return (ls->ops && ls->ops->renameop) ? ls->ops->renameop(ls, newname) : -1;
826 }
827
828 void
829 noit_log_stream_close(noit_log_stream_t ls) {
830   if(ls->ops) ls->ops->closeop(ls);
831 }
832
833 size_t
834 noit_log_stream_size(noit_log_stream_t ls) {
835   if(ls->ops && ls->ops->sizeop) return ls->ops->sizeop(ls);
836   return -1;
837 }
838
839 size_t
840 noit_log_stream_written(noit_log_stream_t ls) {
841   return ls->written;
842 }
843
844 void
845 noit_log_stream_free(noit_log_stream_t ls) {
846   if(ls) {
847     struct _noit_log_stream_outlet_list *node;
848     if(ls->name) free(ls->name);
849     if(ls->path) free(ls->path);
850     while(ls->outlets) {
851       node = ls->outlets->next;
852       free(ls->outlets);
853       ls->outlets = node;
854     }
855     if(ls->config) {
856       noit_hash_destroy(ls->config, free, free);
857       free(ls->config);
858     }
859     if(ls->lock) {
860       pthread_rwlock_destroy(ls->lock);
861       free(ls->lock);
862     }
863     free(ls);
864   }
865 }
866
867 static int
868 noit_log_writev(noit_log_stream_t ls, const struct iovec *iov, int iovcnt) {
869   /* This emulates writev into a buffer for ops that don't support it */
870   char stackbuff[4096], *tofree = NULL, *buff = NULL;
871   int i, s = 0, ins = 0;
872
873   if(!ls->ops) return -1;
874   if(ls->ops->writevop) return ls->ops->writevop(ls, iov, iovcnt);
875   if(!ls->ops->writeop) return -1;
876   if(iovcnt == 1) return ls->ops->writeop(ls, iov[0].iov_base, iov[0].iov_len);
877
878   for(i=0;i<iovcnt;i++) s+=iov[i].iov_len;
879   if(s > sizeof(stackbuff)) {
880     tofree = buff = malloc(s);
881     if(tofree == NULL) return -1;
882   }
883   else buff = stackbuff;
884   for(i=0;i<iovcnt;i++) {
885     memcpy(buff + ins, iov[i].iov_base, iov[i].iov_len);
886     ins += iov[i].iov_len;
887   }
888   i = ls->ops->writeop(ls, buff, s);
889   if(tofree) free(tofree);
890   return i;
891 }
892
893 static int
894 noit_log_line(noit_log_stream_t ls,
895               const char *timebuf, int timebuflen,
896               const char *debugbuf, int debugbuflen,
897               const char *buffer, size_t len) {
898   int rv = 0;
899   struct _noit_log_stream_outlet_list *node;
900   if(ls->ops) {
901     int iovcnt = 0;
902     struct iovec iov[3];
903     if(ls->timestamps) {
904       iov[iovcnt].iov_base = (void *)timebuf;
905       iov[iovcnt].iov_len = timebuflen;
906       iovcnt++;
907     }
908     if(ls->debug) {
909       iov[iovcnt].iov_base = (void *)debugbuf;
910       iov[iovcnt].iov_len = debugbuflen;
911       iovcnt++;
912     }
913     iov[iovcnt].iov_base = (void *)buffer;
914     iov[iovcnt].iov_len = len;
915     iovcnt++;
916     rv = noit_log_writev(ls, iov, iovcnt);
917   }
918   for(node = ls->outlets; node; node = node->next) {
919     int srv = 0;
920     debug_printf(" %s -> %s\n", ls->name, node->outlet->name);
921     srv = noit_log_line(node->outlet, timebuf, timebuflen, debugbuf, debugbuflen, buffer, len);
922     if(srv) rv = srv;
923   }
924   return rv;
925 }
926 int
927 noit_vlog(noit_log_stream_t ls, struct timeval *now,
928           const char *file, int line,
929           const char *format, va_list arg) {
930   int rv = 0, allocd = 0;
931   char buffer[4096], *dynbuff = NULL;
932 #ifdef va_copy
933   va_list copy;
934 #endif
935
936   if(ls->enabled || NOIT_LOG_LOG_ENABLED()) {
937     int len;
938     char tbuf[48], dbuf[80];
939     int tbuflen = 0, dbuflen = 0;
940     MATERIALIZE_DEPS(ls);
941     if(ls->timestamps_below) {
942       struct tm _tm, *tm;
943       char tempbuf[32];
944       time_t s = (time_t)now->tv_sec;
945       tm = localtime_r(&s, &_tm);
946       strftime(tempbuf, sizeof(tempbuf), "%Y-%m-%d %H:%M:%S", tm);
947       snprintf(tbuf, sizeof(tbuf), "[%s.%06d] ", tempbuf, (int)now->tv_usec);
948       tbuflen = strlen(tbuf);
949     }
950     else tbuf[0] = '\0';
951     if(ls->debug_below) {
952       snprintf(dbuf, sizeof(dbuf), "[%s:%d] ", file, line);
953       dbuflen = strlen(dbuf);
954     }
955     else dbuf[0] = '\0';
956 #ifdef va_copy
957     va_copy(copy, arg);
958     len = vsnprintf(buffer, sizeof(buffer), format, copy);
959     va_end(copy);
960 #else
961     len = vsnprintf(buffer, sizeof(buffer), format, arg);
962 #endif
963     if(len > sizeof(buffer)) {
964       allocd = sizeof(buffer);
965       while(len > allocd) { /* guaranteed true the first time */
966         while(len > allocd) allocd <<= 2;
967         if(dynbuff) free(dynbuff);
968         dynbuff = malloc(allocd);
969         assert(dynbuff);
970 #ifdef va_copy
971         va_copy(copy, arg);
972         len = vsnprintf(dynbuff, allocd, format, copy);
973         va_end(copy);
974 #else
975         len = vsnprintf(dynbuff, allocd, format, arg);
976 #endif
977       }
978       NOIT_LOG_LOG(ls->name, (char *)file, line, dynbuff);
979       if(ls->enabled)
980         rv = noit_log_line(ls, tbuf, tbuflen, dbuf, dbuflen, dynbuff, len);
981       free(dynbuff);
982     }
983     else {
984       NOIT_LOG_LOG(ls->name, (char *)file, line, buffer);
985       if(ls->enabled)
986         rv = noit_log_line(ls, tbuf, tbuflen, dbuf, dbuflen, buffer, len);
987     }
988     if(rv == len) return 0;
989     return -1;
990   }
991   return 0;
992 }
993
994 int
995 noit_log(noit_log_stream_t ls, struct timeval *now,
996          const char *file, int line, const char *format, ...) {
997   int rv;
998   va_list arg;
999   va_start(arg, format);
1000   rv = noit_vlog(ls, now, file, line, format, arg);
1001   va_end(arg);
1002   return rv;
1003 }
1004
1005 int
1006 noit_log_reopen_all() {
1007   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1008   const char *k;
1009   int klen, rv = 0;
1010   void *data;
1011   noit_log_stream_t ls;
1012
1013   while(noit_hash_next(&noit_loggers, &iter, &k, &klen, &data)) {
1014     ls = data;
1015     if(ls->ops) if(ls->ops->reopenop(ls) < 0) rv = -1;
1016   }
1017   return rv;
1018 }
1019
Note: See TracBrowser for help on using the browser.