root/src/stratcon_realtime_http.c

Revision 2ff4db5a6730270eb30827e23883ed354c42ddf6, 21.2 kB (checked in by Phil Maddox <philip.maddox@circonus.com>, 5 months ago)

Explicitly Initialize Mtev Hash Tables

Rather than using MTEV_HASH_EMPTY or not calling any initialization at
all, explicitly initialize hash tables.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * Copyright (c) 2015, Circonus, Inc. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include <mtev_defines.h>
35
36 #include <jlog.h>
37
38 #include <eventer/eventer.h>
39 #include <mtev_conf.h>
40 #include <mtev_listener.h>
41 #include <mtev_http.h>
42 #include <mtev_rest.h>
43 #include <mtev_hash.h>
44 #include <mtev_log.h>
45 #include <mtev_str.h>
46
47 #include "noit_mtev_bridge.h"
48 #include "noit_jlog_listener.h"
49 #include "noit_check.h"
50 #include "noit_check_log_helpers.h"
51 #include "noit_livestream_listener.h"
52 #include "stratcon_realtime_http.h"
53 #include "stratcon_jlog_streamer.h"
54 #include "stratcon_datastore.h"
55
56 #include <ctype.h>
57 #include <unistd.h>
58 #include <errno.h>
59 #include <sys/types.h>
60 #include <sys/socket.h>
61 #ifdef HAVE_SYS_FILIO_H
62 #include <sys/filio.h>
63 #endif
64 #include <netinet/in.h>
65 #include <sys/un.h>
66 #include <arpa/inet.h>
67
68 /*
69  * it appears that GCC 4.5.2 incorrectly thinks that FULLREAD uses "mask"
70  * without initializing it, so disable that specific warning for this file
71  * for now
72  */
73
74 #if __GNUC__ == 4 && __GNUC_MINOR__ == 5 && __GNUC_PATCHLEVEL__ == 2
75 #pragma GCC diagnostic ignored "-Wuninitialized"
76 #endif
77
78 typedef struct realtime_recv_ctx_t {
79   int bytes_expected;
80   int bytes_read;
81   int bytes_written;
82   int body_len;
83   char *buffer;         /* These guys are for doing partial reads */
84
85   enum {
86     REALTIME_HTTP_WANT_INITIATE = 0,
87     REALTIME_HTTP_WANT_SEND_INTERVAL = 1,
88     REALTIME_HTTP_WANT_SEND_UUID = 2,
89     REALTIME_HTTP_WANT_HEADER = 3,
90     REALTIME_HTTP_WANT_BODY = 4,
91   } state;
92   int count;            /* Number of jlog messages we need to read */
93   u_int32_t hack_inc_id;
94   mtev_http_session_ctx *ctx;
95   struct realtime_tracker *rt;
96 } realtime_recv_ctx_t;
97
/*
 * Dispatcher closure attached to an HTTP session that streams realtime data.
 */
