root/src/stratcon_realtime_http.c

Revision 304ec80b8cf842fc0abe5f9029790908b6455957, 21.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 months ago)

Convert to libmtev.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * Copyright (c) 2015, Circonus, Inc. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include <mtev_defines.h>
35
36 #include <jlog.h>
37
38 #include <eventer/eventer.h>
39 #include <mtev_conf.h>
40 #include <mtev_listener.h>
41 #include <mtev_http.h>
42 #include <mtev_rest.h>
43 #include <mtev_hash.h>
44 #include <mtev_log.h>
45 #include <mtev_str.h>
46
47 #include "noit_mtev_bridge.h"
48 #include "noit_jlog_listener.h"
49 #include "noit_check.h"
50 #include "noit_check_log_helpers.h"
51 #include "noit_livestream_listener.h"
52 #include "stratcon_realtime_http.h"
53 #include "stratcon_jlog_streamer.h"
54 #include "stratcon_datastore.h"
55
56 #include <ctype.h>
57 #include <unistd.h>
58 #include <assert.h>
59 #include <errno.h>
60 #include <sys/types.h>
61 #include <sys/socket.h>
62 #ifdef HAVE_SYS_FILIO_H
63 #include <sys/filio.h>
64 #endif
65 #include <netinet/in.h>
66 #include <sys/un.h>
67 #include <arpa/inet.h>
68
69 /*
70  * it appears that GCC 4.5.2 incorrectly thinks that FULLREAD uses "mask"
71  * without initializing it, so disable that specific warning for this file
72  * for now
73  */
74
75 #if __GNUC__ == 4 && __GNUC_MINOR__ == 5 && __GNUC_PATCHLEVEL__ == 2
76 #pragma GCC diagnostic ignored "-Wuninitialized"
77 #endif
78
79 typedef struct realtime_recv_ctx_t {
80   int bytes_expected;
81   int bytes_read;
82   int bytes_written;
83   int body_len;
84   char *buffer;         /* These guys are for doing partial reads */
85
86   enum {
87     REALTIME_HTTP_WANT_INITIATE = 0,
88     REALTIME_HTTP_WANT_SEND_INTERVAL = 1,
89     REALTIME_HTTP_WANT_SEND_UUID = 2,
90     REALTIME_HTTP_WANT_HEADER = 3,
91     REALTIME_HTTP_WANT_BODY = 4,
92   } state;
93   int count;            /* Number of jlog messages we need to read */
94   u_int32_t hack_inc_id;
95   mtev_http_session_ctx *ctx;
96   struct realtime_tracker *rt;
97 } realtime_recv_ctx_t;
98
/*
 * Dispatcher closure attached to one realtime HTTP session.
 */
