root/src/stratcon_realtime_http.c

Revision 47a77d02f1268e637a4807266064909be566e1f7, 15.8 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 9 years ago)

flag-day(stratcond,webconsole,database) get realtime graphs working on uuids

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_conf.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "jlog/jlog.h"
39 #include "noit_jlog_listener.h"
40 #include "noit_listener.h"
41 #include "noit_http.h"
42 #include "noit_rest.h"
43 #include "noit_livestream_listener.h"
44 #include "stratcon_realtime_http.h"
45 #include "stratcon_jlog_streamer.h"
46 #include "stratcon_datastore.h"
47
48 #include <unistd.h>
49 #include <assert.h>
50 #include <errno.h>
51 #include <sys/types.h>
52 #include <sys/socket.h>
53 #ifdef HAVE_SYS_FILIO_H
54 #include <sys/filio.h>
55 #endif
56 #include <netinet/in.h>
57 #include <sys/un.h>
58 #include <arpa/inet.h>
59
60
61 typedef struct realtime_recv_ctx_t {
62   int bytes_expected;
63   int bytes_read;
64   int bytes_written;
65   int body_len;
66   char *buffer;         /* These guys are for doing partial reads */
67
68   enum {
69     REALTIME_HTTP_WANT_INITIATE = 0,
70     REALTIME_HTTP_WANT_SEND_INTERVAL = 1,
71     REALTIME_HTTP_WANT_SEND_UUID = 2,
72     REALTIME_HTTP_WANT_HEADER = 3,
73     REALTIME_HTTP_WANT_BODY = 4,
74   } state;
75   int count;            /* Number of jlog messages we need to read */
76   noit_http_session_ctx *ctx;
77   struct realtime_tracker *rt;
78 } realtime_recv_ctx_t;
79
80 typedef struct realtime_context {
81   enum { RC_INITIAL = 0, RC_REQ_RECV, RC_INTERESTS_RESOLVED, RC_FEEDING } setup;
82   struct realtime_tracker *checklist;
83   char *document_domain;
84 } realtime_context;
85
86 static realtime_context *alloc_realtime_context(const char *domain) {
87   realtime_context *ctx;
88   ctx = calloc(sizeof(*ctx), 1);
89   ctx->document_domain = strdup(domain);
90   return ctx;
91 }
92 static void free_realtime_tracker(struct realtime_tracker *rt) {
93   if(rt->noit) free(rt->noit);
94   free(rt);
95 }
96 static void clear_realtime_context(realtime_context *rc) {
97  rc->setup = RC_INITIAL;
98   while(rc->checklist) {
99     struct realtime_tracker *tofree;
100     tofree = rc->checklist;
101     rc->checklist = tofree->next;
102     free_realtime_tracker(tofree);
103   }
104   if(rc->document_domain) free(rc->document_domain);
105   rc->document_domain = NULL;
106 }
107 int
108 stratcon_line_to_javascript(noit_http_session_ctx *ctx, char *buff) {
109   char buffer[1024];
110   char *scp, *ecp, *token;
111   int len;
112
113 #define BAIL_HTTP_WRITE do { \
114   noitL(noit_error, "javascript emit failed: %s:%s:%d\n", \
115         __FILE__, __FUNCTION__, __LINE__); \
116   return -1; \
117 } while(0)
118
119 #define PROCESS_NEXT_FIELD(t,l) do { \
120   if(!*scp) goto bad_row; \
121   ecp = strchr(scp, '\t'); \
122   if(!ecp) goto bad_row; \
123   t = scp; \
124   l = (ecp-scp); \
125   scp = ecp + 1; \
126 } while(0)
127 #define PROCESS_LAST_FIELD(t,l) do { \
128   if(!*scp) ecp = scp; \
129   else { \
130     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
131     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
132   } \
133   t = scp; \
134   l = (ecp-scp); \
135 } while(0)
136
137   scp = buff;
138   PROCESS_NEXT_FIELD(token,len); /* Skip the leader */
139   if(buff[0] == 'M') {
140     snprintf(buffer, sizeof(buffer), "<script>window.parent.plot_iframe_data('");
141     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
142
143     /* Time */
144     PROCESS_NEXT_FIELD(token,len);
145     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
146
147     snprintf(buffer, sizeof(buffer), "', '");
148     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
149
150     /* UUID */
151     PROCESS_NEXT_FIELD(token,len);
152     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
153
154     snprintf(buffer, sizeof(buffer), "', '");
155     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
156
157     /* name */
158     PROCESS_NEXT_FIELD(token,len);
159     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
160
161     snprintf(buffer, sizeof(buffer), "', '");
162     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
163
164     PROCESS_NEXT_FIELD(token,len); /* skip type */
165     PROCESS_LAST_FIELD(token,len); /* value */
166     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
167
168     snprintf(buffer, sizeof(buffer), "');</script>\n");
169     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
170
171     if(noit_http_response_flush(ctx, noit_false) == noit_false) BAIL_HTTP_WRITE;
172   }
173
174   return 0;
175
176  bad_row:
177   BAIL_HTTP_WRITE;
178   if(0) {
179     noit_http_response_end(ctx);
180     clear_realtime_context(ctx->dispatcher_closure);
181     if(ctx->conn.e) eventer_trigger(ctx->conn.e, EVENTER_WRITE);
182     return 0;
183   }
184 }
185 int
186 stratcon_realtime_uri_parse(realtime_context *rc, char *uri) {
187   int len, cnt = 0;
188   char *cp, *copy, *interest, *brk;
189   if(strncmp(uri, "/data/", 6)) return 0;
190   cp = uri + 6;
191   len = strlen(cp);
192   copy = alloca(len + 1);
193   if(!copy) return 0;
194   memcpy(copy, cp, len);
195   copy[len] = '\0';
196
197   for (interest = strtok_r(copy, "/", &brk);
198        interest;
199        interest = strtok_r(NULL, "/", &brk)) {
200     uuid_t in_uuid;
201     struct realtime_tracker *node;
202     char *interval;
203
204     interval = strchr(interest, '@');
205     if(!interval)
206       interval = "5000";
207     else
208       *interval++ = '\0';
209     if(uuid_parse(interest, in_uuid)) continue;
210     node = calloc(1, sizeof(*node));
211     node->rc = rc;
212     uuid_copy(node->checkid, in_uuid);
213     node->interval = atoi(interval);
214     node->next = rc->checklist;
215     rc->checklist = node;
216     cnt++;
217   }
218   return cnt;
219 }
220 static void
221 free_realtime_recv_ctx(void *vctx) {
222   realtime_recv_ctx_t *rrctx = vctx;
223   noit_atomic_dec32(&rrctx->ctx->ref_cnt);
224   free(rrctx);
225 }
226 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
227 static int
228 __read_on_ctx(eventer_t e, realtime_recv_ctx_t *ctx, int *newmask) {
229   int len, mask;
230   while(ctx->bytes_read < ctx->bytes_expected) {
231     len = Eread(ctx->buffer + ctx->bytes_read,
232                 ctx->bytes_expected - ctx->bytes_read);
233     if(len < 0) {
234       *newmask = mask;
235       return -1;
236     }
237     /* if we get 0 inside SSL, and there was a real error, we
238      * will actually get a -1 here.
239      * if(len == 0) return ctx->bytes_read;
240      */
241     ctx->bytes_read += len;
242   }
243   assert(ctx->bytes_read == ctx->bytes_expected);
244   return ctx->bytes_read;
245 }
246 #define FULLREAD(e,ctx,size) do { \
247   int mask, len; \
248   if(!ctx->bytes_expected) { \
249     ctx->bytes_expected = size; \
250     if(ctx->buffer) free(ctx->buffer); \
251     ctx->buffer = malloc(size + 1); \
252     if(ctx->buffer == NULL) { \
253       noitL(noit_error, "malloc(%lu) failed.\n", (unsigned long)size + 1); \
254       goto socket_error; \
255     } \
256     ctx->buffer[size] = '\0'; \
257   } \
258   len = __read_on_ctx(e, ctx, &mask); \
259   if(len < 0) { \
260     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
261     noitL(noit_error, "SSL read error: %s\n", strerror(errno)); \
262     goto socket_error; \
263   } \
264   ctx->bytes_read = 0; \
265   ctx->bytes_expected = 0; \
266   if(len != size) { \
267     noitL(noit_error, "SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
268           ctx->state, len, (unsigned long)size); \
269     goto socket_error; \
270   } \
271 } while(0)
272
273 int
274 stratcon_realtime_recv_handler(eventer_t e, int mask, void *closure,
275                                struct timeval *now) {
276   static u_int32_t livestream_cmd = 0;
277   noit_connection_ctx_t *nctx = closure;
278   realtime_recv_ctx_t *ctx = nctx->consumer_ctx;
279   int len;
280   u_int32_t nint;
281   char uuid_str[37];
282
283   if(!livestream_cmd) livestream_cmd = htonl(NOIT_LIVESTREAM_DATA_FEED);
284
285   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
286  socket_error:
287     ctx->state = REALTIME_HTTP_WANT_INITIATE;
288     ctx->count = 0;
289     ctx->bytes_read = 0;
290     ctx->bytes_written = 0;
291     ctx->bytes_expected = 0;
292     if(ctx->buffer) free(ctx->buffer);
293     ctx->buffer = NULL;
294     noit_connection_ctx_dealloc(nctx);
295     eventer_remove_fd(e->fd);
296     e->opset->close(e->fd, &mask, e);
297     return 0;
298   }
299
300 #define full_nb_write(data, wlen) do { \
301   if(!ctx->bytes_expected) { \
302     ctx->bytes_written = 0; \
303     ctx->bytes_expected = wlen; \
304   } \
305   while(ctx->bytes_written < ctx->bytes_expected) { \
306     while(-1 == (len = e->opset->write(e->fd, ((char *)data) + ctx->bytes_written, \
307                                        ctx->bytes_expected - ctx->bytes_written, \
308                                        &mask, e)) && errno == EINTR); \
309     if(len < 0) { \
310       if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
311       goto socket_error; \
312     } \
313     ctx->bytes_written += len; \
314   } \
315   if(ctx->bytes_written != ctx->bytes_expected) { \
316     noitL(noit_error, "short write on initiating stream [%d != %d].\n", \
317           ctx->bytes_written, ctx->bytes_expected); \
318     goto socket_error; \
319   } \
320   ctx->bytes_expected = 0; \
321 } while(0)
322
323   while(1) {
324     u_int32_t net_body_len;
325
326     switch(ctx->state) {
327       case REALTIME_HTTP_WANT_INITIATE:
328         full_nb_write(&livestream_cmd, sizeof(livestream_cmd));
329         ctx->state = REALTIME_HTTP_WANT_SEND_INTERVAL;
330       case REALTIME_HTTP_WANT_SEND_INTERVAL:
331         nint = htonl(ctx->rt->interval);
332         full_nb_write(&nint, sizeof(nint));
333         ctx->state = REALTIME_HTTP_WANT_SEND_UUID;
334       case REALTIME_HTTP_WANT_SEND_UUID:
335         uuid_unparse_lower(ctx->rt->checkid, uuid_str);
336         full_nb_write(uuid_str, 36);
337         ctx->state = REALTIME_HTTP_WANT_HEADER;
338       case REALTIME_HTTP_WANT_HEADER:
339         FULLREAD(e, ctx, sizeof(u_int32_t));
340         memcpy(&net_body_len, ctx->buffer, sizeof(u_int32_t));
341         ctx->body_len = ntohl(net_body_len);
342         free(ctx->buffer); ctx->buffer = NULL;
343         ctx->state = REALTIME_HTTP_WANT_BODY;
344         break;
345       case REALTIME_HTTP_WANT_BODY:
346         FULLREAD(e, ctx, ctx->body_len);
347         if(stratcon_line_to_javascript(ctx->ctx, ctx->buffer)) goto socket_error;
348         free(ctx->buffer); ctx->buffer = NULL;
349         ctx->state = REALTIME_HTTP_WANT_HEADER;
350         break;
351     }
352   }
353
354 }
355
356 int
357 stratcon_realtime_http_postresolve(eventer_t e, int mask, void *closure,
358                                    struct timeval *now) {
359   noit_http_session_ctx *ctx = closure;
360   realtime_context *rc = ctx->dispatcher_closure;
361   struct realtime_tracker *node;
362
363   for(node = rc->checklist; node; node = node->next) {
364     if(node->noit) {
365       realtime_recv_ctx_t *rrctx;
366       rrctx = calloc(1, sizeof(*rrctx));
367       rrctx->ctx = ctx;
368       rrctx->rt = node;
369       stratcon_streamer_connection(NULL, node->noit,
370                                    stratcon_realtime_recv_handler,
371                                    NULL, rrctx,
372                                    free_realtime_recv_ctx);
373     }
374     else
375       noit_atomic_dec32(&ctx->ref_cnt);
376   }
377   if(ctx->ref_cnt == 1) {
378     noit_http_response_end(ctx);
379     clear_realtime_context(rc);
380     if(ctx->conn.e) eventer_trigger(ctx->conn.e, EVENTER_WRITE);
381   }
382   return 0;
383 }
384 int
385 stratcon_request_dispatcher(noit_http_session_ctx *ctx) {
386   const char *key, *value;
387   realtime_context *rc = ctx->dispatcher_closure;
388   int klen;
389   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
390   noit_http_request *req = &ctx->req;
391
392   if(rc->setup == RC_INITIAL) {
393     eventer_t completion;
394     struct realtime_tracker *node;
395     char c[1024];
396     int num_interests;
397
398     num_interests = stratcon_realtime_uri_parse(rc, ctx->req.uri_str);
399     if(num_interests == 0) {
400       noit_http_response_status_set(ctx, 404, "OK");
401       noit_http_response_option_set(ctx, NOIT_HTTP_CLOSE);
402       noit_http_response_end(ctx);
403       return 0;
404     }
405
406     noitL(noit_error, "http: %s %s %s\n",
407           req->method_str, req->uri_str, req->protocol_str);
408     while(noit_hash_next_str(&req->headers, &iter, &key, &klen, &value)) {
409       noitL(noit_error, "http: [%s: %s]\n", key, value);
410     }
411     noit_http_response_status_set(ctx, 200, "OK");
412     noit_http_response_option_set(ctx, NOIT_HTTP_CHUNKED);
413     /*noit_http_response_option_set(ctx, NOIT_HTTP_GZIP);*/
414     /*noit_http_response_option_set(ctx, NOIT_HTTP_DEFLATE);*/
415     noit_http_response_header_set(ctx, "Content-Type", "text/html");
416
417     snprintf(c, sizeof(c),
418              "<html><head><script>document.domain='%s';</script></head><body>\n",
419              rc->document_domain);
420     noit_http_response_append(ctx, c, strlen(c));
421
422     /* this dumb crap is to make some browsers happy (Safari) */
423     memset(c, ' ', sizeof(c));
424     noit_http_response_append(ctx, c, sizeof(c));
425     noit_http_response_flush(ctx, noit_false);
426
427     rc->setup = RC_REQ_RECV;
428     /* Each interest references the ctx */
429     for(node = rc->checklist; node; node = node->next) {
430       char uuid_str[UUID_STR_LEN+1];
431       noit_atomic_inc32(&ctx->ref_cnt);
432       uuid_unparse_lower(node->checkid, uuid_str);
433       noitL(noit_error, "Resolving uuid: %s\n", uuid_str);
434     }
435     completion = eventer_alloc();
436     completion->mask = EVENTER_TIMER;
437     completion->callback = stratcon_realtime_http_postresolve;
438     completion->closure = ctx;
439     gettimeofday(&completion->whence, NULL);
440     stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, NULL,
441                             rc->checklist, completion);
442   }
443   return EVENTER_EXCEPTION;
444 }
445 int
446 stratcon_realtime_http_handler(eventer_t e, int mask, void *closure,
447                                struct timeval *now) {
448   acceptor_closure_t *ac = closure;
449   noit_http_session_ctx *http_ctx = ac->service_ctx;
450   return http_ctx->drive(e, mask, http_ctx, now);
451 }
452 static int
453 rest_stream_data(noit_http_rest_closure_t *restc,
454                  int npats, char **pats) {
455   /* We're here and want to subvert the rest system */
456   const char *document_domain = NULL;
457   noit_http_session_ctx *ctx = restc->http_ctx;
458
459   /* Rewire the handler */
460   restc->ac->service_ctx = ctx;
461
462   if(!noit_hash_retr_str(restc->ac->config,
463                          "document_domain", strlen("document_domain"),
464                          &document_domain)) {
465     noitL(noit_error, "Document domain not set!  Realtime streaming will be broken\n");
466     document_domain = "";
467   }
468   noit_http_rest_closure_free(restc);
469
470   /* Rewire the http context */
471   ctx->conn.e->callback = stratcon_realtime_http_handler;
472   ctx->dispatcher = stratcon_request_dispatcher;
473   ctx->dispatcher_closure = alloc_realtime_context(document_domain);
474   //ctx->drive = stratcon_realtime_http_handler;
475   return stratcon_request_dispatcher(ctx);
476 }
477
478 void
479 stratcon_realtime_http_init(const char *toplevel) {
480   eventer_name_callback("stratcon_realtime_http",
481                         stratcon_realtime_http_handler);
482   assert(noit_http_rest_register(
483     "GET", "/data/",
484            "^((?:" UUID_REGEX "(?:@\\d+)?)(?:/" UUID_REGEX "(?:@\\d+)?)*)$",
485     rest_stream_data
486   ) == 0);
487 }
Note: See TracBrowser for help on using the browser.