root/src/stratcon_realtime_http.c

Revision 88a71780101cbf23034aa0cb840f9f0368fda2dd, 14.6 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

fixes #126

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_conf.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "jlog/jlog.h"
39 #include "noit_jlog_listener.h"
40 #include "noit_listener.h"
41 #include "noit_http.h"
42 #include "noit_livestream_listener.h"
43 #include "stratcon_realtime_http.h"
44 #include "stratcon_jlog_streamer.h"
45 #include "stratcon_datastore.h"
46
47 #include <unistd.h>
48 #include <assert.h>
49 #include <errno.h>
50 #include <sys/types.h>
51 #include <sys/socket.h>
52 #ifdef HAVE_SYS_FILIO_H
53 #include <sys/filio.h>
54 #endif
55 #include <netinet/in.h>
56 #include <sys/un.h>
57 #include <arpa/inet.h>
58
59
60 typedef struct realtime_recv_ctx_t {
61   int bytes_expected;
62   int bytes_read;
63   int bytes_written;
64   int body_len;
65   char *buffer;         /* These guys are for doing partial reads */
66
67   enum {
68     REALTIME_HTTP_WANT_INITIATE = 0,
69     REALTIME_HTTP_WANT_SEND_INTERVAL = 1,
70     REALTIME_HTTP_WANT_SEND_UUID = 2,
71     REALTIME_HTTP_WANT_HEADER = 3,
72     REALTIME_HTTP_WANT_BODY = 4,
73   } state;
74   int count;            /* Number of jlog messages we need to read */
75   noit_http_session_ctx *ctx;
76   struct realtime_tracker *rt;
77 } realtime_recv_ctx_t;
78
/*
 * Dispatcher closure attached to each realtime HTTP session: how far the
 * request has progressed and which interests it asked for.
 */