typedef struct realtime_context {
  /* How far the setup of the request has progressed. */
  enum {
    RC_INITIAL = 0,
    RC_REQ_RECV,
    RC_INTERESTS_RESOLVED,
    RC_FEEDING
  } setup;
  struct realtime_tracker *checklist;  /* singly linked list of interests */
  char *document_domain;               /* heap copy written into the page */
} realtime_context;
104
105 static realtime_context *alloc_realtime_context(const char *domain) {
106   realtime_context *ctx;
107   ctx = calloc(sizeof(*ctx), 1);
108   ctx->document_domain = strdup(domain);
109   return ctx;
110 }
111 static void free_realtime_tracker(struct realtime_tracker *rt) {
112   if(rt->noit) free(rt->noit);
113   free(rt);
114 }
115 static void clear_realtime_context(realtime_context *rc) {
116  rc->setup = RC_INITIAL;
117   while(rc->checklist) {
118     struct realtime_tracker *tofree;
119     tofree = rc->checklist;
120     rc->checklist = tofree->next;
121     free_realtime_tracker(tofree);
122   }
123   if(rc->document_domain) free(rc->document_domain);
124   rc->document_domain = NULL;
125 }
126 int
127 stratcon_line_to_javascript(mtev_http_session_ctx *ctx, char *in_buff,
128                             u_int32_t *inc_id) {
129   char buffer[1024];
130   char *scp, *ecp, *token, *buff;
131   int i, len, cnt;
132   const char *v, *cb = NULL;
133   mtev_hash_table json = MTEV_HASH_EMPTY;
134   mtev_http_request *req = mtev_http_session_request(ctx);
135   char s_inc_id[42];
136   char **outrows = NULL;
137
138   cb = mtev_http_request_querystring(req, "cb");
139   for(v = cb; v && *v; v++)
140     if(!((*v >= '0' && *v <= '9') ||
141          (*v >= 'a' && *v <= 'z') ||
142          (*v >= 'A' && *v <= 'Z') ||
143          (*v == '_') || (*v == '.'))) {
144       cb = NULL;
145       break;
146     }
147   if(!cb) cb = "window.parent.plot_iframe_data";
148
149 #define BAIL_HTTP_WRITE do { \
150   if(outrows) { \
151     for(i=0;i<cnt;i++) if(outrows[i]) free(outrows[i]); \
152     free(outrows); \
153   } \
154   mtev_hash_destroy(&json, NULL, free); \
155   mtevL(noit_error, "javascript emit failed: %s:%s:%d\n", \
156         __FILE__, __FUNCTION__, __LINE__); \
157   return -1; \
158 } while(0)
159
160 #define PROCESS_NEXT_FIELD(t,l) do { \
161   if(!*scp) goto bad_row; \
162   ecp = strchr(scp, '\t'); \
163   if(!ecp) goto bad_row; \
164   t = scp; \
165   l = (ecp-scp); \
166   scp = ecp + 1; \
167 } while(0)
168 #define PROCESS_LAST_FIELD(t,l) do { \
169   if(!*scp) ecp = scp; \
170   else { \
171     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
172     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
173   } \
174   t = scp; \
175   l = (ecp-scp); \
176 } while(0)
177
178   mtevL(noit_error, "recv(%s)\n", in_buff);
179   if(in_buff[0] == 'B' && in_buff[1] != '\0' && in_buff[2] == '\t') {
180     cnt = noit_check_log_b_to_sm(in_buff, strlen(in_buff), &outrows, 0);
181   }
182   else {
183     cnt = 1;
184     outrows = malloc(sizeof(*outrows));
185     outrows[0] = strdup(in_buff);
186   }
187   for(i=0; i<cnt; i++) {
188     buff = outrows[i];
189     if(!buff) continue;
190     mtevL(noit_error, "recv_xlt(%s)\n", buff);
191     scp = buff;
192     PROCESS_NEXT_FIELD(token,len); /* Skip the leader */
193     if(buff[1] == '\t' && (buff[0] == 'M' || buff[0] == 'S')) {
194       char target[256], module[256], name[256], uuid_str[UUID_STR_LEN+1];
195       mtev_http_request *req = mtev_http_session_request(ctx);
196       mtev_hash_table *qs;
197       mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
198       const char *key;
199       int klen, i=0;
200       void *vval;
201       char type[2] = { '\0', '\0' };
202       type[0] = buff[0];
203
204 #define ra_write(a,b) if(mtev_http_response_append(ctx, a, b) == mtev_false) BAIL_HTTP_WRITE
205
206       snprintf(s_inc_id, sizeof(s_inc_id), "script-%08x", (*inc_id)++);
207       snprintf(buffer, sizeof(buffer), "<script id=\"%s\">%s({", s_inc_id, cb);
208       ra_write(buffer, strlen(buffer));
209
210       qs = mtev_http_request_querystring_table(req);
211       while(mtev_hash_next(qs, &iter, &key, &klen, &vval)) {
212         if(!strcmp(key, "cb")) continue;
213         mtev_hash_store(&json, key, klen, strdup(vval ?(char *)vval : "true"));
214       }
215       /* Time */
216       mtev_hash_store(&json, "script_id", 9, strdup(s_inc_id));
217       mtev_hash_store(&json, "type", 4, strdup(type));
218       PROCESS_NEXT_FIELD(token,len);
219       mtev_hash_store(&json, "time", 4, mtev__strndup(token, len));
220       /* UUID */
221       PROCESS_NEXT_FIELD(token,len);
222       noit_check_extended_id_split(token, len, target, sizeof(target),
223                                    module, sizeof(module), name, sizeof(name),
224                                    uuid_str, sizeof(uuid_str));
225       if(*uuid_str)
226         mtev_hash_store(&json, "id", 2,
227                         mtev__strndup(uuid_str, strlen(uuid_str)));
228       if(*target)
229         mtev_hash_store(&json, "check_target", 12,
230                         mtev__strndup(target, strlen(target)));
231       if(*module)
232         mtev_hash_store(&json, "check_module", 12,
233                         mtev__strndup(module, strlen(module)));
234       if(*name)
235         mtev_hash_store(&json, "check_name", 10,
236                         mtev__strndup(name, strlen(name)));
237       if(buff[0] == 'M') {
238         /* name */
239         PROCESS_NEXT_FIELD(token,len);
240         mtev_hash_store(&json, "metric_name", 11, mtev__strndup(token, len));
241         /* type */
242         PROCESS_NEXT_FIELD(token,len);
243         mtev_hash_store(&json, "metric_type", 11, mtev__strndup(token, len));
244         /* value */
245         PROCESS_LAST_FIELD(token,len); /* value */
246         mtev_hash_store(&json, "value", 5, mtev__strndup(token, len));
247       }
248       else if(buff[0] == 'S') {
249         /* state */
250         PROCESS_NEXT_FIELD(token,len);
251         mtev_hash_store(&json, "check_state", 11, mtev__strndup(token, len));
252         /* availability */
253         PROCESS_NEXT_FIELD(token,len);
254         mtev_hash_store(&json, "check_availability", 18, mtev__strndup(token, len));
255         /* duration */
256         PROCESS_NEXT_FIELD(token,len);
257         mtev_hash_store(&json, "check_duration_ms", 17, mtev__strndup(token, len));
258         /* status */
259         PROCESS_LAST_FIELD(token,len);
260         mtev_hash_store(&json, "status_message", 14, mtev__strndup(token, len));
261       }
262
263       memset(&iter, 0, sizeof(iter));
264       while(mtev_hash_next(&json, &iter, &key, &klen, &vval)) {
265         char *val = (char *)vval;
266         if(i++) ra_write(",", 1);
267         ra_write("\"", 1);
268         ra_write(key, klen);
269         ra_write("\":\"", 3);
270         while(*val) {
271           if(*val == '\"' || *val == '\\') {
272             ra_write((char *)"\\", 1);
273           }
274           if(isprint(*val)) {
275             ra_write((char *)val, 1);
276           }
277           else {
278             char od[5];
279             snprintf(od, sizeof(od), "\\%03o", *((unsigned char *)val));
280             ra_write(od, strlen(od));
281           }
282           val++;
283         }
284         ra_write("\"", 1);
285       }
286       snprintf(buffer, sizeof(buffer), "});</script>\n");
287       ra_write(buffer, strlen(buffer));
288
289       if(mtev_http_response_flush(ctx, mtev_false) == mtev_false) BAIL_HTTP_WRITE;
290     }
291
292     mtev_hash_destroy(&json, NULL, free);
293     memset(&json, 0, sizeof(json));
294   }
295   if(outrows) {
296     for(i=0;i<cnt;i++) if(outrows[i]) free(outrows[i]);
297     free(outrows);
298   }
299
300   return 0;
301
302  bad_row:
303   BAIL_HTTP_WRITE;
304 }
305 int
306 stratcon_realtime_uri_parse(realtime_context *rc, const char *uri) {
307   int len, cnt = 0;
308   const char *cp, *interest;
309   char *copy, *brk;
310   if(strncmp(uri, "/data/", 6)) return 0;
311   cp = uri + 6;
312   len = strlen(cp);
313   copy = alloca(len + 1);
314   if(!copy) return 0;
315   memcpy(copy, cp, len);
316   copy[len] = '\0';
317
318   for (interest = strtok_r(copy, "/", &brk);
319        interest;
320        interest = strtok_r(NULL, "/", &brk)) {
321     uuid_t in_uuid;
322     struct realtime_tracker *node;
323     char *interval;
324
325     interval = strchr(interest, '@');
326     if(!interval)
327       interval = "5000";
328     else
329       *interval++ = '\0';
330     if(uuid_parse((char *)interest, in_uuid)) continue;
331     node = calloc(1, sizeof(*node));
332     node->rc = rc;
333     uuid_copy(node->checkid, in_uuid);
334     node->interval = atoi(interval);
335     node->next = rc->checklist;
336     rc->checklist = node;
337     cnt++;
338   }
339   return cnt;
340 }
341 static void
342 free_realtime_recv_ctx(void *vctx) {
343   realtime_recv_ctx_t *rrctx = vctx;
344   mtev_http_session_ctx *ctx = rrctx->ctx;
345   realtime_context *rc = mtev_http_session_dispatcher_closure(ctx);
346
347   if(mtev_http_session_ref_dec(ctx) == 1) {
348     mtev_http_response_end(ctx);
349     clear_realtime_context(rc);
350     mtev_http_session_trigger(ctx, EVENTER_WRITE | EVENTER_EXCEPTION);
351   }
352   free(rrctx);
353 }
354 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
355 static int
356 __read_on_ctx(eventer_t e, realtime_recv_ctx_t *ctx, int *newmask) {
357   int len, mask;
358   while(ctx->bytes_read < ctx->bytes_expected) {
359     len = Eread(ctx->buffer + ctx->bytes_read,
360                 ctx->bytes_expected - ctx->bytes_read);
361     if(len < 0) {
362       *newmask = mask;
363       return -1;
364     }
365     /* if we get 0 inside SSL, and there was a real error, we
366      * will actually get a -1 here.
367      * if(len == 0) return ctx->bytes_read;
368      */
369     ctx->bytes_read += len;
370   }
371   assert(ctx->bytes_read == ctx->bytes_expected);
372   return ctx->bytes_read;
373 }
/*
 * FULLREAD(e, ctx, size): read exactly `size` bytes into ctx->buffer.
 *
 * For use inside an eventer callback that returns a mask and has a
 * `socket_error` label.  On first entry a buffer of size+1 bytes is
 * allocated and NUL terminated.  If the read cannot finish yet (EAGAIN)
 * the macro returns from the enclosing function with the mask to wait on,
 * keeping its progress in ctx so the next call resumes.  A negative size,
 * an allocation failure or a read error jumps to `socket_error`.
 *
 * `size` is evaluated once.  `mask` is only assigned by __read_on_ctx()
 * when it fails, which is the only case in which it is read here.
 */
