root/src/stratcon_realtime_http.c

Revision 70c30ef22f6d1312375e0392164d755db5824cb5, 21.2 kB (checked in by Phil Maddox <philip.maddox@circonus.com>, 5 days ago)

Use mtevAssert and mtevFatal Instead Of assert() and abort()

Use libmtev calls to safely flush logs and abort rather than calling
the assert and abort calls directly.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * Copyright (c) 2015, Circonus, Inc. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include <mtev_defines.h>
35
36 #include <jlog.h>
37
38 #include <eventer/eventer.h>
39 #include <mtev_conf.h>
40 #include <mtev_listener.h>
41 #include <mtev_http.h>
42 #include <mtev_rest.h>
43 #include <mtev_hash.h>
44 #include <mtev_log.h>
45 #include <mtev_str.h>
46
47 #include "noit_mtev_bridge.h"
48 #include "noit_jlog_listener.h"
49 #include "noit_check.h"
50 #include "noit_check_log_helpers.h"
51 #include "noit_livestream_listener.h"
52 #include "stratcon_realtime_http.h"
53 #include "stratcon_jlog_streamer.h"
54 #include "stratcon_datastore.h"
55
56 #include <ctype.h>
57 #include <unistd.h>
58 #include <errno.h>
59 #include <sys/types.h>
60 #include <sys/socket.h>
61 #ifdef HAVE_SYS_FILIO_H
62 #include <sys/filio.h>
63 #endif
64 #include <netinet/in.h>
65 #include <sys/un.h>
66 #include <arpa/inet.h>
67
68 /*
69  * it appears that GCC 4.5.2 incorrectly thinks that FULLREAD uses "mask"
70  * without initializing it, so disable that specific warning for this file
71  * for now
72  */
73
74 #if __GNUC__ == 4 && __GNUC_MINOR__ == 5 && __GNUC_PATCHLEVEL__ == 2
75 #pragma GCC diagnostic ignored "-Wuninitialized"
76 #endif
77
78 typedef struct realtime_recv_ctx_t {
79   int bytes_expected;
80   int bytes_read;
81   int bytes_written;
82   int body_len;
83   char *buffer;         /* These guys are for doing partial reads */
84
85   enum {
86     REALTIME_HTTP_WANT_INITIATE = 0,
87     REALTIME_HTTP_WANT_SEND_INTERVAL = 1,
88     REALTIME_HTTP_WANT_SEND_UUID = 2,
89     REALTIME_HTTP_WANT_HEADER = 3,
90     REALTIME_HTTP_WANT_BODY = 4,
91   } state;
92   int count;            /* Number of jlog messages we need to read */
93   u_int32_t hack_inc_id;
94   mtev_http_session_ctx *ctx;
95   struct realtime_tracker *rt;
96 } realtime_recv_ctx_t;
97
/*
 * Dispatcher closure attached to a streaming HTTP session: how far the
 * request setup has progressed, the list of checks the client asked
 * for, and the document.domain value written into the page preamble.
 */