typedef struct realtime_context {
  enum {
    RC_INITIAL = 0,
    RC_REQ_RECV,
    RC_INTERESTS_RESOLVED,
    RC_FEEDING
  } setup;
  struct realtime_tracker *checklist;  /* Singly linked list of interests */
} realtime_context;
83
84 static realtime_context *alloc_realtime_context() {
85   realtime_context *ctx;
86   return calloc(sizeof(*ctx), 1);
87 }
88 static void free_realtime_tracker(struct realtime_tracker *rt) {
89   if(rt->noit) free(rt->noit);
90   free(rt);
91 }
92 static void clear_realtime_context(realtime_context *rc) {
93   while(rc->checklist) {
94     struct realtime_tracker *tofree;
95     tofree = rc->checklist;
96     rc->checklist = tofree->next;
97     free_realtime_tracker(tofree);
98   }
99 }
100 int
101 stratcon_line_to_javascript(noit_http_session_ctx *ctx, char *buff) {
102   char buffer[1024];
103   char *scp, *ecp, *token;
104   int len;
105
106 #define BAIL_HTTP_WRITE do { \
107   noitL(noit_error, "javascript emit failed: %s:%s:%d\n", \
108         __FILE__, __FUNCTION__, __LINE__); \
109   return -1; \
110 } while(0)
111
112 #define PROCESS_NEXT_FIELD(t,l) do { \
113   if(!*scp) goto bad_row; \
114   ecp = strchr(scp, '\t'); \
115   if(!ecp) goto bad_row; \
116   t = scp; \
117   l = (ecp-scp); \
118   scp = ecp + 1; \
119 } while(0)
120 #define PROCESS_LAST_FIELD(t,l) do { \
121   if(!*scp) ecp = scp; \
122   else { \
123     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
124     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
125   } \
126   t = scp; \
127   l = (ecp-scp); \
128 } while(0)
129
130   scp = buff;
131   PROCESS_NEXT_FIELD(token,len); /* Skip the leader */
132   if(buff[0] == 'M') {
133     snprintf(buffer, sizeof(buffer), "<script>window.parent.plot_iframe_data('");
134     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
135
136     /* Time */
137     PROCESS_NEXT_FIELD(token,len);
138     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
139
140     snprintf(buffer, sizeof(buffer), "', '");
141     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
142
143     /* UUID */
144     PROCESS_NEXT_FIELD(token,len);
145     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
146
147     snprintf(buffer, sizeof(buffer), "', '");
148     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
149
150     /* name */
151     PROCESS_NEXT_FIELD(token,len);
152     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
153
154     snprintf(buffer, sizeof(buffer), "', '");
155     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
156
157     PROCESS_NEXT_FIELD(token,len); /* skip type */
158     PROCESS_LAST_FIELD(token,len); /* value */
159     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
160
161     snprintf(buffer, sizeof(buffer), "');</script>\n");
162     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
163
164     if(noit_http_response_flush(ctx, noit_false) == noit_false) BAIL_HTTP_WRITE;
165   }
166
167   return 0;
168
169  bad_row:
170   BAIL_HTTP_WRITE;
171   if(0) {
172     noit_http_response_end(ctx);
173     memset(ctx->dispatcher_closure, 0, sizeof(realtime_context));
174     if(ctx->conn.e) eventer_trigger(ctx->conn.e, EVENTER_WRITE);
175     return 0;
176   }
177 }
178 int
179 stratcon_realtime_uri_parse(realtime_context *rc, char *uri) {
180   int len, cnt = 0;
181   char *cp, *copy, *interest, *brk;
182   if(strncmp(uri, "/data/", 6)) return 0;
183   cp = uri + 6;
184   len = strlen(cp);
185   copy = alloca(len + 1);
186   if(!copy) return 0;
187   memcpy(copy, cp, len);
188   copy[len] = '\0';
189
190   for (interest = strtok_r(copy, "/", &brk);
191        interest;
192        interest = strtok_r(NULL, "/", &brk)) {
193     struct realtime_tracker *node;
194     char *interval;
195
196     interval = strchr(interest, '@');
197     if(!interval)
198       interval = "5000";
199     else
200       *interval++ = '\0';
201     node = calloc(1, sizeof(*node));
202     node->rc = rc;
203     node->sid = atoi(interest);
204     node->interval = atoi(interval);
205     node->next = rc->checklist;
206     rc->checklist = node;
207     cnt++;
208   }
209   return cnt;
210 }
211 static void
212 free_realtime_recv_ctx(void *vctx) {
213   realtime_recv_ctx_t *rrctx = vctx;
214   noit_atomic_dec32(&rrctx->ctx->ref_cnt);
215   free(rrctx);
216 }
217 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
218 static int
219 __read_on_ctx(eventer_t e, realtime_recv_ctx_t *ctx, int *newmask) {
220   int len, mask;
221   while(ctx->bytes_read < ctx->bytes_expected) {
222     len = Eread(ctx->buffer + ctx->bytes_read,
223                 ctx->bytes_expected - ctx->bytes_read);
224     if(len < 0) {
225       *newmask = mask;
226       return -1;
227     }
228     /* if we get 0 inside SSL, and there was a real error, we
229      * will actually get a -1 here.
230      * if(len == 0) return ctx->bytes_read;
231      */
232     ctx->bytes_read += len;
233   }
234   assert(ctx->bytes_read == ctx->bytes_expected);
235   return ctx->bytes_read;
236 }
/*
 * FULLREAD(e, ctx, size): read exactly `size` bytes from event e into
 * ctx->buffer, resuming a partially completed read if one is in progress.
 *
 * On the first pass (ctx->bytes_expected == 0) the old buffer is released
 * and a fresh one of size+1 bytes is allocated and NUL-terminated at
 * [size].  The macro only falls through once all bytes have arrived, with
 * bytes_read and bytes_expected reset to 0.
 *
 * Control flow hidden in here, so use it only inside a handler that
 * returns an eventer mask and defines a `socket_error:` label:
 *   - errno == EAGAIN after a failed read: `return mask | EVENTER_EXCEPTION`
 *   - any other failure, a short read, a failed allocation or a negative
 *     size: `goto socket_error`
 *
 * `size` is evaluated once.  A negative size (possible when it comes from a
 * length decoded off the wire) is rejected up front; it would otherwise
 * index before the start of the buffer and trip the assertion in
 * __read_on_ctx.
 */
