root/src/stratcon_realtime_http.c

Revision 7b386ef4c8bb241dc157e198b028d156bf082dbc, 21.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 2 months ago)

Basic support for reverse tunneling of TCP connections over a noit SSL connection.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_conf.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "utils/noit_str.h"
39 #include "jlog/jlog.h"
40 #include "noit_jlog_listener.h"
41 #include "noit_listener.h"
42 #include "noit_http.h"
43 #include "noit_rest.h"
44 #include "noit_check.h"
45 #include "noit_check_log_helpers.h"
46 #include "noit_livestream_listener.h"
47 #include "stratcon_realtime_http.h"
48 #include "stratcon_jlog_streamer.h"
49 #include "stratcon_datastore.h"
50
51 #include <ctype.h>
52 #include <unistd.h>
53 #include <assert.h>
54 #include <errno.h>
55 #include <sys/types.h>
56 #include <sys/socket.h>
57 #ifdef HAVE_SYS_FILIO_H
58 #include <sys/filio.h>
59 #endif
60 #include <netinet/in.h>
61 #include <sys/un.h>
62 #include <arpa/inet.h>
63
64 /*
65  * it appears that GCC 4.5.2 incorrectly thinks that FULLREAD uses "mask"
66  * without initializing it, so disable that specific warning for this file
67  * for now
68  */
69
70 #if __GNUC__ == 4 && __GNUC_MINOR__ == 5 && __GNUC_PATCHLEVEL__ == 2
71 #pragma GCC diagnostic ignored "-Wuninitialized"
72 #endif
73
74 typedef struct realtime_recv_ctx_t {
75   int bytes_expected;
76   int bytes_read;
77   int bytes_written;
78   int body_len;
79   char *buffer;         /* These guys are for doing partial reads */
80
81   enum {
82     REALTIME_HTTP_WANT_INITIATE = 0,
83     REALTIME_HTTP_WANT_SEND_INTERVAL = 1,
84     REALTIME_HTTP_WANT_SEND_UUID = 2,
85     REALTIME_HTTP_WANT_HEADER = 3,
86     REALTIME_HTTP_WANT_BODY = 4,
87   } state;
88   int count;            /* Number of jlog messages we need to read */
89   u_int32_t hack_inc_id;
90   noit_http_session_ctx *ctx;
91   struct realtime_tracker *rt;
92 } realtime_recv_ctx_t;
93
94 typedef struct realtime_context {
95   enum { RC_INITIAL = 0, RC_REQ_RECV, RC_INTERESTS_RESOLVED, RC_FEEDING } setup;
96   struct realtime_tracker *checklist;
97   char *document_domain;
98 } realtime_context;
99
100 static realtime_context *alloc_realtime_context(const char *domain) {
101   realtime_context *ctx;
102   ctx = calloc(sizeof(*ctx), 1);
103   ctx->document_domain = strdup(domain);
104   return ctx;
105 }
106 static void free_realtime_tracker(struct realtime_tracker *rt) {
107   if(rt->noit) free(rt->noit);
108   free(rt);
109 }
110 static void clear_realtime_context(realtime_context *rc) {
111  rc->setup = RC_INITIAL;
112   while(rc->checklist) {
113     struct realtime_tracker *tofree;
114     tofree = rc->checklist;
115     rc->checklist = tofree->next;
116     free_realtime_tracker(tofree);
117   }
118   if(rc->document_domain) free(rc->document_domain);
119   rc->document_domain = NULL;
120 }
121 int
122 stratcon_line_to_javascript(noit_http_session_ctx *ctx, char *in_buff,
123                             u_int32_t *inc_id) {
124   char buffer[1024];
125   char *scp, *ecp, *token, *buff;
126   int i, len, cnt;
127   const char *v, *cb = NULL;
128   noit_hash_table json = NOIT_HASH_EMPTY;
129   noit_http_request *req = noit_http_session_request(ctx);
130   char s_inc_id[42];
131   char **outrows = NULL;
132
133   cb = noit_http_request_querystring(req, "cb");
134   for(v = cb; v && *v; v++)
135     if(!((*v >= '0' && *v <= '9') ||
136          (*v >= 'a' && *v <= 'z') ||
137          (*v >= 'A' && *v <= 'Z') ||
138          (*v == '_') || (*v == '.'))) {
139       cb = NULL;
140       break;
141     }
142   if(!cb) cb = "window.parent.plot_iframe_data";
143
144 #define BAIL_HTTP_WRITE do { \
145   if(outrows) { \
146     for(i=0;i<cnt;i++) if(outrows[i]) free(outrows[i]); \
147     free(outrows); \
148   } \
149   noit_hash_destroy(&json, NULL, free); \
150   noitL(noit_error, "javascript emit failed: %s:%s:%d\n", \
151         __FILE__, __FUNCTION__, __LINE__); \
152   return -1; \
153 } while(0)
154
155 #define PROCESS_NEXT_FIELD(t,l) do { \
156   if(!*scp) goto bad_row; \
157   ecp = strchr(scp, '\t'); \
158   if(!ecp) goto bad_row; \
159   t = scp; \
160   l = (ecp-scp); \
161   scp = ecp + 1; \
162 } while(0)
163 #define PROCESS_LAST_FIELD(t,l) do { \
164   if(!*scp) ecp = scp; \
165   else { \
166     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
167     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
168   } \
169   t = scp; \
170   l = (ecp-scp); \
171 } while(0)
172
173   noitL(noit_error, "recv(%s)\n", in_buff);
174   if(in_buff[0] == 'B' && in_buff[1] != '\0' && in_buff[2] == '\t') {
175     cnt = noit_check_log_b_to_sm(in_buff, strlen(in_buff), &outrows);
176   }
177   else {
178     cnt = 1;
179     outrows = malloc(sizeof(*outrows));
180     outrows[0] = strdup(in_buff);
181   }
182   for(i=0; i<cnt; i++) {
183     buff = outrows[i];
184     if(!buff) continue;
185     noitL(noit_error, "recv_xlt(%s)\n", buff);
186     scp = buff;
187     PROCESS_NEXT_FIELD(token,len); /* Skip the leader */
188     if(buff[1] == '\t' && (buff[0] == 'M' || buff[0] == 'S')) {
189       char target[256], module[256], name[256], uuid_str[UUID_STR_LEN+1];
190       noit_http_request *req = noit_http_session_request(ctx);
191       noit_hash_table *qs;
192       noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
193       const char *key;
194       int klen, i=0;
195       void *vval;
196       char type[2] = { '\0', '\0' };
197       type[0] = buff[0];
198
199 #define ra_write(a,b) if(noit_http_response_append(ctx, a, b) == noit_false) BAIL_HTTP_WRITE
200
201       snprintf(s_inc_id, sizeof(s_inc_id), "script-%08x", (*inc_id)++);
202       snprintf(buffer, sizeof(buffer), "<script id=\"%s\">%s({", s_inc_id, cb);
203       ra_write(buffer, strlen(buffer));
204
205       qs = noit_http_request_querystring_table(req);
206       while(noit_hash_next(qs, &iter, &key, &klen, &vval)) {
207         if(!strcmp(key, "cb")) continue;
208         noit_hash_store(&json, key, klen, strdup(vval ?(char *)vval : "true"));
209       }
210       /* Time */
211       noit_hash_store(&json, "script_id", 9, strdup(s_inc_id));
212       noit_hash_store(&json, "type", 4, strdup(type));
213       PROCESS_NEXT_FIELD(token,len);
214       noit_hash_store(&json, "time", 4, noit__strndup(token, len));
215       /* UUID */
216       PROCESS_NEXT_FIELD(token,len);
217       noit_check_extended_id_split(token, len, target, sizeof(target),
218                                    module, sizeof(module), name, sizeof(name),
219                                    uuid_str, sizeof(uuid_str));
220       if(*uuid_str)
221         noit_hash_store(&json, "id", 2,
222                         noit__strndup(uuid_str, strlen(uuid_str)));
223       if(*target)
224         noit_hash_store(&json, "check_target", 12,
225                         noit__strndup(target, strlen(target)));
226       if(*module)
227         noit_hash_store(&json, "check_module", 12,
228                         noit__strndup(module, strlen(module)));
229       if(*name)
230         noit_hash_store(&json, "check_name", 10,
231                         noit__strndup(name, strlen(name)));
232       if(buff[0] == 'M') {
233         /* name */
234         PROCESS_NEXT_FIELD(token,len);
235         noit_hash_store(&json, "metric_name", 11, noit__strndup(token, len));
236         /* type */
237         PROCESS_NEXT_FIELD(token,len);
238         noit_hash_store(&json, "metric_type", 11, noit__strndup(token, len));
239         /* value */
240         PROCESS_LAST_FIELD(token,len); /* value */
241         noit_hash_store(&json, "value", 5, noit__strndup(token, len));
242       }
243       else if(buff[0] == 'S') {
244         /* state */
245         PROCESS_NEXT_FIELD(token,len);
246         noit_hash_store(&json, "check_state", 11, noit__strndup(token, len));
247         /* availability */
248         PROCESS_NEXT_FIELD(token,len);
249         noit_hash_store(&json, "check_availability", 18, noit__strndup(token, len));
250         /* duration */
251         PROCESS_NEXT_FIELD(token,len);
252         noit_hash_store(&json, "check_duration_ms", 17, noit__strndup(token, len));
253         /* status */
254         PROCESS_LAST_FIELD(token,len);
255         noit_hash_store(&json, "status_message", 14, noit__strndup(token, len));
256       }
257
258       memset(&iter, 0, sizeof(iter));
259       while(noit_hash_next(&json, &iter, &key, &klen, &vval)) {
260         char *val = (char *)vval;
261         if(i++) ra_write(",", 1);
262         ra_write("\"", 1);
263         ra_write(key, klen);
264         ra_write("\":\"", 3);
265         while(*val) {
266           if(*val == '\"' || *val == '\\') {
267             ra_write((char *)"\\", 1);
268           }
269           if(isprint(*val)) {
270             ra_write((char *)val, 1);
271           }
272           else {
273             char od[5];
274             snprintf(od, sizeof(od), "\\%03o", *((unsigned char *)val));
275             ra_write(od, strlen(od));
276           }
277           val++;
278         }
279         ra_write("\"", 1);
280       }
281       snprintf(buffer, sizeof(buffer), "});</script>\n");
282       ra_write(buffer, strlen(buffer));
283
284       if(noit_http_response_flush(ctx, noit_false) == noit_false) BAIL_HTTP_WRITE;
285     }
286
287     noit_hash_destroy(&json, NULL, free);
288     memset(&json, 0, sizeof(json));
289   }
290   if(outrows) {
291     for(i=0;i<cnt;i++) if(outrows[i]) free(outrows[i]);
292     free(outrows);
293   }
294
295   return 0;
296
297  bad_row:
298   BAIL_HTTP_WRITE;
299 }
300 int
301 stratcon_realtime_uri_parse(realtime_context *rc, const char *uri) {
302   int len, cnt = 0;
303   const char *cp, *interest;
304   char *copy, *brk;
305   if(strncmp(uri, "/data/", 6)) return 0;
306   cp = uri + 6;
307   len = strlen(cp);
308   copy = alloca(len + 1);
309   if(!copy) return 0;
310   memcpy(copy, cp, len);
311   copy[len] = '\0';
312
313   for (interest = strtok_r(copy, "/", &brk);
314        interest;
315        interest = strtok_r(NULL, "/", &brk)) {
316     uuid_t in_uuid;
317     struct realtime_tracker *node;
318     char *interval;
319
320     interval = strchr(interest, '@');
321     if(!interval)
322       interval = "5000";
323     else
324       *interval++ = '\0';
325     if(uuid_parse((char *)interest, in_uuid)) continue;
326     node = calloc(1, sizeof(*node));
327     node->rc = rc;
328     uuid_copy(node->checkid, in_uuid);
329     node->interval = atoi(interval);
330     node->next = rc->checklist;
331     rc->checklist = node;
332     cnt++;
333   }
334   return cnt;
335 }
336 static void
337 free_realtime_recv_ctx(void *vctx) {
338   realtime_recv_ctx_t *rrctx = vctx;
339   noit_http_session_ctx *ctx = rrctx->ctx;
340   realtime_context *rc = noit_http_session_dispatcher_closure(ctx);
341
342   if(noit_http_session_ref_dec(ctx) == 1) {
343     noit_http_response_end(ctx);
344     clear_realtime_context(rc);
345     noit_http_session_trigger(ctx, EVENTER_WRITE | EVENTER_EXCEPTION);
346   }
347   free(rrctx);
348 }
349 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
350 static int
351 __read_on_ctx(eventer_t e, realtime_recv_ctx_t *ctx, int *newmask) {
352   int len, mask;
353   while(ctx->bytes_read < ctx->bytes_expected) {
354     len = Eread(ctx->buffer + ctx->bytes_read,
355                 ctx->bytes_expected - ctx->bytes_read);
356     if(len < 0) {
357       *newmask = mask;
358       return -1;
359     }
360     /* if we get 0 inside SSL, and there was a real error, we
361      * will actually get a -1 here.
362      * if(len == 0) return ctx->bytes_read;
363      */
364     ctx->bytes_read += len;
365   }
366   assert(ctx->bytes_read == ctx->bytes_expected);
367   return ctx->bytes_read;
368 }
369 #define FULLREAD(e,ctx,size) do { \
370   int mask, len; \
371   if(!ctx->bytes_expected) { \
372     ctx->bytes_expected = size; \
373     if(ctx->buffer) free(ctx->buffer); \
374     ctx->buffer = malloc(size + 1); \
375     if(ctx->buffer == NULL) { \
376       noitL(noit_error, "malloc(%lu) failed.\n", (unsigned long)size + 1); \
377       goto socket_error; \
378     } \
379     ctx->buffer[size] = '\0'; \
380   } \
381   len = __read_on_ctx(e, ctx, &mask); \
382   if(len < 0) { \
383     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
384     noitL(noit_error, "SSL read error: %s\n", strerror(errno)); \
385     goto socket_error; \
386   } \
387   ctx->bytes_read = 0; \
388   ctx->bytes_expected = 0; \
389   if(len != size) { \
390     noitL(noit_error, "SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
391           ctx->state, len, (unsigned long)size); \
392     goto socket_error; \
393   } \
394 } while(0)
395
396 int
397 stratcon_realtime_recv_handler(eventer_t e, int mask, void *closure,
398                                struct timeval *now) {
399   static u_int32_t livestream_cmd = 0;
400   noit_connection_ctx_t *nctx = closure;
401   realtime_recv_ctx_t *ctx = nctx->consumer_ctx;
402   int len;
403   u_int32_t nint;
404   char uuid_str[37];
405
406   if(!livestream_cmd) livestream_cmd = htonl(NOIT_LIVESTREAM_DATA_FEED);
407
408   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
409  socket_error:
410     ctx->state = REALTIME_HTTP_WANT_INITIATE;
411     ctx->count = 0;
412     ctx->bytes_read = 0;
413     ctx->bytes_written = 0;
414     ctx->bytes_expected = 0;
415     if(ctx->buffer) free(ctx->buffer);
416     ctx->buffer = NULL;
417     /* We close the event here and null it in the context
418      * because the noit_connection_ctx_dealloc() will both close
419      * it and free it (which our caller will double free) and
420      * we consider double frees to be harmful.
421      */
422     eventer_remove_fd(e->fd);
423     e->opset->close(e->fd, &mask, e);
424     nctx->e = NULL;
425     noit_connection_ctx_dealloc(nctx);
426     return 0;
427   }
428
429 #define full_nb_write(data, wlen) do { \
430   if(!ctx->bytes_expected) { \
431     ctx->bytes_written = 0; \
432     ctx->bytes_expected = wlen; \
433   } \
434   while(ctx->bytes_written < ctx->bytes_expected) { \
435     while(-1 == (len = e->opset->write(e->fd, ((char *)data) + ctx->bytes_written, \
436                                        ctx->bytes_expected - ctx->bytes_written, \
437                                        &mask, e)) && errno == EINTR); \
438     if(len < 0) { \
439       if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
440       goto socket_error; \
441     } \
442     ctx->bytes_written += len; \
443   } \
444   if(ctx->bytes_written != ctx->bytes_expected) { \
445     noitL(noit_error, "short write on initiating stream [%d != %d].\n", \
446           ctx->bytes_written, ctx->bytes_expected); \
447     goto socket_error; \
448   } \
449   ctx->bytes_expected = 0; \
450 } while(0)
451
452   noit_connection_update_timeout(nctx);
453   while(1) {
454     u_int32_t net_body_len;
455
456     switch(ctx->state) {
457       case REALTIME_HTTP_WANT_INITIATE:
458         full_nb_write(&livestream_cmd, sizeof(livestream_cmd));
459         ctx->state = REALTIME_HTTP_WANT_SEND_INTERVAL;
460         /* FALLTHROUGH */
461       case REALTIME_HTTP_WANT_SEND_INTERVAL:
462         nint = htonl(ctx->rt->interval);
463         full_nb_write(&nint, sizeof(nint));
464         ctx->state = REALTIME_HTTP_WANT_SEND_UUID;
465         /* FALLTHROUGH */
466       case REALTIME_HTTP_WANT_SEND_UUID:
467         uuid_unparse_lower(ctx->rt->checkid, uuid_str);
468         full_nb_write(uuid_str, 36);
469         ctx->state = REALTIME_HTTP_WANT_HEADER;
470         /* FALLTHROUGH */
471       case REALTIME_HTTP_WANT_HEADER:
472         FULLREAD(e, ctx, sizeof(u_int32_t));
473         memcpy(&net_body_len, ctx->buffer, sizeof(u_int32_t));
474         ctx->body_len = ntohl(net_body_len);
475         free(ctx->buffer); ctx->buffer = NULL;
476         ctx->state = REALTIME_HTTP_WANT_BODY;
477         break;
478       case REALTIME_HTTP_WANT_BODY:
479         FULLREAD(e, ctx, ctx->body_len);
480         if(stratcon_line_to_javascript(ctx->ctx, ctx->buffer, &ctx->hack_inc_id)) goto socket_error;
481         free(ctx->buffer); ctx->buffer = NULL;
482         ctx->state = REALTIME_HTTP_WANT_HEADER;
483         break;
484     }
485   }
486
487 }
488
489 int
490 stratcon_realtime_http_postresolve(eventer_t e, int mask, void *closure,
491                                    struct timeval *now) {
492   noit_http_session_ctx *ctx = closure;
493   realtime_context *rc = noit_http_session_dispatcher_closure(ctx);
494   struct realtime_tracker *node;
495
496   for(node = rc->checklist; node; node = node->next) {
497     if(node->noit) {
498       realtime_recv_ctx_t *rrctx;
499       rrctx = calloc(1, sizeof(*rrctx));
500       rrctx->ctx = ctx;
501       rrctx->rt = node;
502       stratcon_streamer_connection(NULL, node->noit, "noit",
503                                    stratcon_realtime_recv_handler,
504                                    NULL, rrctx,
505                                    free_realtime_recv_ctx);
506     }
507     else
508       noit_http_session_ref_dec(ctx);
509   }
510   if(noit_http_session_ref_cnt(ctx) == 1) {
511     noit_http_response_end(ctx);
512     clear_realtime_context(rc);
513     noit_http_session_trigger(ctx, EVENTER_WRITE);
514   }
515   return 0;
516 }
517 int
518 stratcon_request_dispatcher(noit_http_session_ctx *ctx) {
519   const char *key, *value;
520   realtime_context *rc = noit_http_session_dispatcher_closure(ctx);
521   int klen;
522   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
523   noit_http_request *req = noit_http_session_request(ctx);
524
525   if(rc->setup == RC_INITIAL) {
526     eventer_t completion;
527     struct realtime_tracker *node;
528     char c[1024];
529     int num_interests;
530     const char *uri_str = noit_http_request_uri_str(req);
531     noit_hash_table *headers = noit_http_request_headers_table(req);
532
533     num_interests = stratcon_realtime_uri_parse(rc, uri_str);
534     if(num_interests == 0) {
535       noit_http_response_status_set(ctx, 404, "OK");
536       noit_http_response_option_set(ctx, NOIT_HTTP_CLOSE);
537       noit_http_response_end(ctx);
538       return 0;
539     }
540
541     noitL(noit_error, "http: %s %s %s\n",
542           noit_http_request_method_str(req), uri_str,
543           noit_http_request_protocol_str(req));
544     while(noit_hash_next_str(headers, &iter, &key, &klen, &value)) {
545       noitL(noit_error, "http: [%s: %s]\n", key, value);
546     }
547     noit_http_response_status_set(ctx, 200, "OK");
548     noit_http_response_option_set(ctx, NOIT_HTTP_CHUNKED);
549     /*noit_http_response_option_set(ctx, NOIT_HTTP_GZIP);*/
550     /*noit_http_response_option_set(ctx, NOIT_HTTP_DEFLATE);*/
551     noit_http_response_header_set(ctx, "Content-Type", "text/html");
552
553     snprintf(c, sizeof(c),
554              "<html><head><script>document.domain='%s';</script></head><body>\n",
555              rc->document_domain);
556     noit_http_response_append(ctx, c, strlen(c));
557
558     /* this dumb crap is to make some browsers happy (Safari) */
559     memset(c, ' ', sizeof(c));
560     noit_http_response_append(ctx, c, sizeof(c));
561     noit_http_response_flush(ctx, noit_false);
562
563     rc->setup = RC_REQ_RECV;
564     /* Each interest references the ctx */
565     for(node = rc->checklist; node; node = node->next) {
566       char uuid_str[UUID_STR_LEN+1];
567       noit_http_session_ref_inc(ctx);
568       uuid_unparse_lower(node->checkid, uuid_str);
569       noitL(noit_error, "Resolving uuid: %s\n", uuid_str);
570     }
571     completion = eventer_alloc();
572     completion->mask = EVENTER_TIMER;
573     completion->callback = stratcon_realtime_http_postresolve;
574     completion->closure = ctx;
575     gettimeofday(&completion->whence, NULL);
576     stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, NULL,
577                             rc->checklist, completion);
578   }
579   return EVENTER_EXCEPTION;
580 }
581 int
582 stratcon_realtime_http_handler(eventer_t e, int mask, void *closure,
583                                struct timeval *now) {
584   int done = 0, rv;
585   acceptor_closure_t *ac = closure;
586   noit_http_session_ctx *http_ctx = ac->service_ctx;
587   rv = noit_http_session_drive(e, mask, http_ctx, now, &done);
588   if(done) acceptor_closure_free(ac);
589   return rv;
590 }
591 static int
592 rest_stream_data(noit_http_rest_closure_t *restc,
593                  int npats, char **pats) {
594   /* We're here and want to subvert the rest system */
595   const char *document_domain = NULL;
596   noit_http_session_ctx *ctx = restc->http_ctx;
597   noit_http_connection *conn = noit_http_session_connection(ctx);
598   eventer_t e;
599   acceptor_closure_t *ac = restc->ac;
600
601   /* Rewire the handler */
602   if(ac->service_ctx_free)
603     ac->service_ctx_free(ac->service_ctx);
604   ac->service_ctx = ctx;
605   ac->service_ctx_free = noit_http_ctx_acceptor_free;
606
607   if(!noit_hash_retr_str(ac->config,
608                          "document_domain", strlen("document_domain"),
609                          &document_domain)) {
610     noitL(noit_error, "Document domain not set!  Realtime streaming will be broken\n");
611     document_domain = "";
612   }
613
614   noit_http_process_querystring(noit_http_session_request(ctx));
615   /* Rewire the http context */
616   e = noit_http_connection_event(conn);
617   e->callback = stratcon_realtime_http_handler;
618   noit_http_session_set_dispatcher(ctx, stratcon_request_dispatcher,
619                                    alloc_realtime_context(document_domain));
620   return stratcon_request_dispatcher(ctx);
621 }
622
623 void
624 stratcon_realtime_http_init(const char *toplevel) {
625   eventer_name_callback("stratcon_realtime_http",
626                         stratcon_realtime_http_handler);
627   eventer_name_callback("stratcon_realtime_recv",
628                         stratcon_realtime_recv_handler);
629   assert(noit_http_rest_register_auth(
630     "GET", "/data/",
631            "^((?:" UUID_REGEX "(?:@\\d+)?)(?:/" UUID_REGEX "(?:@\\d+)?)*)$",
632     rest_stream_data, noit_http_rest_client_cert_auth
633   ) == 0);
634   assert(noit_http_rest_register_auth(
635     "GET", "/", "^(.*)$", noit_rest_simple_file_handler,
636            noit_http_rest_client_cert_auth
637   ) == 0);
638 }
Note: See TracBrowser for help on using the browser.