#define FULLREAD(e,ctx,size) do { \
  int mask, len; \
  const int fullread_size = (int)(size); \
  if(fullread_size < 0) { \
    /* size + 1 below would otherwise index in front of the buffer */ \
    mtevL(noit_error, "invalid read size (%d).\n", fullread_size); \
    goto socket_error; \
  } \
  if(!(ctx)->bytes_expected) { \
    (ctx)->bytes_expected = fullread_size; \
    free((ctx)->buffer); \
    (ctx)->buffer = malloc((size_t)fullread_size + 1); \
    if((ctx)->buffer == NULL) { \
      mtevL(noit_error, "malloc(%lu) failed.\n", (unsigned long)fullread_size + 1); \
      goto socket_error; \
    } \
    (ctx)->buffer[fullread_size] = '\0'; \
  } \
  len = __read_on_ctx((e), (ctx), &mask); \
  if(len < 0) { \
    if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
    mtevL(noit_error, "SSL read error: %s\n", strerror(errno)); \
    goto socket_error; \
  } \
  (ctx)->bytes_read = 0; \
  (ctx)->bytes_expected = 0; \
  if(len != fullread_size) { \
    mtevL(noit_error, "SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
          (ctx)->state, len, (unsigned long)fullread_size); \
    goto socket_error; \
  } \
} while(0)
400
401 int
402 stratcon_realtime_recv_handler(eventer_t e, int mask, void *closure,
403                                struct timeval *now) {
404   static u_int32_t livestream_cmd = 0;
405   mtev_connection_ctx_t *nctx = closure;
406   realtime_recv_ctx_t *ctx = nctx->consumer_ctx;
407   int len;
408   u_int32_t nint;
409   char uuid_str[37];
410
411   if(!livestream_cmd) livestream_cmd = htonl(NOIT_LIVESTREAM_DATA_FEED);
412
413   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
414  socket_error:
415     ctx->state = REALTIME_HTTP_WANT_INITIATE;
416     ctx->count = 0;
417     ctx->bytes_read = 0;
418     ctx->bytes_written = 0;
419     ctx->bytes_expected = 0;
420     if(ctx->buffer) free(ctx->buffer);
421     ctx->buffer = NULL;
422     /* We close the event here and null it in the context
423      * because the mtev_connection_ctx_dealloc() will both close
424      * it and free it (which our caller will double free) and
425      * we consider double frees to be harmful.
426      */
427     eventer_remove_fd(e->fd);
428     e->opset->close(e->fd, &mask, e);
429     nctx->e = NULL;
430     mtev_connection_ctx_dealloc(nctx);
431     return 0;
432   }
433
434 #define full_nb_write(data, wlen) do { \
435   if(!ctx->bytes_expected) { \
436     ctx->bytes_written = 0; \
437     ctx->bytes_expected = wlen; \
438   } \
439   while(ctx->bytes_written < ctx->bytes_expected) { \
440     while(-1 == (len = e->opset->write(e->fd, ((char *)data) + ctx->bytes_written, \
441                                        ctx->bytes_expected - ctx->bytes_written, \
442                                        &mask, e)) && errno == EINTR); \
443     if(len < 0) { \
444       if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
445       goto socket_error; \
446     } \
447     ctx->bytes_written += len; \
448   } \
449   if(ctx->bytes_written != ctx->bytes_expected) { \
450     mtevL(noit_error, "short write on initiating stream [%d != %d].\n", \
451           ctx->bytes_written, ctx->bytes_expected); \
452     goto socket_error; \
453   } \
454   ctx->bytes_expected = 0; \
455 } while(0)
456
457   mtev_connection_update_timeout(nctx);
458   while(1) {
459     u_int32_t net_body_len;
460
461     switch(ctx->state) {
462       case REALTIME_HTTP_WANT_INITIATE:
463         full_nb_write(&livestream_cmd, sizeof(livestream_cmd));
464         ctx->state = REALTIME_HTTP_WANT_SEND_INTERVAL;
465         /* FALLTHROUGH */
466       case REALTIME_HTTP_WANT_SEND_INTERVAL:
467         nint = htonl(ctx->rt->interval);
468         full_nb_write(&nint, sizeof(nint));
469         ctx->state = REALTIME_HTTP_WANT_SEND_UUID;
470         /* FALLTHROUGH */
471       case REALTIME_HTTP_WANT_SEND_UUID:
472         uuid_unparse_lower(ctx->rt->checkid, uuid_str);
473         full_nb_write(uuid_str, 36);
474         ctx->state = REALTIME_HTTP_WANT_HEADER;
475         /* FALLTHROUGH */
476       case REALTIME_HTTP_WANT_HEADER:
477         FULLREAD(e, ctx, sizeof(u_int32_t));
478         memcpy(&net_body_len, ctx->buffer, sizeof(u_int32_t));
479         ctx->body_len = ntohl(net_body_len);
480         free(ctx->buffer); ctx->buffer = NULL;
481         ctx->state = REALTIME_HTTP_WANT_BODY;
482         break;
483       case REALTIME_HTTP_WANT_BODY:
484         FULLREAD(e, ctx, ctx->body_len);
485         if(stratcon_line_to_javascript(ctx->ctx, ctx->buffer, &ctx->hack_inc_id)) goto socket_error;
486         free(ctx->buffer); ctx->buffer = NULL;
487         ctx->state = REALTIME_HTTP_WANT_HEADER;
488         break;
489     }
490   }
491
492 }
493
494 int
495 stratcon_realtime_http_postresolve(eventer_t e, int mask, void *closure,
496                                    struct timeval *now) {
497   mtev_http_session_ctx *ctx = closure;
498   realtime_context *rc = mtev_http_session_dispatcher_closure(ctx);
499   struct realtime_tracker *node;
500
501   for(node = rc->checklist; node; node = node->next) {
502     if(node->noit) {
503       realtime_recv_ctx_t *rrctx;
504       rrctx = calloc(1, sizeof(*rrctx));
505       rrctx->ctx = ctx;
506       rrctx->rt = node;
507       stratcon_streamer_connection(NULL, node->noit, "noit",
508                                    stratcon_realtime_recv_handler,
509                                    NULL, rrctx,
510                                    free_realtime_recv_ctx);
511     }
512     else
513       mtev_http_session_ref_dec(ctx);
514   }
515   if(mtev_http_session_ref_cnt(ctx) == 1) {
516     mtev_http_response_end(ctx);
517     clear_realtime_context(rc);
518     mtev_http_session_trigger(ctx, EVENTER_WRITE);
519   }
520   return 0;
521 }
522 int
523 stratcon_request_dispatcher(mtev_http_session_ctx *ctx) {
524   const char *key, *value;
525   realtime_context *rc = mtev_http_session_dispatcher_closure(ctx);
526   int klen;
527   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
528   mtev_http_request *req = mtev_http_session_request(ctx);
529
530   if(rc->setup == RC_INITIAL) {
531     eventer_t completion;
532     struct realtime_tracker *node;
533     char c[1024];
534     int num_interests;
535     const char *uri_str = mtev_http_request_uri_str(req);
536     mtev_hash_table *headers = mtev_http_request_headers_table(req);
537
538     num_interests = stratcon_realtime_uri_parse(rc, uri_str);
539     if(num_interests == 0) {
540       mtev_http_response_status_set(ctx, 404, "OK");
541       mtev_http_response_option_set(ctx, MTEV_HTTP_CLOSE);
542       mtev_http_response_end(ctx);
543       return 0;
544     }
545
546     mtevL(noit_error, "http: %s %s %s\n",
547           mtev_http_request_method_str(req), uri_str,
548           mtev_http_request_protocol_str(req));
549     while(mtev_hash_next_str(headers, &iter, &key, &klen, &value)) {
550       mtevL(noit_error, "http: [%s: %s]\n", key, value);
551     }
552     mtev_http_response_status_set(ctx, 200, "OK");
553     mtev_http_response_option_set(ctx, MTEV_HTTP_CHUNKED);
554     /*mtev_http_response_option_set(ctx, MTEV_HTTP_GZIP);*/
555     /*mtev_http_response_option_set(ctx, MTEV_HTTP_DEFLATE);*/
556     mtev_http_response_header_set(ctx, "Content-Type", "text/html");
557
558     snprintf(c, sizeof(c),
559              "<html><head><script>document.domain='%s';</script></head><body>\n",
560              rc->document_domain);
561     mtev_http_response_append(ctx, c, strlen(c));
562
563     /* this dumb crap is to make some browsers happy (Safari) */
564     memset(c, ' ', sizeof(c));
565     mtev_http_response_append(ctx, c, sizeof(c));
566     mtev_http_response_flush(ctx, mtev_false);
567
568     rc->setup = RC_REQ_RECV;
569     /* Each interest references the ctx */
570     for(node = rc->checklist; node; node = node->next) {
571       char uuid_str[UUID_STR_LEN+1];
572       mtev_http_session_ref_inc(ctx);
573       uuid_unparse_lower(node->checkid, uuid_str);
574       mtevL(noit_error, "Resolving uuid: %s\n", uuid_str);
575     }
576     completion = eventer_alloc();
577     completion->mask = EVENTER_TIMER;
578     completion->callback = stratcon_realtime_http_postresolve;
579     completion->closure = ctx;
580     gettimeofday(&completion->whence, NULL);
581     stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, NULL,
582                             rc->checklist, completion);
583   }
584   return EVENTER_EXCEPTION;
585 }
586 int
587 stratcon_realtime_http_handler(eventer_t e, int mask, void *closure,
588                                struct timeval *now) {
589   int done = 0, rv;
590   acceptor_closure_t *ac = closure;
591   mtev_http_session_ctx *http_ctx = ac->service_ctx;
592   rv = mtev_http_session_drive(e, mask, http_ctx, now, &done);
593   if(done) acceptor_closure_free(ac);
594   return rv;
595 }
596 static int
597 rest_stream_data(mtev_http_rest_closure_t *restc,
598                  int npats, char **pats) {
599   /* We're here and want to subvert the rest system */
600   const char *document_domain = NULL;
601   mtev_http_session_ctx *ctx = restc->http_ctx;
602   mtev_http_connection *conn = mtev_http_session_connection(ctx);
603   eventer_t e;
604   acceptor_closure_t *ac = restc->ac;
605
606   /* Rewire the handler */
607   if(ac->service_ctx_free)
608     ac->service_ctx_free(ac->service_ctx);
609   ac->service_ctx = ctx;
610   ac->service_ctx_free = mtev_http_ctx_acceptor_free;
611
612   if(!mtev_hash_retr_str(ac->config,
613                          "document_domain", strlen("document_domain"),
614                          &document_domain)) {
615     mtevL(noit_error, "Document domain not set!  Realtime streaming will be broken\n");
616     document_domain = "";
617   }
618
619   mtev_http_process_querystring(mtev_http_session_request(ctx));
620   /* Rewire the http context */
621   e = mtev_http_connection_event(conn);
622   e->callback = stratcon_realtime_http_handler;
623   mtev_http_session_set_dispatcher(ctx, stratcon_request_dispatcher,
624                                    alloc_realtime_context(document_domain));
625   return stratcon_request_dispatcher(ctx);
626 }
627
628 void
629 stratcon_realtime_http_init(const char *toplevel) {
630   eventer_name_callback("stratcon_realtime_http",
631                         stratcon_realtime_http_handler);
632   eventer_name_callback("stratcon_realtime_recv",
633                         stratcon_realtime_recv_handler);
634   assert(mtev_http_rest_register_auth(
635     "GET", "/data/",
636            "^((?:" UUID_REGEX "(?:@\\d+)?)(?:/" UUID_REGEX "(?:@\\d+)?)*)$",
637     rest_stream_data, mtev_http_rest_client_cert_auth
638   ) == 0);
639   assert(mtev_http_rest_register_auth(
640     "GET", "/", "^(.*)$", mtev_rest_simple_file_handler,
641            mtev_http_rest_client_cert_auth
642   ) == 0);
643 }
Note: See TracBrowser for help on using the browser.