root/src/stratcon_realtime_http.c

Revision 76fe6db0c89a251ee20c5204141ceb0c9c1c9cd3, 17.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 years ago)

fixes #278

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_conf.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "utils/noit_str.h"
39 #include "jlog/jlog.h"
40 #include "noit_jlog_listener.h"
41 #include "noit_listener.h"
42 #include "noit_http.h"
43 #include "noit_rest.h"
44 #include "noit_livestream_listener.h"
45 #include "stratcon_realtime_http.h"
46 #include "stratcon_jlog_streamer.h"
47 #include "stratcon_datastore.h"
48
49 #include <ctype.h>
50 #include <unistd.h>
51 #include <assert.h>
52 #include <errno.h>
53 #include <sys/types.h>
54 #include <sys/socket.h>
55 #ifdef HAVE_SYS_FILIO_H
56 #include <sys/filio.h>
57 #endif
58 #include <netinet/in.h>
59 #include <sys/un.h>
60 #include <arpa/inet.h>
61
62
63 typedef struct realtime_recv_ctx_t {
64   int bytes_expected;
65   int bytes_read;
66   int bytes_written;
67   int body_len;
68   char *buffer;         /* These guys are for doing partial reads */
69
70   enum {
71     REALTIME_HTTP_WANT_INITIATE = 0,
72     REALTIME_HTTP_WANT_SEND_INTERVAL = 1,
73     REALTIME_HTTP_WANT_SEND_UUID = 2,
74     REALTIME_HTTP_WANT_HEADER = 3,
75     REALTIME_HTTP_WANT_BODY = 4,
76   } state;
77   int count;            /* Number of jlog messages we need to read */
78   u_int32_t hack_inc_id;
79   noit_http_session_ctx *ctx;
80   struct realtime_tracker *rt;
81 } realtime_recv_ctx_t;
82
/*
 * Dispatcher closure attached to an HTTP session that streams realtime data.
 */
