root/src/noit_livestream_listener.c

Revision 2417a3262808d93c08cb01f0c7dfc7a7d40df766, 9.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 2 years ago)

Allow livestreaming to attach to the non-transient check.

If a period of 0 is provided, just add this livestream as
a feed on the master check. It won't show up in watches, but
it acts like a normal watch.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_listener.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "utils/noit_sem.h"
39 #include "noit_livestream_listener.h"
40 #include "noit_check.h"
41
42 #include <unistd.h>
43 #include <errno.h>
44
45 static noit_atomic32_t ls_counter = 0;
46
/* One queued log message awaiting delivery to a livestream client. */
struct log_entry {
  int len;                /* number of bytes held in buff */
  char *buff;             /* heap copy of the message payload */
  struct log_entry *next; /* next entry in the singly linked queue, or NULL */
};
52
53 typedef struct {
54   u_int32_t period;
55   noit_boolean period_read;
56   struct log_entry *lqueue;
57   struct log_entry *lqueue_end;
58   sem_t lqueue_sem;
59   pthread_mutex_t lqueue_lock;
60   int uuid_read;
61   char uuid_str[37];
62   char *feed;
63   uuid_t uuid;
64   noit_check_t *check;
65   int wants_shutdown;
66 } noit_livestream_closure_t;
67
68 noit_livestream_closure_t *
69 noit_livestream_closure_alloc(void) {
70   noit_livestream_closure_t *jcl;
71   jcl = calloc(1, sizeof(*jcl));
72   pthread_mutex_init(&jcl->lqueue_lock, NULL);
73   sem_init(&jcl->lqueue_sem, 0, 0);
74   return jcl;
75 }
76
77 void
78 noit_livestream_closure_free(noit_livestream_closure_t *jcl) {
79   struct log_entry *tofree;
80   while(jcl->lqueue) {
81     tofree = jcl->lqueue;
82     jcl->lqueue = jcl->lqueue->next;
83     free(tofree->buff);
84     free(tofree);
85   }
86   free(jcl);
87 }
88
89 static int
90 noit_livestream_logio_open(noit_log_stream_t ls) {
91   return 0;
92 }
93 static int
94 noit_livestream_logio_reopen(noit_log_stream_t ls) {
95   /* no op */
96   return 0;
97 }
98 static int
99 noit_livestream_logio_write(noit_log_stream_t ls, const void *buf, size_t len) {
100   noit_livestream_closure_t *jcl;
101   struct log_entry *le;
102
103   jcl = noit_log_stream_get_ctx(ls);
104   if(!jcl) return 0;
105
106   if(jcl->wants_shutdown) {
107     /* This has been terminated by the client, _fail here_ */
108     return 0;
109   }
110
111   le = calloc(1, sizeof(*le));
112   le->len = len;
113   le->buff = malloc(len);
114   memcpy(le->buff, buf, len);
115   le->next = NULL;
116   pthread_mutex_lock(&jcl->lqueue_lock);
117   if(!jcl->lqueue_end) jcl->lqueue = le;
118   else jcl->lqueue_end->next = le;
119   jcl->lqueue_end = le;
120   pthread_mutex_unlock(&jcl->lqueue_lock);
121   sem_post(&jcl->lqueue_sem);
122   return len;
123 }
124 static int
125 noit_livestream_logio_close(noit_log_stream_t ls) {
126   noit_livestream_closure_t *jcl;
127   jcl = noit_log_stream_get_ctx(ls);
128   if(jcl) noit_livestream_closure_free(jcl);
129   noit_log_stream_set_ctx(ls, NULL);
130   return 0;
131 }
132 static logops_t noit_livestream_logio_ops = {
133   noit_livestream_logio_open,
134   noit_livestream_logio_reopen,
135   noit_livestream_logio_write,
136   NULL,
137   noit_livestream_logio_close,
138   NULL,
139   NULL
140 };
141
142 void
143 noit_livestream_listener_init() {
144   noit_register_logops("noit_livestream", &noit_livestream_logio_ops);
145   eventer_name_callback("livestream_transit/1.0", noit_livestream_handler);
146   noit_control_dispatch_delegate(noit_control_dispatch,
147                                  NOIT_LIVESTREAM_DATA_FEED,
148                                  noit_livestream_handler);
149 }
150
151 static int
152 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
153   int w, sofar = 0;
154   while(l > sofar) {
155     w = e->opset->write(e->fd, (char *)b + sofar, l - sofar, mask, e);
156     if(w <= 0) return w;
157     sofar += w;
158   }
159   return sofar;
160 }
161 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
162
163 void *
164 noit_livestream_thread_main(void *e_vptr) {
165   int mask;
166   eventer_t e = e_vptr;
167   acceptor_closure_t *ac = e->closure;
168   noit_livestream_closure_t *jcl = ac->service_ctx;
169
170   /* Go into blocking mode */
171   if(eventer_set_fd_blocking(e->fd) == -1) {
172     noitL(noit_error, "failed setting livestream to blocking: [%d] [%s]\n",
173           errno, strerror(errno));
174     goto alldone;
175   }
176
177   while(1) {
178     u_int32_t netlen;
179     struct log_entry *le = NULL;
180     int rv;
181    
182     sem_wait(&jcl->lqueue_sem);
183     pthread_mutex_lock(&jcl->lqueue_lock);
184     if(jcl->lqueue) {
185       /* If there are items, pop and advance the header pointer */
186       le = jcl->lqueue;
187       jcl->lqueue = jcl->lqueue->next;
188       if(!jcl->lqueue) jcl->lqueue_end = NULL;
189     }
190     pthread_mutex_unlock(&jcl->lqueue_lock);
191
192     if(!le) continue;
193
194     /* Here we actually push the message */
195     netlen = htonl(le->len);
196     if((rv = Ewrite(&netlen, sizeof(netlen))) != sizeof(netlen)) {
197       noitL(noit_error, "Error writing le header over SSL %d != %d\n",
198             rv, (int)sizeof(netlen));
199       goto alldone;
200     }
201     if((rv = Ewrite(le->buff, le->len)) != le->len) {
202       noitL(noit_error, "Error writing livestream message over SSL %d != %d\n",
203             rv, le->len);
204       goto alldone;
205     }
206   }
207
208  alldone:
209   e->opset->close(e->fd, &mask, e);
210   jcl->wants_shutdown = 1;
211   if(ac) acceptor_closure_free(ac);
212   return NULL;
213 }
214
215 int
216 noit_livestream_handler(eventer_t e, int mask, void *closure,
217                         struct timeval *now) {
218   eventer_t newe;
219   pthread_t tid;
220   pthread_attr_t tattr;
221   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
222   acceptor_closure_t *ac = closure;
223   noit_livestream_closure_t *jcl = ac->service_ctx;
224
225   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
226 socket_error:
227     /* Exceptions cause us to simply snip the connection */
228     eventer_remove_fd(e->fd);
229     e->opset->close(e->fd, &newmask, e);
230     if(jcl) noit_livestream_closure_free(jcl);
231     if(ac) acceptor_closure_free(ac);
232     return 0;
233   }
234
235   if(!ac->service_ctx || !jcl->feed) {
236     int len;
237     if(!ac->service_ctx) ac->service_ctx = noit_livestream_closure_alloc();
238     jcl = ac->service_ctx;
239     /* Setup logger to this channel */
240     noitL(noit_debug, "livestream initializing on fd %d\n", e->fd);
241     if(!jcl->period_read) {
242       u_int32_t nperiod;
243       len = e->opset->read(e->fd, &nperiod, sizeof(nperiod), &mask, e);
244       if(len == -1 && errno == EAGAIN) return mask | EVENTER_EXCEPTION;
245       if(len != sizeof(nperiod)) goto socket_error;
246       jcl->period = ntohl(nperiod);
247       jcl->period_read = noit_true;
248       noitL(noit_debug, "livestream initializing on fd %d [period %d]\n",
249             e->fd, jcl->period);
250     }
251     while(jcl->uuid_read < 36) {
252       len = e->opset->read(e->fd, jcl->uuid_str + jcl->uuid_read, 36 - jcl->uuid_read, &mask, e);
253       if(len == -1 && errno == EAGAIN) return mask | EVENTER_EXCEPTION;
254       if(len == 0) goto socket_error;
255       jcl->uuid_read += len;
256     }
257     jcl->uuid_str[36] = '\0';
258     if(uuid_parse(jcl->uuid_str, jcl->uuid)) {
259       noitL(noit_error, "bad uuid received in livestream handler '%s'\n", jcl->uuid_str);
260       goto socket_error;
261     }
262     noitL(noit_debug, "livestream initializing on fd %d [uuid %s]\n",
263           e->fd, jcl->uuid_str);
264
265     jcl->feed = malloc(32);
266     snprintf(jcl->feed, 32, "livestream/%d", noit_atomic_inc32(&ls_counter));
267     noit_log_stream_new(jcl->feed, "noit_livestream", jcl->feed,
268                         jcl, NULL);
269
270
271     jcl->check = noit_check_watch(jcl->uuid, jcl->period);
272     if(!jcl->check) {
273       e->opset->close(e->fd, &newmask, e);
274       return 0;
275     }
276     /* This check must be watched from the livestream */
277     noit_check_transient_add_feed(jcl->check, jcl->feed);
278     /* Note the check */
279     noit_check_log_check(jcl->check);
280     /* kick it off, if it isn't running already */
281     if(!NOIT_CHECK_LIVE(jcl->check)) noit_check_activate(jcl->check);
282   }
283
284   eventer_remove_fd(e->fd);
285   newe = eventer_alloc();
286   memcpy(newe, e, sizeof(*e));
287   pthread_attr_init(&tattr);
288   pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
289   if(pthread_create(&tid, &tattr, noit_livestream_thread_main, newe) == 0) {
290     return 0;
291   }
292
293   noit_check_transient_remove_feed(jcl->check, jcl->feed);
294   noit_livestream_closure_free(jcl);
295   /* Undo our dup */
296   eventer_free(newe);
297   /* Creating the thread failed, close it down and deschedule. */
298   e->opset->close(e->fd, &newmask, e);
299   return 0;
300 }
Note: See TracBrowser for help on using the browser.