typedef struct realtime_context {
  enum {
    RC_INITIAL = 0,
    RC_REQ_RECV = 1,
    RC_INTERESTS_RESOLVED = 2,
    RC_FEEDING = 3
  } setup;
  struct realtime_tracker *checklist;  /* singly linked list of interests */
  char *document_domain;               /* heap copy, owned by this struct */
} realtime_context;
103
104 static realtime_context *alloc_realtime_context(const char *domain) {
105   realtime_context *ctx;
106   ctx = calloc(sizeof(*ctx), 1);
107   ctx->document_domain = strdup(domain);
108   return ctx;
109 }
110 static void free_realtime_tracker(struct realtime_tracker *rt) {
111   if(rt->noit) free(rt->noit);
112   free(rt);
113 }
114 static void clear_realtime_context(realtime_context *rc) {
115  rc->setup = RC_INITIAL;
116   while(rc->checklist) {
117     struct realtime_tracker *tofree;
118     tofree = rc->checklist;
119     rc->checklist = tofree->next;
120     free_realtime_tracker(tofree);
121   }
122   if(rc->document_domain) free(rc->document_domain);
123   rc->document_domain = NULL;
124 }
125 int
126 stratcon_line_to_javascript(mtev_http_session_ctx *ctx, char *in_buff,
127                             u_int32_t *inc_id) {
128   char buffer[1024];
129   char *scp, *ecp, *token, *buff;
130   int i, len, cnt;
131   const char *v, *cb = NULL;
132   mtev_hash_table json = MTEV_HASH_EMPTY;
133   mtev_http_request *req = mtev_http_session_request(ctx);
134   char s_inc_id[42];
135   char **outrows = NULL;
136
137   cb = mtev_http_request_querystring(req, "cb");
138   for(v = cb; v && *v; v++)
139     if(!((*v >= '0' && *v <= '9') ||
140          (*v >= 'a' && *v <= 'z') ||
141          (*v >= 'A' && *v <= 'Z') ||
142          (*v == '_') || (*v == '.'))) {
143       cb = NULL;
144       break;
145     }
146   if(!cb) cb = "window.parent.plot_iframe_data";
147
148 #define BAIL_HTTP_WRITE do { \
149   if(outrows) { \
150     for(i=0;i<cnt;i++) if(outrows[i]) free(outrows[i]); \
151     free(outrows); \
152   } \
153   mtev_hash_destroy(&json, NULL, free); \
154   mtevL(noit_error, "javascript emit failed: %s:%s:%d\n", \
155         __FILE__, __FUNCTION__, __LINE__); \
156   return -1; \
157 } while(0)
158
159 #define PROCESS_NEXT_FIELD(t,l) do { \
160   if(!*scp) goto bad_row; \
161   ecp = strchr(scp, '\t'); \
162   if(!ecp) goto bad_row; \
163   t = scp; \
164   l = (ecp-scp); \
165   scp = ecp + 1; \
166 } while(0)
167 #define PROCESS_LAST_FIELD(t,l) do { \
168   if(!*scp) ecp = scp; \
169   else { \
170     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
171     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
172   } \
173   t = scp; \
174   l = (ecp-scp); \
175 } while(0)
176
177   mtevL(noit_error, "recv(%s)\n", in_buff);
178   if(in_buff[0] == 'B' && in_buff[1] != '\0' && in_buff[2] == '\t') {
179     cnt = noit_check_log_b_to_sm(in_buff, strlen(in_buff), &outrows, 0);
180   }
181   else {
182     cnt = 1;
183     outrows = malloc(sizeof(*outrows));
184     outrows[0] = strdup(in_buff);
185   }
186   for(i=0; i<cnt; i++) {
187     buff = outrows[i];
188     if(!buff) continue;
189     mtevL(noit_error, "recv_xlt(%s)\n", buff);
190     scp = buff;
191     PROCESS_NEXT_FIELD(token,len); /* Skip the leader */
192     if(buff[1] == '\t' && (buff[0] == 'M' || buff[0] == 'S')) {
193       char target[256], module[256], name[256], uuid_str[UUID_STR_LEN+1];
194       mtev_http_request *req = mtev_http_session_request(ctx);
195       mtev_hash_table *qs;
196       mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
197       const char *key;
198       int klen, i=0;
199       void *vval;
200       char type[2] = { '\0', '\0' };
201       type[0] = buff[0];
202
203 #define ra_write(a,b) if(mtev_http_response_append(ctx, a, b) == mtev_false) BAIL_HTTP_WRITE
204
205       snprintf(s_inc_id, sizeof(s_inc_id), "script-%08x", (*inc_id)++);
206       snprintf(buffer, sizeof(buffer), "<script id=\"%s\">%s({", s_inc_id, cb);
207       ra_write(buffer, strlen(buffer));
208
209       qs = mtev_http_request_querystring_table(req);
210       while(mtev_hash_next(qs, &iter, &key, &klen, &vval)) {
211         if(!strcmp(key, "cb")) continue;
212         mtev_hash_store(&json, key, klen, strdup(vval ?(char *)vval : "true"));
213       }
214       /* Time */
215       mtev_hash_store(&json, "script_id", 9, strdup(s_inc_id));
216       mtev_hash_store(&json, "type", 4, strdup(type));
217       PROCESS_NEXT_FIELD(token,len);
218       mtev_hash_store(&json, "time", 4, mtev__strndup(token, len));
219       /* UUID */
220       PROCESS_NEXT_FIELD(token,len);
221       noit_check_extended_id_split(token, len, target, sizeof(target),
222                                    module, sizeof(module), name, sizeof(name),
223                                    uuid_str, sizeof(uuid_str));
224       if(*uuid_str)
225         mtev_hash_store(&json, "id", 2,
226                         mtev__strndup(uuid_str, strlen(uuid_str)));
227       if(*target)
228         mtev_hash_store(&json, "check_target", 12,
229                         mtev__strndup(target, strlen(target)));
230       if(*module)
231         mtev_hash_store(&json, "check_module", 12,
232                         mtev__strndup(module, strlen(module)));
233       if(*name)
234         mtev_hash_store(&json, "check_name", 10,
235                         mtev__strndup(name, strlen(name)));
236       if(buff[0] == 'M') {
237         /* name */
238         PROCESS_NEXT_FIELD(token,len);
239         mtev_hash_store(&json, "metric_name", 11, mtev__strndup(token, len));
240         /* type */
241         PROCESS_NEXT_FIELD(token,len);
242         mtev_hash_store(&json, "metric_type", 11, mtev__strndup(token, len));
243         /* value */
244         PROCESS_LAST_FIELD(token,len); /* value */
245         mtev_hash_store(&json, "value", 5, mtev__strndup(token, len));
246       }
247       else if(buff[0] == 'S') {
248         /* state */
249         PROCESS_NEXT_FIELD(token,len);
250         mtev_hash_store(&json, "check_state", 11, mtev__strndup(token, len));
251         /* availability */
252         PROCESS_NEXT_FIELD(token,len);
253         mtev_hash_store(&json, "check_availability", 18, mtev__strndup(token, len));
254         /* duration */
255         PROCESS_NEXT_FIELD(token,len);
256         mtev_hash_store(&json, "check_duration_ms", 17, mtev__strndup(token, len));
257         /* status */
258         PROCESS_LAST_FIELD(token,len);
259         mtev_hash_store(&json, "status_message", 14, mtev__strndup(token, len));
260       }
261
262       memset(&iter, 0, sizeof(iter));
263       while(mtev_hash_next(&json, &iter, &key, &klen, &vval)) {
264         char *val = (char *)vval;
265         if(i++) ra_write(",", 1);
266         ra_write("\"", 1);
267         ra_write(key, klen);
268         ra_write("\":\"", 3);
269         while(*val) {
270           if(*val == '\"' || *val == '\\') {
271             ra_write((char *)"\\", 1);
272           }
273           if(isprint(*val)) {
274             ra_write((char *)val, 1);
275           }
276           else {
277             char od[5];
278             snprintf(od, sizeof(od), "\\%03o", *((unsigned char *)val));
279             ra_write(od, strlen(od));
280           }
281           val++;
282         }
283         ra_write("\"", 1);
284       }
285       snprintf(buffer, sizeof(buffer), "});</script>\n");
286       ra_write(buffer, strlen(buffer));
287
288       if(mtev_http_response_flush(ctx, mtev_false) == mtev_false) BAIL_HTTP_WRITE;
289     }
290
291     mtev_hash_destroy(&json, NULL, free);
292     memset(&json, 0, sizeof(json));
293   }
294   if(outrows) {
295     for(i=0;i<cnt;i++) if(outrows[i]) free(outrows[i]);
296     free(outrows);
297   }
298
299   return 0;
300
301  bad_row:
302   BAIL_HTTP_WRITE;
303 }
304 int
305 stratcon_realtime_uri_parse(realtime_context *rc, const char *uri) {
306   int len, cnt = 0;
307   const char *cp, *interest;
308   char *copy, *brk;
309   if(strncmp(uri, "/data/", 6)) return 0;
310   cp = uri + 6;
311   len = strlen(cp);
312   copy = alloca(len + 1);
313   if(!copy) return 0;
314   memcpy(copy, cp, len);
315   copy[len] = '\0';
316
317   for (interest = strtok_r(copy, "/", &brk);
318        interest;
319        interest = strtok_r(NULL, "/", &brk)) {
320     uuid_t in_uuid;
321     struct realtime_tracker *node;
322     char *interval;
323
324     interval = strchr(interest, '@');
325     if(!interval)
326       interval = "5000";
327     else
328       *interval++ = '\0';
329     if(uuid_parse((char *)interest, in_uuid)) continue;
330     node = calloc(1, sizeof(*node));
331     node->rc = rc;
332     uuid_copy(node->checkid, in_uuid);
333     node->interval = atoi(interval);
334     node->next = rc->checklist;
335     rc->checklist = node;
336     cnt++;
337   }
338   return cnt;
339 }
340 static void
341 free_realtime_recv_ctx(void *vctx) {
342   realtime_recv_ctx_t *rrctx = vctx;
343   mtev_http_session_ctx *ctx = rrctx->ctx;
344   realtime_context *rc = mtev_http_session_dispatcher_closure(ctx);
345
346   if(mtev_http_session_ref_dec(ctx) == 1) {
347     mtev_http_response_end(ctx);
348     clear_realtime_context(rc);
349     mtev_http_session_trigger(ctx, EVENTER_WRITE | EVENTER_EXCEPTION);
350   }
351   free(rrctx);
352 }
353 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
354 static int
355 __read_on_ctx(eventer_t e, realtime_recv_ctx_t *ctx, int *newmask) {
356   int len, mask;
357   while(ctx->bytes_read < ctx->bytes_expected) {
358     len = Eread(ctx->buffer + ctx->bytes_read,
359                 ctx->bytes_expected - ctx->bytes_read);
360     if(len < 0) {
361       *newmask = mask;
362       return -1;
363     }
364     /* if we get 0 inside SSL, and there was a real error, we
365      * will actually get a -1 here.
366      * if(len == 0) return ctx->bytes_read;
367      */
368     ctx->bytes_read += len;
369   }
370   mtevAssert(ctx->bytes_read == ctx->bytes_expected);
371   return ctx->bytes_read;
372 }
/*
 * FULLREAD(e, ctx, size): read exactly `size` bytes into ctx->buffer,
 * resuming a previous partial read if one is in progress.
 *
 * On first entry (ctx->bytes_expected is zero) a fresh buffer of
 * size + 1 bytes is allocated and NUL-terminated at offset `size`.
 * If the read fails with EAGAIN the enclosing function returns the mask
 * from the read with EVENTER_EXCEPTION added; any other failure, an
 * allocation failure, or a length mismatch jumps to the enclosing
 * function's socket_error label.  On success the byte counters are
 * reset and ctx->buffer holds the data.
 */