#define FULLREAD(e,ctx,size) do { \
  int mask, len; \
  const long fullread_size = (long)(size); \
  if(fullread_size < 0) { \
    noitL(noit_error, "invalid read size (%ld).  Reseting connection.\n", \
          fullread_size); \
    goto socket_error; \
  } \
  if(!(ctx)->bytes_expected) { \
    (ctx)->bytes_expected = (int)fullread_size; \
    free((ctx)->buffer); \
    (ctx)->buffer = malloc((size_t)fullread_size + 1); \
    if((ctx)->buffer == NULL) { \
      noitL(noit_error, "malloc(%lu) failed.\n", \
            (unsigned long)fullread_size + 1); \
      goto socket_error; \
    } \
    (ctx)->buffer[fullread_size] = '\0'; \
  } \
  len = __read_on_ctx((e), (ctx), &mask); \
  if(len < 0) { \
    if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
    noitL(noit_error, "SSL read error: %s\n", strerror(errno)); \
    goto socket_error; \
  } \
  (ctx)->bytes_read = 0; \
  (ctx)->bytes_expected = 0; \
  if(len != fullread_size) { \
    noitL(noit_error, "SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
          (ctx)->state, len, (unsigned long)fullread_size); \
    goto socket_error; \
  } \
} while(0)
263
264 int
265 stratcon_realtime_recv_handler(eventer_t e, int mask, void *closure,
266                                struct timeval *now) {
267   static u_int32_t livestream_cmd = 0;
268   noit_connection_ctx_t *nctx = closure;
269   realtime_recv_ctx_t *ctx = nctx->consumer_ctx;
270   int len;
271   u_int32_t nint;
272   char uuid_str[37];
273
274   if(!livestream_cmd) livestream_cmd = htonl(NOIT_LIVESTREAM_DATA_FEED);
275
276   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
277  socket_error:
278     ctx->state = REALTIME_HTTP_WANT_INITIATE;
279     ctx->count = 0;
280     ctx->bytes_read = 0;
281     ctx->bytes_written = 0;
282     ctx->bytes_expected = 0;
283     if(ctx->buffer) free(ctx->buffer);
284     ctx->buffer = NULL;
285     free_realtime_recv_ctx(ctx);
286     eventer_remove_fd(e->fd);
287     e->opset->close(e->fd, &mask, e);
288     return 0;
289   }
290
291 #define full_nb_write(data, wlen) do { \
292   if(!ctx->bytes_expected) { \
293     ctx->bytes_written = 0; \
294     ctx->bytes_expected = wlen; \
295   } \
296   while(ctx->bytes_written < ctx->bytes_expected) { \
297     while(-1 == (len = e->opset->write(e->fd, ((char *)data) + ctx->bytes_written, \
298                                        ctx->bytes_expected - ctx->bytes_written, \
299                                        &mask, e)) && errno == EINTR); \
300     if(len < 0) { \
301       if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
302       goto socket_error; \
303     } \
304     ctx->bytes_written += len; \
305   } \
306   if(ctx->bytes_written != ctx->bytes_expected) { \
307     noitL(noit_error, "short write on initiating stream [%d != %d].\n", \
308           ctx->bytes_written, ctx->bytes_expected); \
309     goto socket_error; \
310   } \
311   ctx->bytes_expected = 0; \
312 } while(0)
313
314   while(1) {
315     u_int32_t net_body_len;
316
317     switch(ctx->state) {
318       case REALTIME_HTTP_WANT_INITIATE:
319         full_nb_write(&livestream_cmd, sizeof(livestream_cmd));
320         ctx->state = REALTIME_HTTP_WANT_SEND_INTERVAL;
321       case REALTIME_HTTP_WANT_SEND_INTERVAL:
322         nint = htonl(ctx->rt->interval);
323         full_nb_write(&nint, sizeof(nint));
324         ctx->state = REALTIME_HTTP_WANT_SEND_UUID;
325       case REALTIME_HTTP_WANT_SEND_UUID:
326         uuid_unparse_lower(ctx->rt->checkid, uuid_str);
327         full_nb_write(uuid_str, 36);
328         ctx->state = REALTIME_HTTP_WANT_HEADER;
329       case REALTIME_HTTP_WANT_HEADER:
330         FULLREAD(e, ctx, sizeof(u_int32_t));
331         memcpy(&net_body_len, ctx->buffer, sizeof(u_int32_t));
332         ctx->body_len = ntohl(net_body_len);
333         free(ctx->buffer); ctx->buffer = NULL;
334         ctx->state = REALTIME_HTTP_WANT_BODY;
335         break;
336       case REALTIME_HTTP_WANT_BODY:
337         FULLREAD(e, ctx, ctx->body_len);
338         if(stratcon_line_to_javascript(ctx->ctx, ctx->buffer)) goto socket_error;
339         free(ctx->buffer); ctx->buffer = NULL;
340         ctx->state = REALTIME_HTTP_WANT_HEADER;
341         break;
342     }
343   }
344
345 }
346
347 int
348 stratcon_realtime_http_postresolve(eventer_t e, int mask, void *closure,
349                                    struct timeval *now) {
350   noit_http_session_ctx *ctx = closure;
351   realtime_context *rc = ctx->dispatcher_closure;
352   struct realtime_tracker *node;
353
354   for(node = rc->checklist; node; node = node->next) {
355     if(node->noit) {
356       realtime_recv_ctx_t *rrctx;
357       rrctx = calloc(1, sizeof(*rrctx));
358       rrctx->ctx = ctx;
359       rrctx->rt = node;
360       stratcon_streamer_connection(NULL, node->noit,
361                                    stratcon_realtime_recv_handler,
362                                    NULL, rrctx,
363                                    free_realtime_recv_ctx);
364     }
365     else
366       noit_atomic_dec32(&ctx->ref_cnt);
367   }
368   if(ctx->ref_cnt == 1) {
369     noit_http_response_end(ctx);
370     clear_realtime_context(rc);
371     memset(ctx->dispatcher_closure, 0, sizeof(realtime_context));
372     if(ctx->conn.e) eventer_trigger(ctx->conn.e, EVENTER_WRITE);
373   }
374   return 0;
375 }
376 int
377 stratcon_request_dispatcher(noit_http_session_ctx *ctx) {
378   const char *key, *value;
379   realtime_context *rc = ctx->dispatcher_closure;
380   int klen;
381   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
382   noit_http_request *req = &ctx->req;
383
384   if(rc->setup == RC_INITIAL) {
385     eventer_t completion;
386     struct realtime_tracker *node;
387     char c[1024];
388     int num_interests;
389
390     num_interests = stratcon_realtime_uri_parse(rc, ctx->req.uri_str);
391     if(num_interests == 0) {
392       noit_http_response_status_set(ctx, 404, "OK");
393       noit_http_response_option_set(ctx, NOIT_HTTP_CLOSE);
394       noit_http_response_end(ctx);
395       return 0;
396     }
397
398     noitL(noit_error, "http: %s %s %s\n",
399           req->method_str, req->uri_str, req->protocol_str);
400     while(noit_hash_next_str(&req->headers, &iter, &key, &klen, &value)) {
401       noitL(noit_error, "http: [%s: %s]\n", key, value);
402     }
403     noit_http_response_status_set(ctx, 200, "OK");
404     noit_http_response_option_set(ctx, NOIT_HTTP_CHUNKED);
405     /*noit_http_response_option_set(ctx, NOIT_HTTP_GZIP);*/
406     /*noit_http_response_option_set(ctx, NOIT_HTTP_DEFLATE);*/
407     noit_http_response_header_set(ctx, "Content-Type", "text/html");
408
409     snprintf(c, sizeof(c), "<html><head><script>document.domain='omniti.com';</script></head><body>\n");
410     noit_http_response_append(ctx, c, strlen(c));
411
412     /* this dumb crap is to make some browsers happy (Safari) */
413     memset(c, ' ', sizeof(c));
414     noit_http_response_append(ctx, c, sizeof(c));
415     noit_http_response_flush(ctx, noit_false);
416
417     rc->setup = RC_REQ_RECV;
418     /* Each interest references the ctx */
419     for(node = rc->checklist; node; node = node->next) {
420       noit_atomic_inc32(&ctx->ref_cnt);
421       stratcon_datastore_push(DS_OP_FIND, NULL, node);
422       noitL(noit_error, "Resolving sid: %d\n", node->sid);
423     }
424     completion = eventer_alloc();
425     completion->mask = EVENTER_TIMER;
426     completion->callback = stratcon_realtime_http_postresolve;
427     completion->closure = ctx;
428     gettimeofday(&completion->whence, NULL);
429     stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, completion);
430   }
431   return EVENTER_EXCEPTION;
432 }
433
434 int
435 stratcon_realtime_http_handler(eventer_t e, int mask, void *closure,
436                                struct timeval *now) {
437   acceptor_closure_t *ac = closure;
438   noit_http_session_ctx *http_ctx = ac->service_ctx;
439   if(!http_ctx) {
440     http_ctx = ac->service_ctx =
441       noit_http_session_ctx_new(stratcon_request_dispatcher,
442                                 alloc_realtime_context(),
443                                 e);
444   }
445   return http_ctx->drive(e, mask, http_ctx, now);
446 }
447
448 void
449 stratcon_realtime_http_init(const char *toplevel) {
450   eventer_name_callback("stratcon_realtime_http",
451                         stratcon_realtime_http_handler);
452 }
Note: See TracBrowser for help on using the browser.