typedef struct realtime_context {
  /* How far the request has progressed through setup. */
  enum {
    RC_INITIAL = 0,
    RC_REQ_RECV,
    RC_INTERESTS_RESOLVED,
    RC_FEEDING
  } setup;
  /* Singly linked list of the checks requested in the URI. */
  struct realtime_tracker *checklist;
  /* Heap copy of the value written into document.domain. */
  char *document_domain;
} realtime_context;
88
89 static realtime_context *alloc_realtime_context(const char *domain) {
90   realtime_context *ctx;
91   ctx = calloc(sizeof(*ctx), 1);
92   ctx->document_domain = strdup(domain);
93   return ctx;
94 }
95 static void free_realtime_tracker(struct realtime_tracker *rt) {
96   if(rt->noit) free(rt->noit);
97   free(rt);
98 }
99 static void clear_realtime_context(realtime_context *rc) {
100  rc->setup = RC_INITIAL;
101   while(rc->checklist) {
102     struct realtime_tracker *tofree;
103     tofree = rc->checklist;
104     rc->checklist = tofree->next;
105     free_realtime_tracker(tofree);
106   }
107   if(rc->document_domain) free(rc->document_domain);
108   rc->document_domain = NULL;
109 }
110 int
111 stratcon_line_to_javascript(noit_http_session_ctx *ctx, char *buff,
112                             u_int32_t inc_id) {
113   char buffer[1024];
114   char *scp, *ecp, *token;
115   int len;
116   void *vcb;
117   const char *v, *cb = NULL;
118   noit_hash_table json = NOIT_HASH_EMPTY;
119   char s_inc_id[42];
120
121   snprintf(s_inc_id, sizeof(s_inc_id), "script-%08x", inc_id);
122   if(noit_hash_retrieve(&ctx->req.querystring, "cb", strlen("cb"), &vcb))
123     cb = vcb;
124   for(v = cb; v && *v; v++)
125     if(!((*v >= '0' && *v <= '9') ||
126          (*v >= 'a' && *v <= 'z') ||
127          (*v >= 'A' && *v <= 'Z') ||
128          (*v == '_') || (*v == '.'))) {
129       cb = NULL;
130       break;
131     }
132   if(!cb) cb = "window.parent.plot_iframe_data";
133
134 #define BAIL_HTTP_WRITE do { \
135   noit_hash_destroy(&json, NULL, free); \
136   noitL(noit_error, "javascript emit failed: %s:%s:%d\n", \
137         __FILE__, __FUNCTION__, __LINE__); \
138   return -1; \
139 } while(0)
140
141 #define PROCESS_NEXT_FIELD(t,l) do { \
142   if(!*scp) goto bad_row; \
143   ecp = strchr(scp, '\t'); \
144   if(!ecp) goto bad_row; \
145   t = scp; \
146   l = (ecp-scp); \
147   scp = ecp + 1; \
148 } while(0)
149 #define PROCESS_LAST_FIELD(t,l) do { \
150   if(!*scp) ecp = scp; \
151   else { \
152     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
153     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
154   } \
155   t = scp; \
156   l = (ecp-scp); \
157 } while(0)
158
159   scp = buff;
160   PROCESS_NEXT_FIELD(token,len); /* Skip the leader */
161   if(buff[0] == 'M') {
162     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
163     const char *key;
164     int klen, i=0;
165     void *vval;
166
167 #define ra_write(a,b) if(noit_http_response_append(ctx, a, b) == noit_false) BAIL_HTTP_WRITE
168
169     snprintf(buffer, sizeof(buffer), "<script id=\"%s\">%s({", s_inc_id, cb);
170     ra_write(buffer, strlen(buffer));
171
172     while(noit_hash_next(&ctx->req.querystring, &iter, &key, &klen, &vval)) {
173       if(!strcmp(key, "cb")) continue;
174       noit_hash_store(&json, key, klen, strdup(vval ?(char *)vval : "true"));
175     }
176     /* Time */
177     noit_hash_store(&json, "script_id", 9, strdup(s_inc_id));
178     noit_hash_store(&json, "type", 4, strdup("M"));
179     PROCESS_NEXT_FIELD(token,len);
180     noit_hash_store(&json, "time", 4, noit__strndup(token, len));
181     /* UUID */
182     PROCESS_NEXT_FIELD(token,len);
183     noit_hash_store(&json, "id", 2, noit__strndup(token, len));
184     /* name */
185     PROCESS_NEXT_FIELD(token,len);
186     noit_hash_store(&json, "metric_name", 11, noit__strndup(token, len));
187     /* type */
188     PROCESS_NEXT_FIELD(token,len);
189     noit_hash_store(&json, "metric_type", 11, noit__strndup(token, len));
190     /* value */
191     PROCESS_LAST_FIELD(token,len); /* value */
192     noit_hash_store(&json, "value", 5, noit__strndup(token, len));
193
194     memset(&iter, 0, sizeof(iter));
195     while(noit_hash_next(&json, &iter, &key, &klen, &vval)) {
196       char *val = (char *)vval;
197       if(i++) ra_write(",", 1);
198       ra_write("'", 1);
199       ra_write(key, klen);
200       ra_write("':\"", 3);
201       while(*val) {
202         if(*val == '\"' || *val == '\\') {
203           ra_write((char *)"\\", 1);
204         }
205         if(isprint(*val)) {
206           ra_write((char *)val, 1);
207         }
208         else {
209           char od[5];
210           snprintf(od, sizeof(od), "\\%03o", *((unsigned char *)val));
211           ra_write(od, strlen(od));
212         }
213         val++;
214       }
215       ra_write("\"", 1);
216     }
217     snprintf(buffer, sizeof(buffer), "});</script>\n");
218     ra_write(buffer, strlen(buffer));
219
220     if(noit_http_response_flush(ctx, noit_false) == noit_false) BAIL_HTTP_WRITE;
221   }
222
223   noit_hash_destroy(&json, NULL, free);
224   return 0;
225
226  bad_row:
227   BAIL_HTTP_WRITE;
228 }
229 int
230 stratcon_realtime_uri_parse(realtime_context *rc, char *uri) {
231   int len, cnt = 0;
232   char *cp, *copy, *interest, *brk;
233   if(strncmp(uri, "/data/", 6)) return 0;
234   cp = uri + 6;
235   len = strlen(cp);
236   copy = alloca(len + 1);
237   if(!copy) return 0;
238   memcpy(copy, cp, len);
239   copy[len] = '\0';
240
241   for (interest = strtok_r(copy, "/", &brk);
242        interest;
243        interest = strtok_r(NULL, "/", &brk)) {
244     uuid_t in_uuid;
245     struct realtime_tracker *node;
246     char *interval;
247
248     interval = strchr(interest, '@');
249     if(!interval)
250       interval = "5000";
251     else
252       *interval++ = '\0';
253     if(uuid_parse(interest, in_uuid)) continue;
254     node = calloc(1, sizeof(*node));
255     node->rc = rc;
256     uuid_copy(node->checkid, in_uuid);
257     node->interval = atoi(interval);
258     node->next = rc->checklist;
259     rc->checklist = node;
260     cnt++;
261   }
262   return cnt;
263 }
264 static void
265 free_realtime_recv_ctx(void *vctx) {
266   realtime_recv_ctx_t *rrctx = vctx;
267   noit_http_session_ctx *ctx = rrctx->ctx;
268   realtime_context *rc = ctx->dispatcher_closure;
269
270   if(noit_atomic_dec32(&ctx->ref_cnt) == 1) {
271     noit_http_response_end(ctx);
272     clear_realtime_context(rc);
273     if(ctx->conn.e) eventer_trigger(ctx->conn.e, EVENTER_WRITE | EVENTER_EXCEPTION);
274   }
275   free(rrctx);
276 }
277 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
278 static int
279 __read_on_ctx(eventer_t e, realtime_recv_ctx_t *ctx, int *newmask) {
280   int len, mask;
281   while(ctx->bytes_read < ctx->bytes_expected) {
282     len = Eread(ctx->buffer + ctx->bytes_read,
283                 ctx->bytes_expected - ctx->bytes_read);
284     if(len < 0) {
285       *newmask = mask;
286       return -1;
287     }
288     /* if we get 0 inside SSL, and there was a real error, we
289      * will actually get a -1 here.
290      * if(len == 0) return ctx->bytes_read;
291      */
292     ctx->bytes_read += len;
293   }
294   assert(ctx->bytes_read == ctx->bytes_expected);
295   return ctx->bytes_read;
296 }
/*
 * FULLREAD(e, ctx, size): read exactly `size` bytes into ctx->buffer,
 * resuming a read that was interrupted earlier.
 *
 * On the first pass (ctx->bytes_expected == 0) any previous buffer is
 * replaced by a fresh one of size + 1 bytes with a trailing NUL.  When the
 * read would block, the enclosing function returns the mask to wait on.
 * On allocation failure, read error or short read, control jumps to the
 * enclosing function's socket_error label.  On success both byte counters
 * are reset to 0 and ctx->buffer holds the data.
 */
