root/src/noit_livestream_listener.c

Revision e7c26b45d2a6c8cef064cf2253d77bf8f360bef8, 7.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

fix duplicates by not reusing stream names, refs #71

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "noit_listener.h"
9 #include "utils/noit_hash.h"
10 #include "utils/noit_log.h"
11 #include "utils/noit_sem.h"
12 #include "noit_livestream_listener.h"
13 #include "noit_check.h"
14
15 #include <unistd.h>
16 #include <sys/ioctl.h>
17 #include <errno.h>
18
19 static noit_atomic32_t ls_counter = 0;
20
21 struct log_entry {
22   int len;
23   char *buff;
24   struct log_entry *next;
25 };
26
27 typedef struct {
28   u_int32_t period;
29   struct log_entry *lqueue;
30   struct log_entry *lqueue_end;
31   sem_t lqueue_sem;
32   pthread_mutex_t lqueue_lock;
33   int uuid_read;
34   char uuid_str[37];
35   char *feed;
36   uuid_t uuid;
37   noit_check_t *check;
38   int wants_shutdown;
39 } noit_livestream_closure_t;
40
41 noit_livestream_closure_t *
42 noit_livestream_closure_alloc(void) {
43   noit_livestream_closure_t *jcl;
44   jcl = calloc(1, sizeof(*jcl));
45   pthread_mutex_init(&jcl->lqueue_lock, NULL);
46   sem_init(&jcl->lqueue_sem, 0, 0);
47   return jcl;
48 }
49
50 void
51 noit_livestream_closure_free(noit_livestream_closure_t *jcl) {
52   struct log_entry *tofree;
53   while(jcl->lqueue) {
54     tofree = jcl->lqueue;
55     jcl->lqueue = jcl->lqueue->next;
56     free(tofree->buff);
57     free(tofree);
58   }
59   free(jcl);
60 }
61
62 static int
63 noit_livestream_logio_open(noit_log_stream_t ls) {
64   return 0;
65 }
66 static int
67 noit_livestream_logio_reopen(noit_log_stream_t ls) {
68   /* no op */
69   return 0;
70 }
71 static int
72 noit_livestream_logio_write(noit_log_stream_t ls, const void *buf, size_t len) {
73   noit_livestream_closure_t *jcl = ls->op_ctx;
74   struct log_entry *le;
75   if(!jcl) return 0;
76
77   if(jcl->wants_shutdown) {
78     /* This has been terminated by the client, _fail here_ */
79     return 0;
80   }
81
82   le = calloc(1, sizeof(*le));
83   le->len = len;
84   le->buff = malloc(len);
85   memcpy(le->buff, buf, len);
86   le->next = NULL;
87   pthread_mutex_lock(&jcl->lqueue_lock);
88   if(!jcl->lqueue_end) jcl->lqueue = le;
89   else jcl->lqueue_end->next = le;
90   jcl->lqueue_end = le;
91   pthread_mutex_unlock(&jcl->lqueue_lock);
92   sem_post(&jcl->lqueue_sem);
93   return len;
94 }
95 static int
96 noit_livestream_logio_close(noit_log_stream_t ls) {
97   noit_livestream_closure_t *jcl = ls->op_ctx;
98   if(jcl) noit_livestream_closure_free(jcl);
99   ls->op_ctx = NULL;
100   return 0;
101 }
102 static logops_t noit_livestream_logio_ops = {
103   noit_livestream_logio_open,
104   noit_livestream_logio_reopen,
105   noit_livestream_logio_write,
106   noit_livestream_logio_close,
107 };
108
109 void
110 noit_livestream_listener_init() {
111   noit_register_logops("noit_livestream", &noit_livestream_logio_ops);
112   eventer_name_callback("livestream_transit", noit_livestream_handler);
113   noit_control_dispatch_delegate(noit_control_dispatch,
114                                  NOIT_LIVESTREAM_DATA_FEED,
115                                  noit_livestream_handler);
116 }
117
118 static int
119 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
120   int w, sofar = 0;
121   while(l > sofar) {
122     w = e->opset->write(e->fd, b + sofar, l - sofar, mask, e);
123     if(w <= 0) return w;
124     sofar += w;
125   }
126   return sofar;
127 }
128 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
129
130 void *
131 noit_livestream_thread_main(void *e_vptr) {
132   int mask;
133   eventer_t e = e_vptr;
134   acceptor_closure_t *ac = e->closure;
135   noit_livestream_closure_t *jcl = ac->service_ctx;
136   long off = 0;
137
138   /* Go into blocking mode */
139   if(ioctl(e->fd, FIONBIO, &off) == -1) {
140     noitL(noit_error, "ioctl failed setting livestream to blocking: [%d] [%s]\n",
141           errno, strerror(errno));
142     goto alldone;
143   }
144
145   while(1) {
146     u_int32_t netlen;
147     struct log_entry *le = NULL;
148     int rv;
149    
150     sem_wait(&jcl->lqueue_sem);
151     pthread_mutex_lock(&jcl->lqueue_lock);
152     if(jcl->lqueue) {
153       /* If there are items, pop and advance the header pointer */
154       le = jcl->lqueue;
155       jcl->lqueue = jcl->lqueue->next;
156       if(!jcl->lqueue) jcl->lqueue_end = NULL;
157     }
158     pthread_mutex_unlock(&jcl->lqueue_lock);
159
160     if(!le) continue;
161
162     /* Here we actually push the message */
163     netlen = htonl(le->len);
164     if((rv = Ewrite(&netlen, sizeof(netlen))) != sizeof(netlen)) {
165       noitL(noit_error, "Error writing le header over SSL %d != %d\n",
166             rv, (int)sizeof(netlen));
167       goto alldone;
168     }
169     if((rv = Ewrite(le->buff, le->len)) != le->len) {
170       noitL(noit_error, "Error writing livestream message over SSL %d != %d\n",
171             rv, le->len);
172       goto alldone;
173     }
174   }
175
176  alldone:
177   e->opset->close(e->fd, &mask, e);
178   jcl->wants_shutdown = 1;
179   if(ac) acceptor_closure_free(ac);
180   return NULL;
181 }
182
183 int
184 noit_livestream_handler(eventer_t e, int mask, void *closure,
185                         struct timeval *now) {
186   eventer_t newe;
187   pthread_t tid;
188   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
189   acceptor_closure_t *ac = closure;
190   noit_livestream_closure_t *jcl = ac->service_ctx;
191
192   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
193 socket_error:
194     /* Exceptions cause us to simply snip the connection */
195     eventer_remove_fd(e->fd);
196     e->opset->close(e->fd, &newmask, e);
197     if(jcl) noit_livestream_closure_free(jcl);
198     if(ac) acceptor_closure_free(ac);
199     return 0;
200   }
201
202   if(!ac->service_ctx || !jcl->feed) {
203     int len;
204     jcl = ac->service_ctx = noit_livestream_closure_alloc();
205     /* Setup logger to this channel */
206     if(!jcl->period) {
207       u_int32_t nperiod;
208       len = e->opset->read(e->fd, &nperiod, sizeof(nperiod), &mask, e);
209       if(len == -1 && errno == EAGAIN) return mask | EVENTER_EXCEPTION;
210       if(len != sizeof(nperiod)) goto socket_error;
211       jcl->period = ntohl(nperiod);
212       if(!jcl->period) {
213         noitL(noit_error, "period of 0 specified in livestream.  not allowed.\n");
214         goto socket_error;
215       }
216     }
217     while(jcl->uuid_read < 36) {
218       len = e->opset->read(e->fd, jcl->uuid_str + jcl->uuid_read, 36 - jcl->uuid_read, &mask, e);
219       if(len == -1 && errno == EAGAIN) return mask | EVENTER_EXCEPTION;
220       if(len == 0) goto socket_error;
221       jcl->uuid_read += len;
222     }
223     jcl->uuid_str[36] = '\0';
224     if(uuid_parse(jcl->uuid_str, jcl->uuid)) {
225       noitL(noit_error, "bad uuid received in livestream handler '%s'\n", jcl->uuid_str);
226       goto socket_error;
227     }
228
229     jcl->feed = malloc(32);
230     snprintf(jcl->feed, 32, "livestream/%d", noit_atomic_inc32(&ls_counter));
231     noit_log_stream_new(jcl->feed, "noit_livestream", jcl->feed,
232                         jcl, NULL);
233
234
235     jcl->check = noit_check_watch(jcl->uuid, jcl->period);
236     /* This check must be watched from the livestream */
237     noit_check_transient_add_feed(jcl->check, jcl->feed);
238     /* Note the check */
239     noit_check_log_check(jcl->check);
240     /* kick it off, if it isn't running already */
241     if(!NOIT_CHECK_LIVE(jcl->check)) noit_check_activate(jcl->check);
242   }
243
244   eventer_remove_fd(e->fd);
245   newe = eventer_alloc();
246   memcpy(newe, e, sizeof(*e));
247   if(pthread_create(&tid, NULL, noit_livestream_thread_main, newe) == 0) {
248     return 0;
249   }
250
251   noit_check_transient_remove_feed(jcl->check, jcl->feed);
252   noit_livestream_closure_free(jcl);
253   /* Undo our dup */
254   eventer_free(newe);
255   /* Creating the thread failed, close it down and deschedule. */
256   e->opset->close(e->fd, &newmask, e);
257   return 0;
258 }
Note: See TracBrowser for help on using the browser.