root/src/noit_livestream_listener.c

Revision c730adec9b98429ce1368131ab29151fde18d34b, 9.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 1 year ago)

Add a "memory" logger type and expose it via console and rest.
This logger is allocation free on the write path. Logging was
refactored to pass the timeval all the way into the write(v)
operations; it is ignored by previous loggers.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_listener.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "utils/noit_sem.h"
39 #include "noit_livestream_listener.h"
40 #include "noit_check.h"
41
42 #include <unistd.h>
43 #include <errno.h>
44
45 static noit_atomic32_t ls_counter = 0;
46
47 struct log_entry {
48   int len;
49   char *buff;
50   struct log_entry *next;
51 };
52
53 typedef struct {
54   u_int32_t period;
55   noit_boolean period_read;
56   struct log_entry *lqueue;
57   struct log_entry *lqueue_end;
58   sem_t lqueue_sem;
59   pthread_mutex_t lqueue_lock;
60   int uuid_read;
61   char uuid_str[37];
62   char *feed;
63   uuid_t uuid;
64   noit_check_t *check;
65   int wants_shutdown;
66 } noit_livestream_closure_t;
67
68 noit_livestream_closure_t *
69 noit_livestream_closure_alloc(void) {
70   noit_livestream_closure_t *jcl;
71   jcl = calloc(1, sizeof(*jcl));
72   pthread_mutex_init(&jcl->lqueue_lock, NULL);
73   sem_init(&jcl->lqueue_sem, 0, 0);
74   return jcl;
75 }
76
77 void
78 noit_livestream_closure_free(noit_livestream_closure_t *jcl) {
79   struct log_entry *tofree;
80   while(jcl->lqueue) {
81     tofree = jcl->lqueue;
82     jcl->lqueue = jcl->lqueue->next;
83     free(tofree->buff);
84     free(tofree);
85   }
86   free(jcl);
87 }
88
89 static int
90 noit_livestream_logio_open(noit_log_stream_t ls) {
91   return 0;
92 }
93 static int
94 noit_livestream_logio_reopen(noit_log_stream_t ls) {
95   /* no op */
96   return 0;
97 }
98 static int
99 noit_livestream_logio_write(noit_log_stream_t ls, const struct timeval *whence,
100                             const void *buf, size_t len) {
101   noit_livestream_closure_t *jcl;
102   struct log_entry *le;
103   (void)whence;
104
105   jcl = noit_log_stream_get_ctx(ls);
106   if(!jcl) return 0;
107
108   if(jcl->wants_shutdown) {
109     /* This has been terminated by the client, _fail here_ */
110     return 0;
111   }
112
113   le = calloc(1, sizeof(*le));
114   le->len = len;
115   le->buff = malloc(len);
116   memcpy(le->buff, buf, len);
117   le->next = NULL;
118   pthread_mutex_lock(&jcl->lqueue_lock);
119   if(!jcl->lqueue_end) jcl->lqueue = le;
120   else jcl->lqueue_end->next = le;
121   jcl->lqueue_end = le;
122   pthread_mutex_unlock(&jcl->lqueue_lock);
123   sem_post(&jcl->lqueue_sem);
124   return len;
125 }
126 static int
127 noit_livestream_logio_close(noit_log_stream_t ls) {
128   noit_livestream_closure_t *jcl;
129   jcl = noit_log_stream_get_ctx(ls);
130   if(jcl) noit_livestream_closure_free(jcl);
131   noit_log_stream_set_ctx(ls, NULL);
132   return 0;
133 }
134 static logops_t noit_livestream_logio_ops = {
135   noit_livestream_logio_open,
136   noit_livestream_logio_reopen,
137   noit_livestream_logio_write,
138   NULL,
139   noit_livestream_logio_close,
140   NULL,
141   NULL
142 };
143
144 void
145 noit_livestream_listener_init() {
146   noit_register_logops("noit_livestream", &noit_livestream_logio_ops);
147   eventer_name_callback("livestream_transit/1.0", noit_livestream_handler);
148   noit_control_dispatch_delegate(noit_control_dispatch,
149                                  NOIT_LIVESTREAM_DATA_FEED,
150                                  noit_livestream_handler);
151 }
152
153 static int
154 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
155   int w, sofar = 0;
156   while(l > sofar) {
157     w = e->opset->write(e->fd, (char *)b + sofar, l - sofar, mask, e);
158     if(w <= 0) return w;
159     sofar += w;
160   }
161   return sofar;
162 }
163 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
164
165 void *
166 noit_livestream_thread_main(void *e_vptr) {
167   int mask;
168   eventer_t e = e_vptr;
169   acceptor_closure_t *ac = e->closure;
170   noit_livestream_closure_t *jcl = ac->service_ctx;
171
172   /* Go into blocking mode */
173   if(eventer_set_fd_blocking(e->fd) == -1) {
174     noitL(noit_error, "failed setting livestream to blocking: [%d] [%s]\n",
175           errno, strerror(errno));
176     goto alldone;
177   }
178
179   while(1) {
180     u_int32_t netlen;
181     struct log_entry *le = NULL;
182     int rv;
183    
184     sem_wait(&jcl->lqueue_sem);
185     pthread_mutex_lock(&jcl->lqueue_lock);
186     if(jcl->lqueue) {
187       /* If there are items, pop and advance the header pointer */
188       le = jcl->lqueue;
189       jcl->lqueue = jcl->lqueue->next;
190       if(!jcl->lqueue) jcl->lqueue_end = NULL;
191     }
192     pthread_mutex_unlock(&jcl->lqueue_lock);
193
194     if(!le) continue;
195
196     /* Here we actually push the message */
197     netlen = htonl(le->len);
198     if((rv = Ewrite(&netlen, sizeof(netlen))) != sizeof(netlen)) {
199       noitL(noit_error, "Error writing le header over SSL %d != %d\n",
200             rv, (int)sizeof(netlen));
201       goto alldone;
202     }
203     if((rv = Ewrite(le->buff, le->len)) != le->len) {
204       noitL(noit_error, "Error writing livestream message over SSL %d != %d\n",
205             rv, le->len);
206       goto alldone;
207     }
208   }
209
210  alldone:
211   e->opset->close(e->fd, &mask, e);
212   jcl->wants_shutdown = 1;
213   acceptor_closure_free(ac);
214   /* Our semaphores are counting semaphores, not locks. */
215   /* coverity[missing_unlock] */
216   return NULL;
217 }
218
219 int
220 noit_livestream_handler(eventer_t e, int mask, void *closure,
221                         struct timeval *now) {
222   eventer_t newe;
223   pthread_t tid;
224   pthread_attr_t tattr;
225   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
226   acceptor_closure_t *ac = closure;
227   noit_livestream_closure_t *jcl = ac->service_ctx;
228
229   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
230 socket_error:
231     /* Exceptions cause us to simply snip the connection */
232     eventer_remove_fd(e->fd);
233     e->opset->close(e->fd, &newmask, e);
234     if(jcl) noit_livestream_closure_free(jcl);
235     acceptor_closure_free(ac);
236     return 0;
237   }
238
239   if(!ac->service_ctx || !jcl->feed) {
240     int len;
241     if(!ac->service_ctx) ac->service_ctx = noit_livestream_closure_alloc();
242     jcl = ac->service_ctx;
243     /* Setup logger to this channel */
244     noitL(noit_debug, "livestream initializing on fd %d\n", e->fd);
245     if(!jcl->period_read) {
246       u_int32_t nperiod;
247       len = e->opset->read(e->fd, &nperiod, sizeof(nperiod), &mask, e);
248       if(len == -1 && errno == EAGAIN) return mask | EVENTER_EXCEPTION;
249       if(len != sizeof(nperiod)) goto socket_error;
250       jcl->period = ntohl(nperiod);
251       jcl->period_read = noit_true;
252       noitL(noit_debug, "livestream initializing on fd %d [period %d]\n",
253             e->fd, jcl->period);
254     }
255     while(jcl->uuid_read < 36) {
256       len = e->opset->read(e->fd, jcl->uuid_str + jcl->uuid_read, 36 - jcl->uuid_read, &mask, e);
257       if(len == -1 && errno == EAGAIN) return mask | EVENTER_EXCEPTION;
258       if(len == 0) goto socket_error;
259       jcl->uuid_read += len;
260     }
261     jcl->uuid_str[36] = '\0';
262     if(uuid_parse(jcl->uuid_str, jcl->uuid)) {
263       noitL(noit_error, "bad uuid received in livestream handler '%s'\n", jcl->uuid_str);
264       goto socket_error;
265     }
266     noitL(noit_debug, "livestream initializing on fd %d [uuid %s]\n",
267           e->fd, jcl->uuid_str);
268
269     jcl->feed = malloc(32);
270     snprintf(jcl->feed, 32, "livestream/%d", noit_atomic_inc32(&ls_counter));
271     noit_log_stream_new(jcl->feed, "noit_livestream", jcl->feed,
272                         jcl, NULL);
273
274
275     jcl->check = noit_check_watch(jcl->uuid, jcl->period);
276     if(!jcl->check) {
277       e->opset->close(e->fd, &newmask, e);
278       return 0;
279     }
280     /* This check must be watched from the livestream */
281     noit_check_transient_add_feed(jcl->check, jcl->feed);
282     /* Note the check */
283     noit_check_log_check(jcl->check);
284     /* kick it off, if it isn't running already */
285     if(!NOIT_CHECK_LIVE(jcl->check)) noit_check_activate(jcl->check);
286   }
287
288   eventer_remove_fd(e->fd);
289   newe = eventer_alloc();
290   memcpy(newe, e, sizeof(*e));
291   pthread_attr_init(&tattr);
292   pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
293   if(pthread_create(&tid, &tattr, noit_livestream_thread_main, newe) == 0) {
294     return 0;
295   }
296
297   noit_check_transient_remove_feed(jcl->check, jcl->feed);
298   noit_livestream_closure_free(jcl);
299   /* Undo our dup */
300   eventer_free(newe);
301   /* Creating the thread failed, close it down and deschedule. */
302   e->opset->close(e->fd, &newmask, e);
303   return 0;
304 }
Note: See TracBrowser for help on using the browser.