root/src/noit_livestream_listener.c

Revision 84d6f13ffc15b3f1bb50df2ac835b56f70179b3e, 7.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

closes #78

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "noit_listener.h"
9 #include "utils/noit_hash.h"
10 #include "utils/noit_log.h"
11 #include "utils/noit_sem.h"
12 #include "noit_livestream_listener.h"
13 #include "noit_check.h"
14
15 #include <unistd.h>
16 #include <sys/ioctl.h>
17 #include <errno.h>
18
19 static noit_atomic32_t ls_counter = 0;
20
21 struct log_entry {
22   int len;
23   char *buff;
24   struct log_entry *next;
25 };
26
27 typedef struct {
28   u_int32_t period;
29   struct log_entry *lqueue;
30   struct log_entry *lqueue_end;
31   sem_t lqueue_sem;
32   pthread_mutex_t lqueue_lock;
33   int uuid_read;
34   char uuid_str[37];
35   char *feed;
36   uuid_t uuid;
37   noit_check_t *check;
38   int wants_shutdown;
39 } noit_livestream_closure_t;
40
41 noit_livestream_closure_t *
42 noit_livestream_closure_alloc(void) {
43   noit_livestream_closure_t *jcl;
44   jcl = calloc(1, sizeof(*jcl));
45   pthread_mutex_init(&jcl->lqueue_lock, NULL);
46   sem_init(&jcl->lqueue_sem, 0, 0);
47   return jcl;
48 }
49
50 void
51 noit_livestream_closure_free(noit_livestream_closure_t *jcl) {
52   struct log_entry *tofree;
53   while(jcl->lqueue) {
54     tofree = jcl->lqueue;
55     jcl->lqueue = jcl->lqueue->next;
56     free(tofree->buff);
57     free(tofree);
58   }
59   free(jcl);
60 }
61
62 static int
63 noit_livestream_logio_open(noit_log_stream_t ls) {
64   return 0;
65 }
66 static int
67 noit_livestream_logio_reopen(noit_log_stream_t ls) {
68   /* no op */
69   return 0;
70 }
71 static int
72 noit_livestream_logio_write(noit_log_stream_t ls, const void *buf, size_t len) {
73   noit_livestream_closure_t *jcl = ls->op_ctx;
74   struct log_entry *le;
75   if(!jcl) return 0;
76
77   if(jcl->wants_shutdown) {
78     /* This has been terminated by the client, _fail here_ */
79     return 0;
80   }
81
82   le = calloc(1, sizeof(*le));
83   le->len = len;
84   le->buff = malloc(len);
85   memcpy(le->buff, buf, len);
86   le->next = NULL;
87   pthread_mutex_lock(&jcl->lqueue_lock);
88   if(!jcl->lqueue_end) jcl->lqueue = le;
89   else jcl->lqueue_end->next = le;
90   jcl->lqueue_end = le;
91   pthread_mutex_unlock(&jcl->lqueue_lock);
92   sem_post(&jcl->lqueue_sem);
93   return len;
94 }
95 static int
96 noit_livestream_logio_close(noit_log_stream_t ls) {
97   noit_livestream_closure_t *jcl = ls->op_ctx;
98   if(jcl) noit_livestream_closure_free(jcl);
99   ls->op_ctx = NULL;
100   return 0;
101 }
102 static logops_t noit_livestream_logio_ops = {
103   noit_livestream_logio_open,
104   noit_livestream_logio_reopen,
105   noit_livestream_logio_write,
106   noit_livestream_logio_close,
107   NULL
108 };
109
110 void
111 noit_livestream_listener_init() {
112   noit_register_logops("noit_livestream", &noit_livestream_logio_ops);
113   eventer_name_callback("livestream_transit", noit_livestream_handler);
114   noit_control_dispatch_delegate(noit_control_dispatch,
115                                  NOIT_LIVESTREAM_DATA_FEED,
116                                  noit_livestream_handler);
117 }
118
119 static int
120 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
121   int w, sofar = 0;
122   while(l > sofar) {
123     w = e->opset->write(e->fd, b + sofar, l - sofar, mask, e);
124     if(w <= 0) return w;
125     sofar += w;
126   }
127   return sofar;
128 }
129 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
130
131 void *
132 noit_livestream_thread_main(void *e_vptr) {
133   int mask;
134   eventer_t e = e_vptr;
135   acceptor_closure_t *ac = e->closure;
136   noit_livestream_closure_t *jcl = ac->service_ctx;
137   long off = 0;
138
139   /* Go into blocking mode */
140   if(ioctl(e->fd, FIONBIO, &off) == -1) {
141     noitL(noit_error, "ioctl failed setting livestream to blocking: [%d] [%s]\n",
142           errno, strerror(errno));
143     goto alldone;
144   }
145
146   while(1) {
147     u_int32_t netlen;
148     struct log_entry *le = NULL;
149     int rv;
150    
151     sem_wait(&jcl->lqueue_sem);
152     pthread_mutex_lock(&jcl->lqueue_lock);
153     if(jcl->lqueue) {
154       /* If there are items, pop and advance the header pointer */
155       le = jcl->lqueue;
156       jcl->lqueue = jcl->lqueue->next;
157       if(!jcl->lqueue) jcl->lqueue_end = NULL;
158     }
159     pthread_mutex_unlock(&jcl->lqueue_lock);
160
161     if(!le) continue;
162
163     /* Here we actually push the message */
164     netlen = htonl(le->len);
165     if((rv = Ewrite(&netlen, sizeof(netlen))) != sizeof(netlen)) {
166       noitL(noit_error, "Error writing le header over SSL %d != %d\n",
167             rv, (int)sizeof(netlen));
168       goto alldone;
169     }
170     if((rv = Ewrite(le->buff, le->len)) != le->len) {
171       noitL(noit_error, "Error writing livestream message over SSL %d != %d\n",
172             rv, le->len);
173       goto alldone;
174     }
175   }
176
177  alldone:
178   e->opset->close(e->fd, &mask, e);
179   jcl->wants_shutdown = 1;
180   if(ac) acceptor_closure_free(ac);
181   return NULL;
182 }
183
184 int
185 noit_livestream_handler(eventer_t e, int mask, void *closure,
186                         struct timeval *now) {
187   eventer_t newe;
188   pthread_t tid;
189   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
190   acceptor_closure_t *ac = closure;
191   noit_livestream_closure_t *jcl = ac->service_ctx;
192
193   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
194 socket_error:
195     /* Exceptions cause us to simply snip the connection */
196     eventer_remove_fd(e->fd);
197     e->opset->close(e->fd, &newmask, e);
198     if(jcl) noit_livestream_closure_free(jcl);
199     if(ac) acceptor_closure_free(ac);
200     return 0;
201   }
202
203   if(!ac->service_ctx || !jcl->feed) {
204     int len;
205     jcl = ac->service_ctx = noit_livestream_closure_alloc();
206     /* Setup logger to this channel */
207     if(!jcl->period) {
208       u_int32_t nperiod;
209       len = e->opset->read(e->fd, &nperiod, sizeof(nperiod), &mask, e);
210       if(len == -1 && errno == EAGAIN) return mask | EVENTER_EXCEPTION;
211       if(len != sizeof(nperiod)) goto socket_error;
212       jcl->period = ntohl(nperiod);
213       if(!jcl->period) {
214         noitL(noit_error, "period of 0 specified in livestream.  not allowed.\n");
215         goto socket_error;
216       }
217     }
218     while(jcl->uuid_read < 36) {
219       len = e->opset->read(e->fd, jcl->uuid_str + jcl->uuid_read, 36 - jcl->uuid_read, &mask, e);
220       if(len == -1 && errno == EAGAIN) return mask | EVENTER_EXCEPTION;
221       if(len == 0) goto socket_error;
222       jcl->uuid_read += len;
223     }
224     jcl->uuid_str[36] = '\0';
225     if(uuid_parse(jcl->uuid_str, jcl->uuid)) {
226       noitL(noit_error, "bad uuid received in livestream handler '%s'\n", jcl->uuid_str);
227       goto socket_error;
228     }
229
230     jcl->feed = malloc(32);
231     snprintf(jcl->feed, 32, "livestream/%d", noit_atomic_inc32(&ls_counter));
232     noit_log_stream_new(jcl->feed, "noit_livestream", jcl->feed,
233                         jcl, NULL);
234
235
236     jcl->check = noit_check_watch(jcl->uuid, jcl->period);
237     /* This check must be watched from the livestream */
238     noit_check_transient_add_feed(jcl->check, jcl->feed);
239     /* Note the check */
240     noit_check_log_check(jcl->check);
241     /* kick it off, if it isn't running already */
242     if(!NOIT_CHECK_LIVE(jcl->check)) noit_check_activate(jcl->check);
243   }
244
245   eventer_remove_fd(e->fd);
246   newe = eventer_alloc();
247   memcpy(newe, e, sizeof(*e));
248   if(pthread_create(&tid, NULL, noit_livestream_thread_main, newe) == 0) {
249     return 0;
250   }
251
252   noit_check_transient_remove_feed(jcl->check, jcl->feed);
253   noit_livestream_closure_free(jcl);
254   /* Undo our dup */
255   eventer_free(newe);
256   /* Creating the thread failed, close it down and deschedule. */
257   e->opset->close(e->fd, &newmask, e);
258   return 0;
259 }
Note: See TracBrowser for help on using the browser.