root/src/noit_livestream_listener.c

Revision e777c361cd646308f382dc6e94c33adc3d8efda0, 9.7 kB (checked in by GitHub <noreply@github.com>, 2 months ago)

Websockets and externalization of histogram code into libcircllhist (#278)

* WIP, livestreaming over websocket

* fix livestream mtev_log_stream_t leak

* move histogram impl out of reconnoiter tree into libcircllhist repo, tests for websockets

* whitespace cleanup

* fix comment

* output any websocket errors as json

* fix integer names

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * Copyright (c) 2015, Circonus, Inc. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include <mtev_defines.h>
35 #include <eventer/eventer.h>
36 #include <mtev_listener.h>
37 #include <mtev_memory.h>
38 #include <mtev_sem.h>
39
40 #include "noit_mtev_bridge.h"
41 #include "noit_livestream_listener.h"
42 #include "noit_check.h"
43
44 #include <unistd.h>
45 #include <errno.h>
46
47 static mtev_atomic32_t ls_counter = 0;
48
49 struct log_entry {
50   int len;
51   char *buff;
52   struct log_entry *next;
53 };
54
55 typedef struct {
56   u_int32_t period;
57   mtev_boolean period_read;
58   struct log_entry *lqueue;
59   struct log_entry *lqueue_end;
60   sem_t lqueue_sem;
61   pthread_mutex_t lqueue_lock;
62   int uuid_read;
63   char uuid_str[37];
64   char *feed;
65   uuid_t uuid;
66   noit_check_t *check;
67   int wants_shutdown;
68   mtev_log_stream_t log_stream;
69 } noit_livestream_closure_t;
70
71 noit_livestream_closure_t *
72 noit_livestream_closure_alloc(void) {
73   noit_livestream_closure_t *jcl;
74   jcl = calloc(1, sizeof(*jcl));
75   pthread_mutex_init(&jcl->lqueue_lock, NULL);
76   sem_init(&jcl->lqueue_sem, 0, 0);
77   return jcl;
78 }
79
80 void
81 noit_livestream_closure_free(noit_livestream_closure_t *jcl) {
82   struct log_entry *tofree;
83   while(jcl->lqueue) {
84     tofree = jcl->lqueue;
85     jcl->lqueue = jcl->lqueue->next;
86     free(tofree->buff);
87     free(tofree);
88   }
89   free(jcl);
90 }
91
92 static int
93 noit_livestream_logio_open(mtev_log_stream_t ls) {
94   return 0;
95 }
96 static int
97 noit_livestream_logio_reopen(mtev_log_stream_t ls) {
98   /* no op */
99   return 0;
100 }
101 static int
102 noit_livestream_logio_write(mtev_log_stream_t ls, const struct timeval *whence,
103                             const void *buf, size_t len) {
104   noit_livestream_closure_t *jcl;
105   struct log_entry *le;
106   (void)whence;
107
108   jcl = mtev_log_stream_get_ctx(ls);
109   if(!jcl) return 0;
110
111   if(jcl->wants_shutdown) {
112     /* This has been terminated by the client, _fail here_ */
113     return 0;
114   }
115
116   le = calloc(1, sizeof(*le));
117   le->len = len;
118   le->buff = malloc(len);
119   memcpy(le->buff, buf, len);
120   le->next = NULL;
121   pthread_mutex_lock(&jcl->lqueue_lock);
122   if(!jcl->lqueue_end) jcl->lqueue = le;
123   else jcl->lqueue_end->next = le;
124   jcl->lqueue_end = le;
125   pthread_mutex_unlock(&jcl->lqueue_lock);
126   sem_post(&jcl->lqueue_sem);
127   return len;
128 }
129 static int
130 noit_livestream_logio_close(mtev_log_stream_t ls) {
131   noit_livestream_closure_t *jcl;
132   jcl = mtev_log_stream_get_ctx(ls);
133   if(jcl) noit_livestream_closure_free(jcl);
134   mtev_log_stream_set_ctx(ls, NULL);
135   return 0;
136 }
137 static logops_t noit_livestream_logio_ops = {
138   mtev_false,
139   noit_livestream_logio_open,
140   noit_livestream_logio_reopen,
141   noit_livestream_logio_write,
142   NULL,
143   noit_livestream_logio_close,
144   NULL,
145   NULL
146 };
147
148 void
149 noit_livestream_listener_init() {
150   mtev_register_logops("noit_livestream", &noit_livestream_logio_ops);
151   eventer_name_callback("livestream_transit/1.0", noit_livestream_handler);
152   mtev_control_dispatch_delegate(mtev_control_dispatch,
153                                  NOIT_LIVESTREAM_DATA_FEED,
154                                  noit_livestream_handler);
155 }
156
157 static int
158 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
159   int w, sofar = 0;
160   while(l > sofar) {
161     w = e->opset->write(e->fd, (char *)b + sofar, l - sofar, mask, e);
162     if(w <= 0) return w;
163     sofar += w;
164   }
165   return sofar;
166 }
167 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
168
169 void *
170 noit_livestream_thread_main(void *e_vptr) {
171   int mask;
172   eventer_t e = e_vptr;
173   acceptor_closure_t *ac = e->closure;
174   noit_livestream_closure_t *jcl = ac->service_ctx;
175   struct log_entry *le = NULL;
176
177   mtev_memory_init_thread();
178   /* Go into blocking mode */
179   if(eventer_set_fd_blocking(e->fd) == -1) {
180     mtevL(noit_error, "failed setting livestream to blocking: [%d] [%s]\n",
181           errno, strerror(errno));
182     goto alldone;
183   }
184
185   while(1) {
186     u_int32_t netlen;
187     int rv;
188     le = NULL;
189
190     sem_wait(&jcl->lqueue_sem);
191     pthread_mutex_lock(&jcl->lqueue_lock);
192     if(jcl->lqueue) {
193       /* If there are items, pop and advance the header pointer */
194       le = jcl->lqueue;
195       jcl->lqueue = jcl->lqueue->next;
196       if(!jcl->lqueue) jcl->lqueue_end = NULL;
197     }
198     pthread_mutex_unlock(&jcl->lqueue_lock);
199
200     if(!le) continue;
201
202     /* Here we actually push the message */
203     netlen = htonl(le->len);
204     if((rv = Ewrite(&netlen, sizeof(netlen))) != sizeof(netlen)) {
205       mtevL(noit_error, "Error writing le header over SSL %d != %d\n",
206             rv, (int)sizeof(netlen));
207       goto alldone;
208     }
209     if((rv = Ewrite(le->buff, le->len)) != le->len) {
210       mtevL(noit_error, "Error writing livestream message over SSL %d != %d\n",
211             rv, le->len);
212       goto alldone;
213     }
214     if (le->buff) free(le->buff);
215     free(le);
216   }
217
218  alldone:
219   if (le) {
220     if (le->buff) free(le->buff);
221     free(le);
222   }
223   e->opset->close(e->fd, &mask, e);
224   jcl->wants_shutdown = 1;
225   acceptor_closure_free(ac);
226   mtev_memory_maintenance();
227   /* Our semaphores are counting semaphores, not locks. */
228   /* coverity[missing_unlock] */
229   return NULL;
230 }
231
232 int
233 noit_livestream_handler(eventer_t e, int mask, void *closure,
234                         struct timeval *now) {
235   eventer_t newe;
236   pthread_t tid;
237   pthread_attr_t tattr;
238   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
239   acceptor_closure_t *ac = closure;
240   noit_livestream_closure_t *jcl = ac->service_ctx;
241
242   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
243 socket_error:
244     /* Exceptions cause us to simply snip the connection */
245     eventer_remove_fd(e->fd);
246     e->opset->close(e->fd, &newmask, e);
247     mtev_log_stream_t ls = jcl->log_stream;
248     /* will free the noit_livestream_closure_t */
249     mtev_log_stream_close(ls);
250     mtev_log_stream_free(ls);
251     ac->service_ctx = NULL;
252     acceptor_closure_free(ac);
253     return 0;
254   }
255
256   if(!ac->service_ctx || !jcl->feed) {
257     int len;
258     if(!ac->service_ctx) ac->service_ctx = noit_livestream_closure_alloc();
259     jcl = ac->service_ctx;
260     /* Setup logger to this channel */
261     mtevL(noit_debug, "livestream initializing on fd %d\n", e->fd);
262     if(!jcl->period_read) {
263       u_int32_t nperiod;
264       len = e->opset->read(e->fd, &nperiod, sizeof(nperiod), &mask, e);
265       if(len == -1 && errno == EAGAIN) return mask | EVENTER_EXCEPTION;
266       if(len != sizeof(nperiod)) goto socket_error;
267       jcl->period = ntohl(nperiod);
268       jcl->period_read = mtev_true;
269       mtevL(noit_debug, "livestream initializing on fd %d [period %d]\n",
270             e->fd, jcl->period);
271     }
272     while(jcl->uuid_read < 36) {
273       len = e->opset->read(e->fd, jcl->uuid_str + jcl->uuid_read, 36 - jcl->uuid_read, &mask, e);
274       if(len == -1 && errno == EAGAIN) return mask | EVENTER_EXCEPTION;
275       if(len == 0) goto socket_error;
276       jcl->uuid_read += len;
277     }
278     jcl->uuid_str[36] = '\0';
279     if(uuid_parse(jcl->uuid_str, jcl->uuid)) {
280       mtevL(noit_error, "bad uuid received in livestream handler '%s'\n", jcl->uuid_str);
281       goto socket_error;
282     }
283     mtevL(noit_debug, "livestream initializing on fd %d [uuid %s]\n",
284           e->fd, jcl->uuid_str);
285
286     jcl->feed = malloc(32);
287     snprintf(jcl->feed, 32, "livestream/%d", mtev_atomic_inc32(&ls_counter));
288     jcl->log_stream = mtev_log_stream_new(jcl->feed, "noit_livestream", jcl->feed,
289                                           jcl, NULL);
290
291
292     jcl->check = noit_check_watch(jcl->uuid, jcl->period);
293     if(!jcl->check) {
294       e->opset->close(e->fd, &newmask, e);
295       return 0;
296     }
297     /* This check must be watched from the livestream */
298     noit_check_transient_add_feed(jcl->check, jcl->feed);
299     /* Note the check */
300     noit_check_log_check(jcl->check);
301     /* kick it off, if it isn't running already */
302     if(!NOIT_CHECK_LIVE(jcl->check)) noit_check_activate(jcl->check);
303   }
304
305   eventer_remove_fd(e->fd);
306   newe = eventer_alloc();
307   memcpy(newe, e, sizeof(*e));
308   pthread_attr_init(&tattr);
309   pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
310   if(pthread_create(&tid, &tattr, noit_livestream_thread_main, newe) == 0) {
311     return 0;
312   }
313
314   noit_check_transient_remove_feed(jcl->check, jcl->feed);
315   mtev_log_stream_t ls = jcl->log_stream;
316   mtev_log_stream_close(ls);
317   mtev_log_stream_free(ls);
318   ac->service_ctx = NULL;
319   /* Undo our dup */
320   eventer_free(newe);
321   /* Creating the thread failed, close it down and deschedule. */
322   e->opset->close(e->fd, &newmask, e);
323   return 0;
324 }
Note: See TracBrowser for help on using the browser.