#define FULLREAD(e,ctx,size) do { \
  int fr_mask, fr_len; \
  if((ctx)->bytes_expected == 0) { \
    (ctx)->bytes_expected = (size); \
    free((ctx)->buffer); \
    (ctx)->buffer = malloc((size) + 1); \
    if(!(ctx)->buffer) { \
      noitL(noit_error, "malloc(%lu) failed.\n", (unsigned long)(size) + 1); \
      goto socket_error; \
    } \
    (ctx)->buffer[(size)] = '\0'; \
  } \
  fr_len = __read_on_ctx((e), (ctx), &fr_mask); \
  if(fr_len < 0) { \
    if(errno == EAGAIN) return fr_mask | EVENTER_EXCEPTION; \
    noitL(noit_error, "SSL read error: %s\n", strerror(errno)); \
    goto socket_error; \
  } \
  (ctx)->bytes_read = 0; \
  (ctx)->bytes_expected = 0; \
  if(fr_len != (size)) { \
    noitL(noit_error, "SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
          (ctx)->state, fr_len, (unsigned long)(size)); \
    goto socket_error; \
  } \
} while(0)
323
324 int
325 stratcon_realtime_recv_handler(eventer_t e, int mask, void *closure,
326                                struct timeval *now) {
327   static u_int32_t livestream_cmd = 0;
328   noit_connection_ctx_t *nctx = closure;
329   realtime_recv_ctx_t *ctx = nctx->consumer_ctx;
330   int len;
331   u_int32_t nint;
332   char uuid_str[37];
333
334   if(!livestream_cmd) livestream_cmd = htonl(NOIT_LIVESTREAM_DATA_FEED);
335
336   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
337  socket_error:
338     ctx->state = REALTIME_HTTP_WANT_INITIATE;
339     ctx->count = 0;
340     ctx->bytes_read = 0;
341     ctx->bytes_written = 0;
342     ctx->bytes_expected = 0;
343     if(ctx->buffer) free(ctx->buffer);
344     ctx->buffer = NULL;
345     noit_connection_ctx_dealloc(nctx);
346     eventer_remove_fd(e->fd);
347     e->opset->close(e->fd, &mask, e);
348     return 0;
349   }
350
351 #define full_nb_write(data, wlen) do { \
352   if(!ctx->bytes_expected) { \
353     ctx->bytes_written = 0; \
354     ctx->bytes_expected = wlen; \
355   } \
356   while(ctx->bytes_written < ctx->bytes_expected) { \
357     while(-1 == (len = e->opset->write(e->fd, ((char *)data) + ctx->bytes_written, \
358                                        ctx->bytes_expected - ctx->bytes_written, \
359                                        &mask, e)) && errno == EINTR); \
360     if(len < 0) { \
361       if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
362       goto socket_error; \
363     } \
364     ctx->bytes_written += len; \
365   } \
366   if(ctx->bytes_written != ctx->bytes_expected) { \
367     noitL(noit_error, "short write on initiating stream [%d != %d].\n", \
368           ctx->bytes_written, ctx->bytes_expected); \
369     goto socket_error; \
370   } \
371   ctx->bytes_expected = 0; \
372 } while(0)
373
374   noit_connection_update_timeout(nctx);
375   while(1) {
376     u_int32_t net_body_len;
377
378     switch(ctx->state) {
379       case REALTIME_HTTP_WANT_INITIATE:
380         full_nb_write(&livestream_cmd, sizeof(livestream_cmd));
381         ctx->state = REALTIME_HTTP_WANT_SEND_INTERVAL;
382       case REALTIME_HTTP_WANT_SEND_INTERVAL:
383         nint = htonl(ctx->rt->interval);
384         full_nb_write(&nint, sizeof(nint));
385         ctx->state = REALTIME_HTTP_WANT_SEND_UUID;
386       case REALTIME_HTTP_WANT_SEND_UUID:
387         uuid_unparse_lower(ctx->rt->checkid, uuid_str);
388         full_nb_write(uuid_str, 36);
389         ctx->state = REALTIME_HTTP_WANT_HEADER;
390       case REALTIME_HTTP_WANT_HEADER:
391         FULLREAD(e, ctx, sizeof(u_int32_t));
392         memcpy(&net_body_len, ctx->buffer, sizeof(u_int32_t));
393         ctx->body_len = ntohl(net_body_len);
394         free(ctx->buffer); ctx->buffer = NULL;
395         ctx->state = REALTIME_HTTP_WANT_BODY;
396         break;
397       case REALTIME_HTTP_WANT_BODY:
398         FULLREAD(e, ctx, ctx->body_len);
399         if(stratcon_line_to_javascript(ctx->ctx, ctx->buffer, ctx->hack_inc_id++)) goto socket_error;
400         free(ctx->buffer); ctx->buffer = NULL;
401         ctx->state = REALTIME_HTTP_WANT_HEADER;
402         break;
403     }
404   }
405
406 }
407
408 int
409 stratcon_realtime_http_postresolve(eventer_t e, int mask, void *closure,
410                                    struct timeval *now) {
411   noit_http_session_ctx *ctx = closure;
412   realtime_context *rc = ctx->dispatcher_closure;
413   struct realtime_tracker *node;
414
415   for(node = rc->checklist; node; node = node->next) {
416     if(node->noit) {
417       realtime_recv_ctx_t *rrctx;
418       rrctx = calloc(1, sizeof(*rrctx));
419       rrctx->ctx = ctx;
420       rrctx->rt = node;
421       stratcon_streamer_connection(NULL, node->noit,
422                                    stratcon_realtime_recv_handler,
423                                    NULL, rrctx,
424                                    free_realtime_recv_ctx);
425     }
426     else
427       noit_atomic_dec32(&ctx->ref_cnt);
428   }
429   if(ctx->ref_cnt == 1) {
430     noit_http_response_end(ctx);
431     clear_realtime_context(rc);
432     if(ctx->conn.e) eventer_trigger(ctx->conn.e, EVENTER_WRITE);
433   }
434   return 0;
435 }
436 int
437 stratcon_request_dispatcher(noit_http_session_ctx *ctx) {
438   const char *key, *value;
439   realtime_context *rc = ctx->dispatcher_closure;
440   int klen;
441   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
442   noit_http_request *req = &ctx->req;
443
444   if(rc->setup == RC_INITIAL) {
445     eventer_t completion;
446     struct realtime_tracker *node;
447     char c[1024];
448     int num_interests;
449
450     num_interests = stratcon_realtime_uri_parse(rc, ctx->req.uri_str);
451     if(num_interests == 0) {
452       noit_http_response_status_set(ctx, 404, "OK");
453       noit_http_response_option_set(ctx, NOIT_HTTP_CLOSE);
454       noit_http_response_end(ctx);
455       return 0;
456     }
457
458     noitL(noit_error, "http: %s %s %s\n",
459           req->method_str, req->uri_str, req->protocol_str);
460     while(noit_hash_next_str(&req->headers, &iter, &key, &klen, &value)) {
461       noitL(noit_error, "http: [%s: %s]\n", key, value);
462     }
463     noit_http_response_status_set(ctx, 200, "OK");
464     noit_http_response_option_set(ctx, NOIT_HTTP_CHUNKED);
465     /*noit_http_response_option_set(ctx, NOIT_HTTP_GZIP);*/
466     /*noit_http_response_option_set(ctx, NOIT_HTTP_DEFLATE);*/
467     noit_http_response_header_set(ctx, "Content-Type", "text/html");
468
469     snprintf(c, sizeof(c),
470              "<html><head><script>document.domain='%s';</script></head><body>\n",
471              rc->document_domain);
472     noit_http_response_append(ctx, c, strlen(c));
473
474     /* this dumb crap is to make some browsers happy (Safari) */
475     memset(c, ' ', sizeof(c));
476     noit_http_response_append(ctx, c, sizeof(c));
477     noit_http_response_flush(ctx, noit_false);
478
479     rc->setup = RC_REQ_RECV;
480     /* Each interest references the ctx */
481     for(node = rc->checklist; node; node = node->next) {
482       char uuid_str[UUID_STR_LEN+1];
483       noit_atomic_inc32(&ctx->ref_cnt);
484       uuid_unparse_lower(node->checkid, uuid_str);
485       noitL(noit_error, "Resolving uuid: %s\n", uuid_str);
486     }
487     completion = eventer_alloc();
488     completion->mask = EVENTER_TIMER;
489     completion->callback = stratcon_realtime_http_postresolve;
490     completion->closure = ctx;
491     gettimeofday(&completion->whence, NULL);
492     stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, NULL,
493                             rc->checklist, completion);
494   }
495   return EVENTER_EXCEPTION;
496 }
497 int
498 stratcon_realtime_http_handler(eventer_t e, int mask, void *closure,
499                                struct timeval *now) {
500   acceptor_closure_t *ac = closure;
501   noit_http_session_ctx *http_ctx = ac->service_ctx;
502   return http_ctx->drive(e, mask, http_ctx, now);
503 }
504 static int
505 rest_stream_data(noit_http_rest_closure_t *restc,
506                  int npats, char **pats) {
507   /* We're here and want to subvert the rest system */
508   const char *document_domain = NULL;
509   noit_http_session_ctx *ctx = restc->http_ctx;
510
511   /* Rewire the handler */
512   restc->ac->service_ctx = ctx;
513
514   if(!noit_hash_retr_str(restc->ac->config,
515                          "document_domain", strlen("document_domain"),
516                          &document_domain)) {
517     noitL(noit_error, "Document domain not set!  Realtime streaming will be broken\n");
518     document_domain = "";
519   }
520   noit_http_rest_closure_free(restc);
521
522   noit_http_process_querystring(&ctx->req);
523   /* Rewire the http context */
524   ctx->conn.e->callback = stratcon_realtime_http_handler;
525   ctx->dispatcher = stratcon_request_dispatcher;
526   ctx->dispatcher_closure = alloc_realtime_context(document_domain);
527   //ctx->drive = stratcon_realtime_http_handler;
528   return stratcon_request_dispatcher(ctx);
529 }
530
531 void
532 stratcon_realtime_http_init(const char *toplevel) {
533   eventer_name_callback("stratcon_realtime_http",
534                         stratcon_realtime_http_handler);
535   eventer_name_callback("stratcon_realtime_recv",
536                         stratcon_realtime_recv_handler);
537   assert(noit_http_rest_register_auth(
538     "GET", "/data/",
539            "^((?:" UUID_REGEX "(?:@\\d+)?)(?:/" UUID_REGEX "(?:@\\d+)?)*)$",
540     rest_stream_data, noit_http_rest_access
541   ) == 0);
542   assert(noit_http_rest_register_auth(
543     "GET", "/", "^(.*)$", noit_rest_simple_file_handler, noit_http_rest_access
544   ) == 0);
545 }
Note: See TracBrowser for help on using the browser.