root/src/noit_livestream_listener.c

Revision 44b47a085a3312bcaf73253fde40e4f9781f12a1, 9.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

If the livestream request data wasn't read in one go (i.e. EAGAIN was
hit), then it would recreate the context from scratch and attempt to
reread the whole request which would hang indefinitely and leak a jcl
context. This only seemed to present itself with hangle on with
localhost. Bug fixed.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_listener.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "utils/noit_sem.h"
39 #include "noit_livestream_listener.h"
40 #include "noit_check.h"
41
42 #include <unistd.h>
43 #include <errno.h>
44
45 static noit_atomic32_t ls_counter = 0;
46
/*
 * A single queued log message awaiting delivery to a livestream client.
 * Entries are chained into a singly-linked FIFO owned by the stream closure.
 */
struct log_entry {
  int len;                 /* number of payload bytes held in buff */
  char *buff;              /* heap-allocated copy of the payload */
  struct log_entry *next;  /* entry queued after this one, NULL at the tail */
};
52
53 typedef struct {
54   u_int32_t period;
55   struct log_entry *lqueue;
56   struct log_entry *lqueue_end;
57   sem_t lqueue_sem;
58   pthread_mutex_t lqueue_lock;
59   int uuid_read;
60   char uuid_str[37];
61   char *feed;
62   uuid_t uuid;
63   noit_check_t *check;
64   int wants_shutdown;
65 } noit_livestream_closure_t;
66
67 noit_livestream_closure_t *
68 noit_livestream_closure_alloc(void) {
69   noit_livestream_closure_t *jcl;
70   jcl = calloc(1, sizeof(*jcl));
71   pthread_mutex_init(&jcl->lqueue_lock, NULL);
72   sem_init(&jcl->lqueue_sem, 0, 0);
73   return jcl;
74 }
75
76 void
77 noit_livestream_closure_free(noit_livestream_closure_t *jcl) {
78   struct log_entry *tofree;
79   while(jcl->lqueue) {
80     tofree = jcl->lqueue;
81     jcl->lqueue = jcl->lqueue->next;
82     free(tofree->buff);
83     free(tofree);
84   }
85   free(jcl);
86 }
87
88 static int
89 noit_livestream_logio_open(noit_log_stream_t ls) {
90   return 0;
91 }
92 static int
93 noit_livestream_logio_reopen(noit_log_stream_t ls) {
94   /* no op */
95   return 0;
96 }
97 static int
98 noit_livestream_logio_write(noit_log_stream_t ls, const void *buf, size_t len) {
99   noit_livestream_closure_t *jcl;
100   struct log_entry *le;
101
102   jcl = noit_log_stream_get_ctx(ls);
103   if(!jcl) return 0;
104
105   if(jcl->wants_shutdown) {
106     /* This has been terminated by the client, _fail here_ */
107     return 0;
108   }
109
110   le = calloc(1, sizeof(*le));
111   le->len = len;
112   le->buff = malloc(len);
113   memcpy(le->buff, buf, len);
114   le->next = NULL;
115   pthread_mutex_lock(&jcl->lqueue_lock);
116   if(!jcl->lqueue_end) jcl->lqueue = le;
117   else jcl->lqueue_end->next = le;
118   jcl->lqueue_end = le;
119   pthread_mutex_unlock(&jcl->lqueue_lock);
120   sem_post(&jcl->lqueue_sem);
121   return len;
122 }
123 static int
124 noit_livestream_logio_close(noit_log_stream_t ls) {
125   noit_livestream_closure_t *jcl;
126   jcl = noit_log_stream_get_ctx(ls);
127   if(jcl) noit_livestream_closure_free(jcl);
128   noit_log_stream_set_ctx(ls, NULL);
129   return 0;
130 }
131 static logops_t noit_livestream_logio_ops = {
132   noit_livestream_logio_open,
133   noit_livestream_logio_reopen,
134   noit_livestream_logio_write,
135   NULL,
136   noit_livestream_logio_close,
137   NULL,
138   NULL
139 };
140
141 void
142 noit_livestream_listener_init() {
143   noit_register_logops("noit_livestream", &noit_livestream_logio_ops);
144   eventer_name_callback("livestream_transit/1.0", noit_livestream_handler);
145   noit_control_dispatch_delegate(noit_control_dispatch,
146                                  NOIT_LIVESTREAM_DATA_FEED,
147                                  noit_livestream_handler);
148 }
149
150 static int
151 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
152   int w, sofar = 0;
153   while(l > sofar) {
154     w = e->opset->write(e->fd, (char *)b + sofar, l - sofar, mask, e);
155     if(w <= 0) return w;
156     sofar += w;
157   }
158   return sofar;
159 }
160 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
161
162 void *
163 noit_livestream_thread_main(void *e_vptr) {
164   int mask;
165   eventer_t e = e_vptr;
166   acceptor_closure_t *ac = e->closure;
167   noit_livestream_closure_t *jcl = ac->service_ctx;
168
169   /* Go into blocking mode */
170   if(eventer_set_fd_blocking(e->fd) == -1) {
171     noitL(noit_error, "failed setting livestream to blocking: [%d] [%s]\n",
172           errno, strerror(errno));
173     goto alldone;
174   }
175
176   while(1) {
177     u_int32_t netlen;
178     struct log_entry *le = NULL;
179     int rv;
180    
181     sem_wait(&jcl->lqueue_sem);
182     pthread_mutex_lock(&jcl->lqueue_lock);
183     if(jcl->lqueue) {
184       /* If there are items, pop and advance the header pointer */
185       le = jcl->lqueue;
186       jcl->lqueue = jcl->lqueue->next;
187       if(!jcl->lqueue) jcl->lqueue_end = NULL;
188     }
189     pthread_mutex_unlock(&jcl->lqueue_lock);
190
191     if(!le) continue;
192
193     /* Here we actually push the message */
194     netlen = htonl(le->len);
195     if((rv = Ewrite(&netlen, sizeof(netlen))) != sizeof(netlen)) {
196       noitL(noit_error, "Error writing le header over SSL %d != %d\n",
197             rv, (int)sizeof(netlen));
198       goto alldone;
199     }
200     if((rv = Ewrite(le->buff, le->len)) != le->len) {
201       noitL(noit_error, "Error writing livestream message over SSL %d != %d\n",
202             rv, le->len);
203       goto alldone;
204     }
205   }
206
207  alldone:
208   e->opset->close(e->fd, &mask, e);
209   jcl->wants_shutdown = 1;
210   if(ac) acceptor_closure_free(ac);
211   return NULL;
212 }
213
214 int
215 noit_livestream_handler(eventer_t e, int mask, void *closure,
216                         struct timeval *now) {
217   eventer_t newe;
218   pthread_t tid;
219   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
220   acceptor_closure_t *ac = closure;
221   noit_livestream_closure_t *jcl = ac->service_ctx;
222
223   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
224 socket_error:
225     /* Exceptions cause us to simply snip the connection */
226     eventer_remove_fd(e->fd);
227     e->opset->close(e->fd, &newmask, e);
228     if(jcl) noit_livestream_closure_free(jcl);
229     if(ac) acceptor_closure_free(ac);
230     return 0;
231   }
232
233   if(!ac->service_ctx || !jcl->feed) {
234     int len;
235     if(!ac->service_ctx) ac->service_ctx = noit_livestream_closure_alloc();
236     jcl = ac->service_ctx;
237     /* Setup logger to this channel */
238     noitL(noit_debug, "livestream initializing on fd %d\n", e->fd);
239     if(!jcl->period) {
240       u_int32_t nperiod;
241       len = e->opset->read(e->fd, &nperiod, sizeof(nperiod), &mask, e);
242       if(len == -1 && errno == EAGAIN) return mask | EVENTER_EXCEPTION;
243       if(len != sizeof(nperiod)) goto socket_error;
244       jcl->period = ntohl(nperiod);
245       if(!jcl->period) {
246         noitL(noit_error, "period of 0 specified in livestream.  not allowed.\n");
247         goto socket_error;
248       }
249       noitL(noit_debug, "livestream initializing on fd %d [period %d]\n",
250             e->fd, jcl->period);
251     }
252     while(jcl->uuid_read < 36) {
253       len = e->opset->read(e->fd, jcl->uuid_str + jcl->uuid_read, 36 - jcl->uuid_read, &mask, e);
254       if(len == -1 && errno == EAGAIN) return mask | EVENTER_EXCEPTION;
255       if(len == 0) goto socket_error;
256       jcl->uuid_read += len;
257     }
258     jcl->uuid_str[36] = '\0';
259     if(uuid_parse(jcl->uuid_str, jcl->uuid)) {
260       noitL(noit_error, "bad uuid received in livestream handler '%s'\n", jcl->uuid_str);
261       goto socket_error;
262     }
263     noitL(noit_debug, "livestream initializing on fd %d [uuid %s]\n",
264           e->fd, jcl->uuid_str);
265
266     jcl->feed = malloc(32);
267     snprintf(jcl->feed, 32, "livestream/%d", noit_atomic_inc32(&ls_counter));
268     noit_log_stream_new(jcl->feed, "noit_livestream", jcl->feed,
269                         jcl, NULL);
270
271
272     jcl->check = noit_check_watch(jcl->uuid, jcl->period);
273     if(!jcl->check) {
274       e->opset->close(e->fd, &newmask, e);
275       return 0;
276     }
277     /* This check must be watched from the livestream */
278     noit_check_transient_add_feed(jcl->check, jcl->feed);
279     /* Note the check */
280     noit_check_log_check(jcl->check);
281     /* kick it off, if it isn't running already */
282     if(!NOIT_CHECK_LIVE(jcl->check)) noit_check_activate(jcl->check);
283   }
284
285   eventer_remove_fd(e->fd);
286   newe = eventer_alloc();
287   memcpy(newe, e, sizeof(*e));
288   if(pthread_create(&tid, NULL, noit_livestream_thread_main, newe) == 0) {
289     return 0;
290   }
291
292   noit_check_transient_remove_feed(jcl->check, jcl->feed);
293   noit_livestream_closure_free(jcl);
294   /* Undo our dup */
295   eventer_free(newe);
296   /* Creating the thread failed, close it down and deschedule. */
297   e->opset->close(e->fd, &newmask, e);
298   return 0;
299 }
Note: See TracBrowser for help on using the browser.