typedef struct realtime_context {
  /*
   * Request progress.  Within this file only RC_INITIAL and RC_REQ_RECV are
   * ever assigned.
   */
  enum {
    RC_INITIAL = 0,
    RC_REQ_RECV,
    RC_INTERESTS_RESOLVED,
    RC_FEEDING
  } setup;
  /* Singly linked list of the interests parsed from the request URI. */
  struct realtime_tracker *checklist;
  /* Heap copy of the value written into the page's document.domain. */
  char *document_domain;
} realtime_context;
103
104 static realtime_context *alloc_realtime_context(const char *domain) {
105   realtime_context *ctx;
106   ctx = calloc(sizeof(*ctx), 1);
107   ctx->document_domain = strdup(domain);
108   return ctx;
109 }
110 static void free_realtime_tracker(struct realtime_tracker *rt) {
111   if(rt->noit) free(rt->noit);
112   free(rt);
113 }
114 static void clear_realtime_context(realtime_context *rc) {
115  rc->setup = RC_INITIAL;
116   while(rc->checklist) {
117     struct realtime_tracker *tofree;
118     tofree = rc->checklist;
119     rc->checklist = tofree->next;
120     free_realtime_tracker(tofree);
121   }
122   if(rc->document_domain) free(rc->document_domain);
123   rc->document_domain = NULL;
124 }
125 int
126 stratcon_line_to_javascript(mtev_http_session_ctx *ctx, char *in_buff,
127                             u_int32_t *inc_id) {
128   char buffer[1024];
129   char *scp, *ecp, *token, *buff;
130   int i, len, cnt;
131   const char *v, *cb = NULL;
132   mtev_hash_table json;
133   mtev_http_request *req = mtev_http_session_request(ctx);
134   char s_inc_id[42];
135   char **outrows = NULL;
136
137   mtev_hash_init(&json);
138
139   cb = mtev_http_request_querystring(req, "cb");
140   for(v = cb; v && *v; v++)
141     if(!((*v >= '0' && *v <= '9') ||
142          (*v >= 'a' && *v <= 'z') ||
143          (*v >= 'A' && *v <= 'Z') ||
144          (*v == '_') || (*v == '.'))) {
145       cb = NULL;
146       break;
147     }
148   if(!cb) cb = "window.parent.plot_iframe_data";
149
150 #define BAIL_HTTP_WRITE do { \
151   if(outrows) { \
152     for(i=0;i<cnt;i++) if(outrows[i]) free(outrows[i]); \
153     free(outrows); \
154   } \
155   mtev_hash_destroy(&json, NULL, free); \
156   mtevL(noit_error, "javascript emit failed: %s:%s:%d\n", \
157         __FILE__, __FUNCTION__, __LINE__); \
158   return -1; \
159 } while(0)
160
161 #define PROCESS_NEXT_FIELD(t,l) do { \
162   if(!*scp) goto bad_row; \
163   ecp = strchr(scp, '\t'); \
164   if(!ecp) goto bad_row; \
165   t = scp; \
166   l = (ecp-scp); \
167   scp = ecp + 1; \
168 } while(0)
169 #define PROCESS_LAST_FIELD(t,l) do { \
170   if(!*scp) ecp = scp; \
171   else { \
172     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
173     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
174   } \
175   t = scp; \
176   l = (ecp-scp); \
177 } while(0)
178
179   mtevL(noit_error, "recv(%s)\n", in_buff);
180   if(in_buff[0] == 'B' && in_buff[1] != '\0' && in_buff[2] == '\t') {
181     cnt = noit_check_log_b_to_sm(in_buff, strlen(in_buff), &outrows, 0);
182   }
183   else {
184     cnt = 1;
185     outrows = malloc(sizeof(*outrows));
186     outrows[0] = strdup(in_buff);
187   }
188   for(i=0; i<cnt; i++) {
189     buff = outrows[i];
190     if(!buff) continue;
191     mtevL(noit_error, "recv_xlt(%s)\n", buff);
192     scp = buff;
193     PROCESS_NEXT_FIELD(token,len); /* Skip the leader */
194     if(buff[1] == '\t' && (buff[0] == 'M' || buff[0] == 'S')) {
195       char target[256], module[256], name[256], uuid_str[UUID_STR_LEN+1];
196       mtev_http_request *req = mtev_http_session_request(ctx);
197       mtev_hash_table *qs;
198       mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
199       const char *key;
200       int klen, i=0;
201       void *vval;
202       char type[2] = { '\0', '\0' };
203       type[0] = buff[0];
204
205 #define ra_write(a,b) if(mtev_http_response_append(ctx, a, b) == mtev_false) BAIL_HTTP_WRITE
206
207       snprintf(s_inc_id, sizeof(s_inc_id), "script-%08x", (*inc_id)++);
208       snprintf(buffer, sizeof(buffer), "<script id=\"%s\">%s({", s_inc_id, cb);
209       ra_write(buffer, strlen(buffer));
210
211       qs = mtev_http_request_querystring_table(req);
212       while(mtev_hash_next(qs, &iter, &key, &klen, &vval)) {
213         if(!strcmp(key, "cb")) continue;
214         mtev_hash_store(&json, key, klen, strdup(vval ?(char *)vval : "true"));
215       }
216       /* Time */
217       mtev_hash_store(&json, "script_id", 9, strdup(s_inc_id));
218       mtev_hash_store(&json, "type", 4, strdup(type));
219       PROCESS_NEXT_FIELD(token,len);
220       mtev_hash_store(&json, "time", 4, mtev__strndup(token, len));
221       /* UUID */
222       PROCESS_NEXT_FIELD(token,len);
223       noit_check_extended_id_split(token, len, target, sizeof(target),
224                                    module, sizeof(module), name, sizeof(name),
225                                    uuid_str, sizeof(uuid_str));
226       if(*uuid_str)
227         mtev_hash_store(&json, "id", 2,
228                         mtev__strndup(uuid_str, strlen(uuid_str)));
229       if(*target)
230         mtev_hash_store(&json, "check_target", 12,
231                         mtev__strndup(target, strlen(target)));
232       if(*module)
233         mtev_hash_store(&json, "check_module", 12,
234                         mtev__strndup(module, strlen(module)));
235       if(*name)
236         mtev_hash_store(&json, "check_name", 10,
237                         mtev__strndup(name, strlen(name)));
238       if(buff[0] == 'M') {
239         /* name */
240         PROCESS_NEXT_FIELD(token,len);
241         mtev_hash_store(&json, "metric_name", 11, mtev__strndup(token, len));
242         /* type */
243         PROCESS_NEXT_FIELD(token,len);
244         mtev_hash_store(&json, "metric_type", 11, mtev__strndup(token, len));
245         /* value */
246         PROCESS_LAST_FIELD(token,len); /* value */
247         mtev_hash_store(&json, "value", 5, mtev__strndup(token, len));
248       }
249       else if(buff[0] == 'S') {
250         /* state */
251         PROCESS_NEXT_FIELD(token,len);
252         mtev_hash_store(&json, "check_state", 11, mtev__strndup(token, len));
253         /* availability */
254         PROCESS_NEXT_FIELD(token,len);
255         mtev_hash_store(&json, "check_availability", 18, mtev__strndup(token, len));
256         /* duration */
257         PROCESS_NEXT_FIELD(token,len);
258         mtev_hash_store(&json, "check_duration_ms", 17, mtev__strndup(token, len));
259         /* status */
260         PROCESS_LAST_FIELD(token,len);
261         mtev_hash_store(&json, "status_message", 14, mtev__strndup(token, len));
262       }
263
264       memset(&iter, 0, sizeof(iter));
265       while(mtev_hash_next(&json, &iter, &key, &klen, &vval)) {
266         char *val = (char *)vval;
267         if(i++) ra_write(",", 1);
268         ra_write("\"", 1);
269         ra_write(key, klen);
270         ra_write("\":\"", 3);
271         while(*val) {
272           if(*val == '\"' || *val == '\\') {
273             ra_write((char *)"\\", 1);
274           }
275           if(isprint(*val)) {
276             ra_write((char *)val, 1);
277           }
278           else {
279             char od[5];
280             snprintf(od, sizeof(od), "\\%03o", *((unsigned char *)val));
281             ra_write(od, strlen(od));
282           }
283           val++;
284         }
285         ra_write("\"", 1);
286       }
287       snprintf(buffer, sizeof(buffer), "});</script>\n");
288       ra_write(buffer, strlen(buffer));
289
290       if(mtev_http_response_flush(ctx, mtev_false) == mtev_false) BAIL_HTTP_WRITE;
291     }
292
293     mtev_hash_destroy(&json, NULL, free);
294     memset(&json, 0, sizeof(json));
295   }
296   if(outrows) {
297     for(i=0;i<cnt;i++) if(outrows[i]) free(outrows[i]);
298     free(outrows);
299   }
300
301   return 0;
302
303  bad_row:
304   BAIL_HTTP_WRITE;
305 }
306 int
307 stratcon_realtime_uri_parse(realtime_context *rc, const char *uri) {
308   int len, cnt = 0;
309   const char *cp, *interest;
310   char *copy, *brk;
311   if(strncmp(uri, "/data/", 6)) return 0;
312   cp = uri + 6;
313   len = strlen(cp);
314   copy = alloca(len + 1);
315   if(!copy) return 0;
316   memcpy(copy, cp, len);
317   copy[len] = '\0';
318
319   for (interest = strtok_r(copy, "/", &brk);
320        interest;
321        interest = strtok_r(NULL, "/", &brk)) {
322     uuid_t in_uuid;
323     struct realtime_tracker *node;
324     char *interval;
325
326     interval = strchr(interest, '@');
327     if(!interval)
328       interval = "5000";
329     else
330       *interval++ = '\0';
331     if(uuid_parse((char *)interest, in_uuid)) continue;
332     node = calloc(1, sizeof(*node));
333     node->rc = rc;
334     uuid_copy(node->checkid, in_uuid);
335     node->interval = atoi(interval);
336     node->next = rc->checklist;
337     rc->checklist = node;
338     cnt++;
339   }
340   return cnt;
341 }
342 static void
343 free_realtime_recv_ctx(void *vctx) {
344   realtime_recv_ctx_t *rrctx = vctx;
345   mtev_http_session_ctx *ctx = rrctx->ctx;
346   realtime_context *rc = mtev_http_session_dispatcher_closure(ctx);
347
348   if(mtev_http_session_ref_dec(ctx) == 1) {
349     mtev_http_response_end(ctx);
350     clear_realtime_context(rc);
351     mtev_http_session_trigger(ctx, EVENTER_WRITE | EVENTER_EXCEPTION);
352   }
353   free(rrctx);
354 }
355 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
356 static int
357 __read_on_ctx(eventer_t e, realtime_recv_ctx_t *ctx, int *newmask) {
358   int len, mask;
359   while(ctx->bytes_read < ctx->bytes_expected) {
360     len = Eread(ctx->buffer + ctx->bytes_read,
361                 ctx->bytes_expected - ctx->bytes_read);
362     if(len < 0) {
363       *newmask = mask;
364       return -1;
365     }
366     /* if we get 0 inside SSL, and there was a real error, we
367      * will actually get a -1 here.
368      * if(len == 0) return ctx->bytes_read;
369      */
370     ctx->bytes_read += len;
371   }
372   mtevAssert(ctx->bytes_read == ctx->bytes_expected);
373   return ctx->bytes_read;
374 }
375 #define FULLREAD(e,ctx,size) do { \
376   int mask, len; \
377   if(!ctx->bytes_expected) { \
378     ctx->bytes_expected = size; \
379     if(ctx->buffer) free(ctx->buffer); \
380     ctx->buffer = malloc(size + 1); \
381     if(ctx->buffer == NULL) { \
382       mtevL(noit_error, "malloc(%lu) failed.\n", (unsigned long)size + 1); \
383       goto socket_error; \
384     } \
385     ctx->buffer[size] = '\0'; \
386   } \
387   len = __read_on_ctx(e, ctx, &mask); \
388   if(len < 0) { \
389     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
390     mtevL(noit_error, "SSL read error: %s\n", strerror(errno)); \
391     goto socket_error; \
392   } \
393   ctx->bytes_read = 0; \
394   ctx->bytes_expected = 0; \
395   if(len != size) { \
396     mtevL(noit_error, "SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
397           ctx->state, len, (unsigned long)size); \
398     goto socket_error; \
399   } \
400 } while(0)
401
402 int
403 stratcon_realtime_recv_handler(eventer_t e, int mask, void *closure,
404                                struct timeval *now) {
405   static u_int32_t livestream_cmd = 0;
406   mtev_connection_ctx_t *nctx = closure;
407   realtime_recv_ctx_t *ctx = nctx->consumer_ctx;
408   int len;
409   u_int32_t nint;
410   char uuid_str[37];
411
412   if(!livestream_cmd) livestream_cmd = htonl(NOIT_LIVESTREAM_DATA_FEED);
413
414   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
415  socket_error:
416     ctx->state = REALTIME_HTTP_WANT_INITIATE;
417     ctx->count = 0;
418     ctx->bytes_read = 0;
419     ctx->bytes_written = 0;
420     ctx->bytes_expected = 0;
421     if(ctx->buffer) free(ctx->buffer);
422     ctx->buffer = NULL;
423     /* We close the event here and null it in the context
424      * because the mtev_connection_ctx_dealloc() will both close
425      * it and free it (which our caller will double free) and
426      * we consider double frees to be harmful.
427      */
428     eventer_remove_fd(e->fd);
429     e->opset->close(e->fd, &mask, e);
430     nctx->e = NULL;
431     mtev_connection_ctx_dealloc(nctx);
432     return 0;
433   }
434
435 #define full_nb_write(data, wlen) do { \
436   if(!ctx->bytes_expected) { \
437     ctx->bytes_written = 0; \
438     ctx->bytes_expected = wlen; \
439   } \
440   while(ctx->bytes_written < ctx->bytes_expected) { \
441     while(-1 == (len = e->opset->write(e->fd, ((char *)data) + ctx->bytes_written, \
442                                        ctx->bytes_expected - ctx->bytes_written, \
443                                        &mask, e)) && errno == EINTR); \
444     if(len < 0) { \
445       if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
446       goto socket_error; \
447     } \
448     ctx->bytes_written += len; \
449   } \
450   if(ctx->bytes_written != ctx->bytes_expected) { \
451     mtevL(noit_error, "short write on initiating stream [%d != %d].\n", \
452           ctx->bytes_written, ctx->bytes_expected); \
453     goto socket_error; \
454   } \
455   ctx->bytes_expected = 0; \
456 } while(0)
457
458   mtev_connection_update_timeout(nctx);
459   while(1) {
460     u_int32_t net_body_len;
461
462     switch(ctx->state) {
463       case REALTIME_HTTP_WANT_INITIATE:
464         full_nb_write(&livestream_cmd, sizeof(livestream_cmd));
465         ctx->state = REALTIME_HTTP_WANT_SEND_INTERVAL;
466         /* FALLTHROUGH */
467       case REALTIME_HTTP_WANT_SEND_INTERVAL:
468         nint = htonl(ctx->rt->interval);
469         full_nb_write(&nint, sizeof(nint));
470         ctx->state = REALTIME_HTTP_WANT_SEND_UUID;
471         /* FALLTHROUGH */
472       case REALTIME_HTTP_WANT_SEND_UUID:
473         uuid_unparse_lower(ctx->rt->checkid, uuid_str);
474         full_nb_write(uuid_str, 36);
475         ctx->state = REALTIME_HTTP_WANT_HEADER;
476         /* FALLTHROUGH */
477       case REALTIME_HTTP_WANT_HEADER:
478         FULLREAD(e, ctx, sizeof(u_int32_t));
479         memcpy(&net_body_len, ctx->buffer, sizeof(u_int32_t));
480         ctx->body_len = ntohl(net_body_len);
481         free(ctx->buffer); ctx->buffer = NULL;
482         ctx->state = REALTIME_HTTP_WANT_BODY;
483         break;
484       case REALTIME_HTTP_WANT_BODY:
485         FULLREAD(e, ctx, ctx->body_len);
486         if(stratcon_line_to_javascript(ctx->ctx, ctx->buffer, &ctx->hack_inc_id)) goto socket_error;
487         free(ctx->buffer); ctx->buffer = NULL;
488         ctx->state = REALTIME_HTTP_WANT_HEADER;
489         break;
490     }
491   }
492
493 }
494
495 int
496 stratcon_realtime_http_postresolve(eventer_t e, int mask, void *closure,
497                                    struct timeval *now) {
498   mtev_http_session_ctx *ctx = closure;
499   realtime_context *rc = mtev_http_session_dispatcher_closure(ctx);
500   struct realtime_tracker *node;
501
502   for(node = rc->checklist; node; node = node->next) {
503     if(node->noit) {
504       realtime_recv_ctx_t *rrctx;
505       rrctx = calloc(1, sizeof(*rrctx));
506       rrctx->ctx = ctx;
507       rrctx->rt = node;
508       stratcon_streamer_connection(NULL, node->noit, "noit",
509                                    stratcon_realtime_recv_handler,
510                                    NULL, rrctx,
511                                    free_realtime_recv_ctx);
512     }
513     else
514       mtev_http_session_ref_dec(ctx);
515   }
516   if(mtev_http_session_ref_cnt(ctx) == 1) {
517     mtev_http_response_end(ctx);
518     clear_realtime_context(rc);
519     mtev_http_session_trigger(ctx, EVENTER_WRITE);
520   }
521   return 0;
522 }
523 int
524 stratcon_request_dispatcher(mtev_http_session_ctx *ctx) {
525   const char *key, *value;
526   realtime_context *rc = mtev_http_session_dispatcher_closure(ctx);
527   int klen;
528   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
529   mtev_http_request *req = mtev_http_session_request(ctx);
530
531   if(rc->setup == RC_INITIAL) {
532     eventer_t completion;
533     struct realtime_tracker *node;
534     char c[1024];
535     int num_interests;
536     const char *uri_str = mtev_http_request_uri_str(req);
537     mtev_hash_table *headers = mtev_http_request_headers_table(req);
538
539     num_interests = stratcon_realtime_uri_parse(rc, uri_str);
540     if(num_interests == 0) {
541       mtev_http_response_status_set(ctx, 404, "OK");
542       mtev_http_response_option_set(ctx, MTEV_HTTP_CLOSE);
543       mtev_http_response_end(ctx);
544       return 0;
545     }
546
547     mtevL(noit_error, "http: %s %s %s\n",
548           mtev_http_request_method_str(req), uri_str,
549           mtev_http_request_protocol_str(req));
550     while(mtev_hash_next_str(headers, &iter, &key, &klen, &value)) {
551       mtevL(noit_error, "http: [%s: %s]\n", key, value);
552     }
553     mtev_http_response_status_set(ctx, 200, "OK");
554     mtev_http_response_option_set(ctx, MTEV_HTTP_CHUNKED);
555     /*mtev_http_response_option_set(ctx, MTEV_HTTP_GZIP);*/
556     /*mtev_http_response_option_set(ctx, MTEV_HTTP_DEFLATE);*/
557     mtev_http_response_header_set(ctx, "Content-Type", "text/html");
558
559     snprintf(c, sizeof(c),
560              "<html><head><script>document.domain='%s';</script></head><body>\n",
561              rc->document_domain);
562     mtev_http_response_append(ctx, c, strlen(c));
563
564     /* this dumb crap is to make some browsers happy (Safari) */
565     memset(c, ' ', sizeof(c));
566     mtev_http_response_append(ctx, c, sizeof(c));
567     mtev_http_response_flush(ctx, mtev_false);
568
569     rc->setup = RC_REQ_RECV;
570     /* Each interest references the ctx */
571     for(node = rc->checklist; node; node = node->next) {
572       char uuid_str[UUID_STR_LEN+1];
573       mtev_http_session_ref_inc(ctx);
574       uuid_unparse_lower(node->checkid, uuid_str);
575       mtevL(noit_error, "Resolving uuid: %s\n", uuid_str);
576     }
577     completion = eventer_alloc();
578     completion->mask = EVENTER_TIMER;
579     completion->callback = stratcon_realtime_http_postresolve;
580     completion->closure = ctx;
581     gettimeofday(&completion->whence, NULL);
582     stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, NULL,
583                             rc->checklist, completion);
584   }
585   return EVENTER_EXCEPTION;
586 }
587 int
588 stratcon_realtime_http_handler(eventer_t e, int mask, void *closure,
589                                struct timeval *now) {
590   int done = 0, rv;
591   acceptor_closure_t *ac = closure;
592   mtev_http_session_ctx *http_ctx = ac->service_ctx;
593   rv = mtev_http_session_drive(e, mask, http_ctx, now, &done);
594   if(done) acceptor_closure_free(ac);
595   return rv;
596 }
597 static int
598 rest_stream_data(mtev_http_rest_closure_t *restc,
599                  int npats, char **pats) {
600   /* We're here and want to subvert the rest system */
601   const char *document_domain = NULL;
602   mtev_http_session_ctx *ctx = restc->http_ctx;
603   mtev_http_connection *conn = mtev_http_session_connection(ctx);
604   eventer_t e;
605   acceptor_closure_t *ac = restc->ac;
606
607   /* Rewire the handler */
608   if(ac->service_ctx_free)
609     ac->service_ctx_free(ac->service_ctx);
610   ac->service_ctx = ctx;
611   ac->service_ctx_free = mtev_http_ctx_acceptor_free;
612
613   if(!mtev_hash_retr_str(ac->config,
614                          "document_domain", strlen("document_domain"),
615                          &document_domain)) {
616     mtevL(noit_error, "Document domain not set!  Realtime streaming will be broken\n");
617     document_domain = "";
618   }
619
620   mtev_http_process_querystring(mtev_http_session_request(ctx));
621   /* Rewire the http context */
622   e = mtev_http_connection_event(conn);
623   e->callback = stratcon_realtime_http_handler;
624   mtev_http_session_set_dispatcher(ctx, stratcon_request_dispatcher,
625                                    alloc_realtime_context(document_domain));
626   return stratcon_request_dispatcher(ctx);
627 }
628
629 void
630 stratcon_realtime_http_init(const char *toplevel) {
631   eventer_name_callback("stratcon_realtime_http",
632                         stratcon_realtime_http_handler);
633   eventer_name_callback("stratcon_realtime_recv",
634                         stratcon_realtime_recv_handler);
635   mtevAssert(mtev_http_rest_register_auth(
636     "GET", "/data/",
637            "^((?:" UUID_REGEX "(?:@\\d+)?)(?:/" UUID_REGEX "(?:@\\d+)?)*)$",
638     rest_stream_data, mtev_http_rest_client_cert_auth
639   ) == 0);
640   mtevAssert(mtev_http_rest_register_auth(
641     "GET", "/", "^(.*)$", mtev_rest_simple_file_handler,
642            mtev_http_rest_client_cert_auth
643   ) == 0);
644 }
Note: See TracBrowser for help on using the browser.