#define FULLREAD(e,ctx,size) do { \
  int fr_mask, fr_len; \
  if(ctx->bytes_expected == 0) { \
    ctx->bytes_expected = size; \
    free(ctx->buffer); \
    ctx->buffer = malloc(size + 1); \
    if(!ctx->buffer) { \
      mtevL(noit_error, "malloc(%lu) failed.\n", (unsigned long)size + 1); \
      goto socket_error; \
    } \
    ctx->buffer[size] = '\0'; \
  } \
  fr_len = __read_on_ctx(e, ctx, &fr_mask); \
  if(fr_len < 0) { \
    if(errno == EAGAIN) return fr_mask | EVENTER_EXCEPTION; \
    mtevL(noit_error, "SSL read error: %s\n", strerror(errno)); \
    goto socket_error; \
  } \
  ctx->bytes_read = 0; \
  ctx->bytes_expected = 0; \
  if(fr_len != size) { \
    mtevL(noit_error, "SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
          ctx->state, fr_len, (unsigned long)size); \
    goto socket_error; \
  } \
} while(0)
399
400 int
401 stratcon_realtime_recv_handler(eventer_t e, int mask, void *closure,
402                                struct timeval *now) {
403   static u_int32_t livestream_cmd = 0;
404   mtev_connection_ctx_t *nctx = closure;
405   realtime_recv_ctx_t *ctx = nctx->consumer_ctx;
406   int len;
407   u_int32_t nint;
408   char uuid_str[37];
409
410   if(!livestream_cmd) livestream_cmd = htonl(NOIT_LIVESTREAM_DATA_FEED);
411
412   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
413  socket_error:
414     ctx->state = REALTIME_HTTP_WANT_INITIATE;
415     ctx->count = 0;
416     ctx->bytes_read = 0;
417     ctx->bytes_written = 0;
418     ctx->bytes_expected = 0;
419     if(ctx->buffer) free(ctx->buffer);
420     ctx->buffer = NULL;
421     /* We close the event here and null it in the context
422      * because the mtev_connection_ctx_dealloc() will both close
423      * it and free it (which our caller will double free) and
424      * we consider double frees to be harmful.
425      */
426     eventer_remove_fd(e->fd);
427     e->opset->close(e->fd, &mask, e);
428     nctx->e = NULL;
429     mtev_connection_ctx_dealloc(nctx);
430     return 0;
431   }
432
433 #define full_nb_write(data, wlen) do { \
434   if(!ctx->bytes_expected) { \
435     ctx->bytes_written = 0; \
436     ctx->bytes_expected = wlen; \
437   } \
438   while(ctx->bytes_written < ctx->bytes_expected) { \
439     while(-1 == (len = e->opset->write(e->fd, ((char *)data) + ctx->bytes_written, \
440                                        ctx->bytes_expected - ctx->bytes_written, \
441                                        &mask, e)) && errno == EINTR); \
442     if(len < 0) { \
443       if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
444       goto socket_error; \
445     } \
446     ctx->bytes_written += len; \
447   } \
448   if(ctx->bytes_written != ctx->bytes_expected) { \
449     mtevL(noit_error, "short write on initiating stream [%d != %d].\n", \
450           ctx->bytes_written, ctx->bytes_expected); \
451     goto socket_error; \
452   } \
453   ctx->bytes_expected = 0; \
454 } while(0)
455
456   mtev_connection_update_timeout(nctx);
457   while(1) {
458     u_int32_t net_body_len;
459
460     switch(ctx->state) {
461       case REALTIME_HTTP_WANT_INITIATE:
462         full_nb_write(&livestream_cmd, sizeof(livestream_cmd));
463         ctx->state = REALTIME_HTTP_WANT_SEND_INTERVAL;
464         /* FALLTHROUGH */
465       case REALTIME_HTTP_WANT_SEND_INTERVAL:
466         nint = htonl(ctx->rt->interval);
467         full_nb_write(&nint, sizeof(nint));
468         ctx->state = REALTIME_HTTP_WANT_SEND_UUID;
469         /* FALLTHROUGH */
470       case REALTIME_HTTP_WANT_SEND_UUID:
471         uuid_unparse_lower(ctx->rt->checkid, uuid_str);
472         full_nb_write(uuid_str, 36);
473         ctx->state = REALTIME_HTTP_WANT_HEADER;
474         /* FALLTHROUGH */
475       case REALTIME_HTTP_WANT_HEADER:
476         FULLREAD(e, ctx, sizeof(u_int32_t));
477         memcpy(&net_body_len, ctx->buffer, sizeof(u_int32_t));
478         ctx->body_len = ntohl(net_body_len);
479         free(ctx->buffer); ctx->buffer = NULL;
480         ctx->state = REALTIME_HTTP_WANT_BODY;
481         break;
482       case REALTIME_HTTP_WANT_BODY:
483         FULLREAD(e, ctx, ctx->body_len);
484         if(stratcon_line_to_javascript(ctx->ctx, ctx->buffer, &ctx->hack_inc_id)) goto socket_error;
485         free(ctx->buffer); ctx->buffer = NULL;
486         ctx->state = REALTIME_HTTP_WANT_HEADER;
487         break;
488     }
489   }
490
491 }
492
493 int
494 stratcon_realtime_http_postresolve(eventer_t e, int mask, void *closure,
495                                    struct timeval *now) {
496   mtev_http_session_ctx *ctx = closure;
497   realtime_context *rc = mtev_http_session_dispatcher_closure(ctx);
498   struct realtime_tracker *node;
499
500   for(node = rc->checklist; node; node = node->next) {
501     if(node->noit) {
502       realtime_recv_ctx_t *rrctx;
503       rrctx = calloc(1, sizeof(*rrctx));
504       rrctx->ctx = ctx;
505       rrctx->rt = node;
506       stratcon_streamer_connection(NULL, node->noit, "noit",
507                                    stratcon_realtime_recv_handler,
508                                    NULL, rrctx,
509                                    free_realtime_recv_ctx);
510     }
511     else
512       mtev_http_session_ref_dec(ctx);
513   }
514   if(mtev_http_session_ref_cnt(ctx) == 1) {
515     mtev_http_response_end(ctx);
516     clear_realtime_context(rc);
517     mtev_http_session_trigger(ctx, EVENTER_WRITE);
518   }
519   return 0;
520 }
521 int
522 stratcon_request_dispatcher(mtev_http_session_ctx *ctx) {
523   const char *key, *value;
524   realtime_context *rc = mtev_http_session_dispatcher_closure(ctx);
525   int klen;
526   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
527   mtev_http_request *req = mtev_http_session_request(ctx);
528
529   if(rc->setup == RC_INITIAL) {
530     eventer_t completion;
531     struct realtime_tracker *node;
532     char c[1024];
533     int num_interests;
534     const char *uri_str = mtev_http_request_uri_str(req);
535     mtev_hash_table *headers = mtev_http_request_headers_table(req);
536
537     num_interests = stratcon_realtime_uri_parse(rc, uri_str);
538     if(num_interests == 0) {
539       mtev_http_response_status_set(ctx, 404, "OK");
540       mtev_http_response_option_set(ctx, MTEV_HTTP_CLOSE);
541       mtev_http_response_end(ctx);
542       return 0;
543     }
544
545     mtevL(noit_error, "http: %s %s %s\n",
546           mtev_http_request_method_str(req), uri_str,
547           mtev_http_request_protocol_str(req));
548     while(mtev_hash_next_str(headers, &iter, &key, &klen, &value)) {
549       mtevL(noit_error, "http: [%s: %s]\n", key, value);
550     }
551     mtev_http_response_status_set(ctx, 200, "OK");
552     mtev_http_response_option_set(ctx, MTEV_HTTP_CHUNKED);
553     /*mtev_http_response_option_set(ctx, MTEV_HTTP_GZIP);*/
554     /*mtev_http_response_option_set(ctx, MTEV_HTTP_DEFLATE);*/
555     mtev_http_response_header_set(ctx, "Content-Type", "text/html");
556
557     snprintf(c, sizeof(c),
558              "<html><head><script>document.domain='%s';</script></head><body>\n",
559              rc->document_domain);
560     mtev_http_response_append(ctx, c, strlen(c));
561
562     /* this dumb crap is to make some browsers happy (Safari) */
563     memset(c, ' ', sizeof(c));
564     mtev_http_response_append(ctx, c, sizeof(c));
565     mtev_http_response_flush(ctx, mtev_false);
566
567     rc->setup = RC_REQ_RECV;
568     /* Each interest references the ctx */
569     for(node = rc->checklist; node; node = node->next) {
570       char uuid_str[UUID_STR_LEN+1];
571       mtev_http_session_ref_inc(ctx);
572       uuid_unparse_lower(node->checkid, uuid_str);
573       mtevL(noit_error, "Resolving uuid: %s\n", uuid_str);
574     }
575     completion = eventer_alloc();
576     completion->mask = EVENTER_TIMER;
577     completion->callback = stratcon_realtime_http_postresolve;
578     completion->closure = ctx;
579     gettimeofday(&completion->whence, NULL);
580     stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, NULL,
581                             rc->checklist, completion);
582   }
583   return EVENTER_EXCEPTION;
584 }
585 int
586 stratcon_realtime_http_handler(eventer_t e, int mask, void *closure,
587                                struct timeval *now) {
588   int done = 0, rv;
589   acceptor_closure_t *ac = closure;
590   mtev_http_session_ctx *http_ctx = ac->service_ctx;
591   rv = mtev_http_session_drive(e, mask, http_ctx, now, &done);
592   if(done) acceptor_closure_free(ac);
593   return rv;
594 }
595 static int
596 rest_stream_data(mtev_http_rest_closure_t *restc,
597                  int npats, char **pats) {
598   /* We're here and want to subvert the rest system */
599   const char *document_domain = NULL;
600   mtev_http_session_ctx *ctx = restc->http_ctx;
601   mtev_http_connection *conn = mtev_http_session_connection(ctx);
602   eventer_t e;
603   acceptor_closure_t *ac = restc->ac;
604
605   /* Rewire the handler */
606   if(ac->service_ctx_free)
607     ac->service_ctx_free(ac->service_ctx);
608   ac->service_ctx = ctx;
609   ac->service_ctx_free = mtev_http_ctx_acceptor_free;
610
611   if(!mtev_hash_retr_str(ac->config,
612                          "document_domain", strlen("document_domain"),
613                          &document_domain)) {
614     mtevL(noit_error, "Document domain not set!  Realtime streaming will be broken\n");
615     document_domain = "";
616   }
617
618   mtev_http_process_querystring(mtev_http_session_request(ctx));
619   /* Rewire the http context */
620   e = mtev_http_connection_event(conn);
621   e->callback = stratcon_realtime_http_handler;
622   mtev_http_session_set_dispatcher(ctx, stratcon_request_dispatcher,
623                                    alloc_realtime_context(document_domain));
624   return stratcon_request_dispatcher(ctx);
625 }
626
627 void
628 stratcon_realtime_http_init(const char *toplevel) {
629   eventer_name_callback("stratcon_realtime_http",
630                         stratcon_realtime_http_handler);
631   eventer_name_callback("stratcon_realtime_recv",
632                         stratcon_realtime_recv_handler);
633   mtevAssert(mtev_http_rest_register_auth(
634     "GET", "/data/",
635            "^((?:" UUID_REGEX "(?:@\\d+)?)(?:/" UUID_REGEX "(?:@\\d+)?)*)$",
636     rest_stream_data, mtev_http_rest_client_cert_auth
637   ) == 0);
638   mtevAssert(mtev_http_rest_register_auth(
639     "GET", "/", "^(.*)$", mtev_rest_simple_file_handler,
640            mtev_http_rest_client_cert_auth
641   ) == 0);
642 }
Note: See TracBrowser for help on using the browser.