root/src/noit_livestream_listener.c

Revision 1d66d17430b90a0b627f20e400aa072a8a611a27, 9.3 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 1 month ago)

Change asynch loggers to start synchronous and require explicit change.

noit_log_go_a?synch() will make all logs (that support it) go
asynchronous or synchronous. This allows us to start up (parent)
with synchronous logging and then only enable that in the child.
This makes reopening the log file form the parent easier. Also
add support for reopening a single type of log and have the watchdog
reopen the "file" type logs in case they've been rotated out. Attempt
to make newly created log files have the same ownership as the
previous log file.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_listener.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_memory.h"
38 #include "utils/noit_log.h"
39 #include "utils/noit_sem.h"
40 #include "noit_livestream_listener.h"
41 #include "noit_check.h"
42
43 #include <unistd.h>
44 #include <errno.h>
45
46 static noit_atomic32_t ls_counter = 0;
47
48 struct log_entry {
49   int len;
50   char *buff;
51   struct log_entry *next;
52 };
53
54 typedef struct {
55   u_int32_t period;
56   noit_boolean period_read;
57   struct log_entry *lqueue;
58   struct log_entry *lqueue_end;
59   sem_t lqueue_sem;
60   pthread_mutex_t lqueue_lock;
61   int uuid_read;
62   char uuid_str[37];
63   char *feed;
64   uuid_t uuid;
65   noit_check_t *check;
66   int wants_shutdown;
67 } noit_livestream_closure_t;
68
69 noit_livestream_closure_t *
70 noit_livestream_closure_alloc(void) {
71   noit_livestream_closure_t *jcl;
72   jcl = calloc(1, sizeof(*jcl));
73   pthread_mutex_init(&jcl->lqueue_lock, NULL);
74   sem_init(&jcl->lqueue_sem, 0, 0);
75   return jcl;
76 }
77
78 void
79 noit_livestream_closure_free(noit_livestream_closure_t *jcl) {
80   struct log_entry *tofree;
81   while(jcl->lqueue) {
82     tofree = jcl->lqueue;
83     jcl->lqueue = jcl->lqueue->next;
84     free(tofree->buff);
85     free(tofree);
86   }
87   free(jcl);
88 }
89
90 static int
91 noit_livestream_logio_open(noit_log_stream_t ls) {
92   return 0;
93 }
94 static int
95 noit_livestream_logio_reopen(noit_log_stream_t ls) {
96   /* no op */
97   return 0;
98 }
99 static int
100 noit_livestream_logio_write(noit_log_stream_t ls, const struct timeval *whence,
101                             const void *buf, size_t len) {
102   noit_livestream_closure_t *jcl;
103   struct log_entry *le;
104   (void)whence;
105
106   jcl = noit_log_stream_get_ctx(ls);
107   if(!jcl) return 0;
108
109   if(jcl->wants_shutdown) {
110     /* This has been terminated by the client, _fail here_ */
111     return 0;
112   }
113
114   le = calloc(1, sizeof(*le));
115   le->len = len;
116   le->buff = malloc(len);
117   memcpy(le->buff, buf, len);
118   le->next = NULL;
119   pthread_mutex_lock(&jcl->lqueue_lock);
120   if(!jcl->lqueue_end) jcl->lqueue = le;
121   else jcl->lqueue_end->next = le;
122   jcl->lqueue_end = le;
123   pthread_mutex_unlock(&jcl->lqueue_lock);
124   sem_post(&jcl->lqueue_sem);
125   return len;
126 }
127 static int
128 noit_livestream_logio_close(noit_log_stream_t ls) {
129   noit_livestream_closure_t *jcl;
130   jcl = noit_log_stream_get_ctx(ls);
131   if(jcl) noit_livestream_closure_free(jcl);
132   noit_log_stream_set_ctx(ls, NULL);
133   return 0;
134 }
135 static logops_t noit_livestream_logio_ops = {
136   noit_false,
137   noit_livestream_logio_open,
138   noit_livestream_logio_reopen,
139   noit_livestream_logio_write,
140   NULL,
141   noit_livestream_logio_close,
142   NULL,
143   NULL
144 };
145
146 void
147 noit_livestream_listener_init() {
148   noit_register_logops("noit_livestream", &noit_livestream_logio_ops);
149   eventer_name_callback("livestream_transit/1.0", noit_livestream_handler);
150   noit_control_dispatch_delegate(noit_control_dispatch,
151                                  NOIT_LIVESTREAM_DATA_FEED,
152                                  noit_livestream_handler);
153 }
154
155 static int
156 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
157   int w, sofar = 0;
158   while(l > sofar) {
159     w = e->opset->write(e->fd, (char *)b + sofar, l - sofar, mask, e);
160     if(w <= 0) return w;
161     sofar += w;
162   }
163   return sofar;
164 }
165 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
166
167 void *
168 noit_livestream_thread_main(void *e_vptr) {
169   int mask;
170   eventer_t e = e_vptr;
171   acceptor_closure_t *ac = e->closure;
172   noit_livestream_closure_t *jcl = ac->service_ctx;
173
174   noit_memory_init_thread();
175   /* Go into blocking mode */
176   if(eventer_set_fd_blocking(e->fd) == -1) {
177     noitL(noit_error, "failed setting livestream to blocking: [%d] [%s]\n",
178           errno, strerror(errno));
179     goto alldone;
180   }
181
182   while(1) {
183     u_int32_t netlen;
184     struct log_entry *le = NULL;
185     int rv;
186    
187     sem_wait(&jcl->lqueue_sem);
188     pthread_mutex_lock(&jcl->lqueue_lock);
189     if(jcl->lqueue) {
190       /* If there are items, pop and advance the header pointer */
191       le = jcl->lqueue;
192       jcl->lqueue = jcl->lqueue->next;
193       if(!jcl->lqueue) jcl->lqueue_end = NULL;
194     }
195     pthread_mutex_unlock(&jcl->lqueue_lock);
196
197     if(!le) continue;
198
199     /* Here we actually push the message */
200     netlen = htonl(le->len);
201     if((rv = Ewrite(&netlen, sizeof(netlen))) != sizeof(netlen)) {
202       noitL(noit_error, "Error writing le header over SSL %d != %d\n",
203             rv, (int)sizeof(netlen));
204       goto alldone;
205     }
206     if((rv = Ewrite(le->buff, le->len)) != le->len) {
207       noitL(noit_error, "Error writing livestream message over SSL %d != %d\n",
208             rv, le->len);
209       goto alldone;
210     }
211   }
212
213  alldone:
214   e->opset->close(e->fd, &mask, e);
215   jcl->wants_shutdown = 1;
216   acceptor_closure_free(ac);
217   noit_memory_maintenance();
218   /* Our semaphores are counting semaphores, not locks. */
219   /* coverity[missing_unlock] */
220   return NULL;
221 }
222
223 int
224 noit_livestream_handler(eventer_t e, int mask, void *closure,
225                         struct timeval *now) {
226   eventer_t newe;
227   pthread_t tid;
228   pthread_attr_t tattr;
229   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
230   acceptor_closure_t *ac = closure;
231   noit_livestream_closure_t *jcl = ac->service_ctx;
232
233   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
234 socket_error:
235     /* Exceptions cause us to simply snip the connection */
236     eventer_remove_fd(e->fd);
237     e->opset->close(e->fd, &newmask, e);
238     if(jcl) noit_livestream_closure_free(jcl);
239     acceptor_closure_free(ac);
240     return 0;
241   }
242
243   if(!ac->service_ctx || !jcl->feed) {
244     int len;
245     if(!ac->service_ctx) ac->service_ctx = noit_livestream_closure_alloc();
246     jcl = ac->service_ctx;
247     /* Setup logger to this channel */
248     noitL(noit_debug, "livestream initializing on fd %d\n", e->fd);
249     if(!jcl->period_read) {
250       u_int32_t nperiod;
251       len = e->opset->read(e->fd, &nperiod, sizeof(nperiod), &mask, e);
252       if(len == -1 && errno == EAGAIN) return mask | EVENTER_EXCEPTION;
253       if(len != sizeof(nperiod)) goto socket_error;
254       jcl->period = ntohl(nperiod);
255       jcl->period_read = noit_true;
256       noitL(noit_debug, "livestream initializing on fd %d [period %d]\n",
257             e->fd, jcl->period);
258     }
259     while(jcl->uuid_read < 36) {
260       len = e->opset->read(e->fd, jcl->uuid_str + jcl->uuid_read, 36 - jcl->uuid_read, &mask, e);
261       if(len == -1 && errno == EAGAIN) return mask | EVENTER_EXCEPTION;
262       if(len == 0) goto socket_error;
263       jcl->uuid_read += len;
264     }
265     jcl->uuid_str[36] = '\0';
266     if(uuid_parse(jcl->uuid_str, jcl->uuid)) {
267       noitL(noit_error, "bad uuid received in livestream handler '%s'\n", jcl->uuid_str);
268       goto socket_error;
269     }
270     noitL(noit_debug, "livestream initializing on fd %d [uuid %s]\n",
271           e->fd, jcl->uuid_str);
272
273     jcl->feed = malloc(32);
274     snprintf(jcl->feed, 32, "livestream/%d", noit_atomic_inc32(&ls_counter));
275     noit_log_stream_new(jcl->feed, "noit_livestream", jcl->feed,
276                         jcl, NULL);
277
278
279     jcl->check = noit_check_watch(jcl->uuid, jcl->period);
280     if(!jcl->check) {
281       e->opset->close(e->fd, &newmask, e);
282       return 0;
283     }
284     /* This check must be watched from the livestream */
285     noit_check_transient_add_feed(jcl->check, jcl->feed);
286     /* Note the check */
287     noit_check_log_check(jcl->check);
288     /* kick it off, if it isn't running already */
289     if(!NOIT_CHECK_LIVE(jcl->check)) noit_check_activate(jcl->check);
290   }
291
292   eventer_remove_fd(e->fd);
293   newe = eventer_alloc();
294   memcpy(newe, e, sizeof(*e));
295   pthread_attr_init(&tattr);
296   pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
297   if(pthread_create(&tid, &tattr, noit_livestream_thread_main, newe) == 0) {
298     return 0;
299   }
300
301   noit_check_transient_remove_feed(jcl->check, jcl->feed);
302   noit_livestream_closure_free(jcl);
303   /* Undo our dup */
304   eventer_free(newe);
305   /* Creating the thread failed, close it down and deschedule. */
306   e->opset->close(e->fd, &newmask, e);
307   return 0;
308 }
Note: See TracBrowser for help on using the browser.