root/src/noit_livestream_listener.c

Revision 15ce866645e39e58dcb2b87b790417ad2590adcc, 6.4 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

start working. refs #71.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "noit_listener.h"
9 #include "utils/noit_hash.h"
10 #include "utils/noit_log.h"
11 #include "utils/noit_sem.h"
12 #include "noit_livestream_listener.h"
13 #include "noit_check.h"
14
15 #include <unistd.h>
16 #include <sys/ioctl.h>
17 #include <errno.h>
18
19 struct log_entry {
20   int len;
21   char *buff;
22   struct log_entry *next;
23 };
24
25 typedef struct {
26   u_int32_t period;
27   struct log_entry *lqueue;
28   struct log_entry *lqueue_end;
29   sem_t lqueue_sem;
30   pthread_mutex_t lqueue_lock;
31   int uuid_read;
32   char uuid_str[37];
33   char *feed;
34   uuid_t uuid;
35   noit_check_t *check;
36   int wants_shutdown;
37 } noit_livestream_closure_t;
38
39 static int
40 noit_livestream_logio_open(noit_log_stream_t ls) {
41   return 0;
42 }
43 static int
44 noit_livestream_logio_reopen(noit_log_stream_t ls) {
45   /* no op */
46   return 0;
47 }
48 static int
49 noit_livestream_logio_write(noit_log_stream_t ls, const void *buf, size_t len) {
50   noit_livestream_closure_t *jcl = ls->op_ctx;
51   struct log_entry *le;
52   if(!jcl) return 0;
53
54   le = calloc(1, sizeof(*le));
55   le->len = len;
56   le->buff = malloc(len);
57   memcpy(le->buff, buf, len);
58   le->next = NULL;
59   pthread_mutex_lock(&jcl->lqueue_lock);
60   if(!jcl->lqueue_end) jcl->lqueue = le;
61   else jcl->lqueue_end->next = le;
62   jcl->lqueue_end = le;
63   pthread_mutex_unlock(&jcl->lqueue_lock);
64   sem_post(&jcl->lqueue_sem);
65   return len;
66 }
67 static int
68 noit_livestream_logio_close(noit_log_stream_t ls) {
69   ls->op_ctx = NULL;
70   return 0;
71 }
72 static logops_t noit_livestream_logio_ops = {
73   noit_livestream_logio_open,
74   noit_livestream_logio_reopen,
75   noit_livestream_logio_write,
76   noit_livestream_logio_close,
77 };
78
79 void
80 noit_livestream_listener_init() {
81   noit_register_logops("noit_livestream", &noit_livestream_logio_ops);
82   eventer_name_callback("livestream_transit", noit_livestream_handler);
83   noit_control_dispatch_delegate(noit_control_dispatch,
84                                  NOIT_LIVESTREAM_DATA_FEED,
85                                  noit_livestream_handler);
86 }
87
88 noit_livestream_closure_t *
89 noit_livestream_closure_alloc(void) {
90   noit_livestream_closure_t *jcl;
91   jcl = calloc(1, sizeof(*jcl));
92   return jcl;
93 }
94
95 void
96 noit_livestream_closure_free(noit_livestream_closure_t *jcl) {
97   struct log_entry *tofree;
98   while(jcl->lqueue) {
99     tofree = jcl->lqueue;
100     jcl->lqueue = jcl->lqueue->next;
101     free(tofree->buff);
102     free(tofree);
103   }
104   free(jcl);
105 }
106
107 static int
108 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
109   int w, sofar = 0;
110   while(l > sofar) {
111     w = e->opset->write(e->fd, b + sofar, l - sofar, mask, e);
112     if(w <= 0) return w;
113     sofar += w;
114   }
115   return sofar;
116 }
117 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
118
119 void *
120 noit_livestream_thread_main(void *e_vptr) {
121   int mask;
122   eventer_t e = e_vptr;
123   acceptor_closure_t *ac = e->closure;
124   noit_livestream_closure_t *jcl = ac->service_ctx;
125   long off = 0;
126
127   /* Go into blocking mode */
128   ioctl(e->fd, FIONBIO, &off);
129
130   while(1) {
131     u_int32_t netlen;
132     struct log_entry *le = NULL;
133     int rv;
134    
135     sem_wait(&jcl->lqueue_sem);
136     pthread_mutex_lock(&jcl->lqueue_lock);
137     if(jcl->lqueue) {
138       /* If there are items, pop and advance the header pointer */
139       le = jcl->lqueue;
140       jcl->lqueue = jcl->lqueue->next;
141       if(!jcl->lqueue) jcl->lqueue_end = NULL;
142     }
143     pthread_mutex_unlock(&jcl->lqueue_lock);
144
145     if(!le) continue;
146
147     /* Here we actually push the message */
148     netlen = htonl(le->len);
149     if((rv = Ewrite(&netlen, sizeof(netlen))) != sizeof(netlen)) {
150       noitL(noit_error, "Error writing le header over SSL %d != %d\n",
151             rv, (int)sizeof(netlen));
152       goto alldone;
153     }
154     if((rv = Ewrite(le->buff, le->len)) != le->len) {
155       noitL(noit_error, "Error writing livestream message over SSL %d != %d\n",
156             rv, le->len);
157       goto alldone;
158     }
159   }
160
161  alldone:
162   e->opset->close(e->fd, &mask, e);
163   jcl->wants_shutdown = 1;
164   if(ac) acceptor_closure_free(ac);
165   return NULL;
166 }
167
168 int
169 noit_livestream_handler(eventer_t e, int mask, void *closure,
170                         struct timeval *now) {
171   eventer_t newe;
172   pthread_t tid;
173   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
174   acceptor_closure_t *ac = closure;
175   noit_livestream_closure_t *jcl = ac->service_ctx;
176
177   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
178 socket_error:
179     /* Exceptions cause us to simply snip the connection */
180     eventer_remove_fd(e->fd);
181     e->opset->close(e->fd, &newmask, e);
182     if(jcl) noit_livestream_closure_free(jcl);
183     if(ac) acceptor_closure_free(ac);
184     return 0;
185   }
186
187   if(!ac->service_ctx || !jcl->feed) {
188     int len;
189     jcl = ac->service_ctx = noit_livestream_closure_alloc();
190     /* Setup logger to this channel */
191     if(!jcl->period) {
192       u_int32_t nperiod;
193       len = e->opset->read(e->fd, &nperiod, sizeof(nperiod), &mask, e);
194       if(len == -1 && errno == EAGAIN) return mask | EVENTER_EXCEPTION;
195       if(len != sizeof(nperiod)) goto socket_error;
196       jcl->period = ntohl(nperiod);
197       if(!jcl->period) {
198         noitL(noit_error, "period of 0 specified in livestream.  not allowed.\n");
199         goto socket_error;
200       }
201     }
202     while(jcl->uuid_read < 36) {
203       len = e->opset->read(e->fd, jcl->uuid_str + jcl->uuid_read, 36 - jcl->uuid_read, &mask, e);
204       if(len == -1 && errno == EAGAIN) return mask | EVENTER_EXCEPTION;
205       if(len == 0) goto socket_error;
206       jcl->uuid_read += len;
207     }
208     jcl->uuid_str[36] = '\0';
209     if(uuid_parse(jcl->uuid_str, jcl->uuid)) {
210       noitL(noit_error, "bad uuid received in livestream handler '%s'\n", jcl->uuid_str);
211       goto socket_error;
212     }
213
214     jcl->feed = malloc(32);
215     snprintf(jcl->feed, 32, "livestream/%d", e->fd);
216     noit_log_stream_new(jcl->feed, "noit_livestream", jcl->feed,
217                         jcl, NULL);
218
219
220     jcl->check = noit_check_watch(jcl->uuid, jcl->period);
221     /* This check must be watched from the livestream */
222     noit_check_transient_add_feed(jcl->check, jcl->feed);
223     /* Note the check */
224     noit_check_log_check(jcl->check);
225     /* kick it off, if it isn't running already */
226     if(!NOIT_CHECK_LIVE(jcl->check)) noit_check_activate(jcl->check);
227   }
228
229   eventer_remove_fd(e->fd);
230   newe = eventer_alloc();
231   memcpy(newe, e, sizeof(*e));
232   if(pthread_create(&tid, NULL, noit_livestream_thread_main, newe) == 0) {
233     return 0;
234   }
235
236   /* Undo our dup */
237   eventer_free(newe);
238   /* Creating the thread failed, close it down and deschedule. */
239   e->opset->close(e->fd, &newmask, e);
240   return 0;
241 }
Note: See TracBrowser for help on using the browser.