root/src/stratcon_realtime_http.c

Revision be29b6f498e50a6a8152c48d7ccc193a9e6bfdb7, 15.1 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

fix accounting, refs #170

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_conf.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "jlog/jlog.h"
39 #include "noit_jlog_listener.h"
40 #include "noit_listener.h"
41 #include "noit_http.h"
42 #include "noit_livestream_listener.h"
43 #include "stratcon_realtime_http.h"
44 #include "stratcon_jlog_streamer.h"
45 #include "stratcon_datastore.h"
46
47 #include <unistd.h>
48 #include <assert.h>
49 #include <errno.h>
50 #include <sys/types.h>
51 #include <sys/socket.h>
52 #ifdef HAVE_SYS_FILIO_H
53 #include <sys/filio.h>
54 #endif
55 #include <netinet/in.h>
56 #include <sys/un.h>
57 #include <arpa/inet.h>
58
59
60 typedef struct realtime_recv_ctx_t {
61   int bytes_expected;
62   int bytes_read;
63   int bytes_written;
64   int body_len;
65   char *buffer;         /* These guys are for doing partial reads */
66
67   enum {
68     REALTIME_HTTP_WANT_INITIATE = 0,
69     REALTIME_HTTP_WANT_SEND_INTERVAL = 1,
70     REALTIME_HTTP_WANT_SEND_UUID = 2,
71     REALTIME_HTTP_WANT_HEADER = 3,
72     REALTIME_HTTP_WANT_BODY = 4,
73   } state;
74   int count;            /* Number of jlog messages we need to read */
75   noit_http_session_ctx *ctx;
76   struct realtime_tracker *rt;
77 } realtime_recv_ctx_t;
78
79 typedef struct realtime_context {
80   enum { RC_INITIAL = 0, RC_REQ_RECV, RC_INTERESTS_RESOLVED, RC_FEEDING } setup;
81   struct realtime_tracker *checklist;
82   char *document_domain;
83 } realtime_context;
84
85 static realtime_context *alloc_realtime_context(const char *domain) {
86   realtime_context *ctx;
87   ctx = calloc(sizeof(*ctx), 1);
88   ctx->document_domain = strdup(domain);
89   return ctx;
90 }
91 static void free_realtime_tracker(struct realtime_tracker *rt) {
92   if(rt->noit) free(rt->noit);
93   free(rt);
94 }
95 static void clear_realtime_context(realtime_context *rc) {
96  rc->setup = RC_INITIAL;
97   while(rc->checklist) {
98     struct realtime_tracker *tofree;
99     tofree = rc->checklist;
100     rc->checklist = tofree->next;
101     free_realtime_tracker(tofree);
102   }
103   if(rc->document_domain) free(rc->document_domain);
104   rc->document_domain = NULL;
105 }
106 int
107 stratcon_line_to_javascript(noit_http_session_ctx *ctx, char *buff) {
108   char buffer[1024];
109   char *scp, *ecp, *token;
110   int len;
111
112 #define BAIL_HTTP_WRITE do { \
113   noitL(noit_error, "javascript emit failed: %s:%s:%d\n", \
114         __FILE__, __FUNCTION__, __LINE__); \
115   return -1; \
116 } while(0)
117
118 #define PROCESS_NEXT_FIELD(t,l) do { \
119   if(!*scp) goto bad_row; \
120   ecp = strchr(scp, '\t'); \
121   if(!ecp) goto bad_row; \
122   t = scp; \
123   l = (ecp-scp); \
124   scp = ecp + 1; \
125 } while(0)
126 #define PROCESS_LAST_FIELD(t,l) do { \
127   if(!*scp) ecp = scp; \
128   else { \
129     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
130     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
131   } \
132   t = scp; \
133   l = (ecp-scp); \
134 } while(0)
135
136   scp = buff;
137   PROCESS_NEXT_FIELD(token,len); /* Skip the leader */
138   if(buff[0] == 'M') {
139     snprintf(buffer, sizeof(buffer), "<script>window.parent.plot_iframe_data('");
140     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
141
142     /* Time */
143     PROCESS_NEXT_FIELD(token,len);
144     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
145
146     snprintf(buffer, sizeof(buffer), "', '");
147     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
148
149     /* UUID */
150     PROCESS_NEXT_FIELD(token,len);
151     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
152
153     snprintf(buffer, sizeof(buffer), "', '");
154     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
155
156     /* name */
157     PROCESS_NEXT_FIELD(token,len);
158     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
159
160     snprintf(buffer, sizeof(buffer), "', '");
161     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
162
163     PROCESS_NEXT_FIELD(token,len); /* skip type */
164     PROCESS_LAST_FIELD(token,len); /* value */
165     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
166
167     snprintf(buffer, sizeof(buffer), "');</script>\n");
168     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
169
170     if(noit_http_response_flush(ctx, noit_false) == noit_false) BAIL_HTTP_WRITE;
171   }
172
173   return 0;
174
175  bad_row:
176   BAIL_HTTP_WRITE;
177   if(0) {
178     noit_http_response_end(ctx);
179     clear_realtime_context(ctx->dispatcher_closure);
180     if(ctx->conn.e) eventer_trigger(ctx->conn.e, EVENTER_WRITE);
181     return 0;
182   }
183 }
184 int
185 stratcon_realtime_uri_parse(realtime_context *rc, char *uri) {
186   int len, cnt = 0;
187   char *cp, *copy, *interest, *brk;
188   if(strncmp(uri, "/data/", 6)) return 0;
189   cp = uri + 6;
190   len = strlen(cp);
191   copy = alloca(len + 1);
192   if(!copy) return 0;
193   memcpy(copy, cp, len);
194   copy[len] = '\0';
195
196   for (interest = strtok_r(copy, "/", &brk);
197        interest;
198        interest = strtok_r(NULL, "/", &brk)) {
199     struct realtime_tracker *node;
200     char *interval;
201
202     interval = strchr(interest, '@');
203     if(!interval)
204       interval = "5000";
205     else
206       *interval++ = '\0';
207     node = calloc(1, sizeof(*node));
208     node->rc = rc;
209     node->sid = atoi(interest);
210     node->interval = atoi(interval);
211     node->next = rc->checklist;
212     rc->checklist = node;
213     cnt++;
214   }
215   return cnt;
216 }
217 static void
218 free_realtime_recv_ctx(void *vctx) {
219   realtime_recv_ctx_t *rrctx = vctx;
220   noit_atomic_dec32(&rrctx->ctx->ref_cnt);
221   free(rrctx);
222 }
223 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
224 static int
225 __read_on_ctx(eventer_t e, realtime_recv_ctx_t *ctx, int *newmask) {
226   int len, mask;
227   while(ctx->bytes_read < ctx->bytes_expected) {
228     len = Eread(ctx->buffer + ctx->bytes_read,
229                 ctx->bytes_expected - ctx->bytes_read);
230     if(len < 0) {
231       *newmask = mask;
232       return -1;
233     }
234     /* if we get 0 inside SSL, and there was a real error, we
235      * will actually get a -1 here.
236      * if(len == 0) return ctx->bytes_read;
237      */
238     ctx->bytes_read += len;
239   }
240   assert(ctx->bytes_read == ctx->bytes_expected);
241   return ctx->bytes_read;
242 }
243 #define FULLREAD(e,ctx,size) do { \
244   int mask, len; \
245   if(!ctx->bytes_expected) { \
246     ctx->bytes_expected = size; \
247     if(ctx->buffer) free(ctx->buffer); \
248     ctx->buffer = malloc(size + 1); \
249     if(ctx->buffer == NULL) { \
250       noitL(noit_error, "malloc(%lu) failed.\n", (unsigned long)size + 1); \
251       goto socket_error; \
252     } \
253     ctx->buffer[size] = '\0'; \
254   } \
255   len = __read_on_ctx(e, ctx, &mask); \
256   if(len < 0) { \
257     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
258     noitL(noit_error, "SSL read error: %s\n", strerror(errno)); \
259     goto socket_error; \
260   } \
261   ctx->bytes_read = 0; \
262   ctx->bytes_expected = 0; \
263   if(len != size) { \
264     noitL(noit_error, "SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
265           ctx->state, len, (unsigned long)size); \
266     goto socket_error; \
267   } \
268 } while(0)
269
270 int
271 stratcon_realtime_recv_handler(eventer_t e, int mask, void *closure,
272                                struct timeval *now) {
273   static u_int32_t livestream_cmd = 0;
274   noit_connection_ctx_t *nctx = closure;
275   realtime_recv_ctx_t *ctx = nctx->consumer_ctx;
276   int len;
277   u_int32_t nint;
278   char uuid_str[37];
279
280   if(!livestream_cmd) livestream_cmd = htonl(NOIT_LIVESTREAM_DATA_FEED);
281
282   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
283  socket_error:
284     ctx->state = REALTIME_HTTP_WANT_INITIATE;
285     ctx->count = 0;
286     ctx->bytes_read = 0;
287     ctx->bytes_written = 0;
288     ctx->bytes_expected = 0;
289     if(ctx->buffer) free(ctx->buffer);
290     ctx->buffer = NULL;
291     noit_connection_ctx_dealloc(nctx);
292     eventer_remove_fd(e->fd);
293     e->opset->close(e->fd, &mask, e);
294     return 0;
295   }
296
297 #define full_nb_write(data, wlen) do { \
298   if(!ctx->bytes_expected) { \
299     ctx->bytes_written = 0; \
300     ctx->bytes_expected = wlen; \
301   } \
302   while(ctx->bytes_written < ctx->bytes_expected) { \
303     while(-1 == (len = e->opset->write(e->fd, ((char *)data) + ctx->bytes_written, \
304                                        ctx->bytes_expected - ctx->bytes_written, \
305                                        &mask, e)) && errno == EINTR); \
306     if(len < 0) { \
307       if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
308       goto socket_error; \
309     } \
310     ctx->bytes_written += len; \
311   } \
312   if(ctx->bytes_written != ctx->bytes_expected) { \
313     noitL(noit_error, "short write on initiating stream [%d != %d].\n", \
314           ctx->bytes_written, ctx->bytes_expected); \
315     goto socket_error; \
316   } \
317   ctx->bytes_expected = 0; \
318 } while(0)
319
320   while(1) {
321     u_int32_t net_body_len;
322
323     switch(ctx->state) {
324       case REALTIME_HTTP_WANT_INITIATE:
325         full_nb_write(&livestream_cmd, sizeof(livestream_cmd));
326         ctx->state = REALTIME_HTTP_WANT_SEND_INTERVAL;
327       case REALTIME_HTTP_WANT_SEND_INTERVAL:
328         nint = htonl(ctx->rt->interval);
329         full_nb_write(&nint, sizeof(nint));
330         ctx->state = REALTIME_HTTP_WANT_SEND_UUID;
331       case REALTIME_HTTP_WANT_SEND_UUID:
332         uuid_unparse_lower(ctx->rt->checkid, uuid_str);
333         full_nb_write(uuid_str, 36);
334         ctx->state = REALTIME_HTTP_WANT_HEADER;
335       case REALTIME_HTTP_WANT_HEADER:
336         FULLREAD(e, ctx, sizeof(u_int32_t));
337         memcpy(&net_body_len, ctx->buffer, sizeof(u_int32_t));
338         ctx->body_len = ntohl(net_body_len);
339         free(ctx->buffer); ctx->buffer = NULL;
340         ctx->state = REALTIME_HTTP_WANT_BODY;
341         break;
342       case REALTIME_HTTP_WANT_BODY:
343         FULLREAD(e, ctx, ctx->body_len);
344         if(stratcon_line_to_javascript(ctx->ctx, ctx->buffer)) goto socket_error;
345         free(ctx->buffer); ctx->buffer = NULL;
346         ctx->state = REALTIME_HTTP_WANT_HEADER;
347         break;
348     }
349   }
350
351 }
352
353 int
354 stratcon_realtime_http_postresolve(eventer_t e, int mask, void *closure,
355                                    struct timeval *now) {
356   noit_http_session_ctx *ctx = closure;
357   realtime_context *rc = ctx->dispatcher_closure;
358   struct realtime_tracker *node;
359
360   for(node = rc->checklist; node; node = node->next) {
361     if(node->noit) {
362       realtime_recv_ctx_t *rrctx;
363       rrctx = calloc(1, sizeof(*rrctx));
364       rrctx->ctx = ctx;
365       rrctx->rt = node;
366       stratcon_streamer_connection(NULL, node->noit,
367                                    stratcon_realtime_recv_handler,
368                                    NULL, rrctx,
369                                    free_realtime_recv_ctx);
370     }
371     else
372       noit_atomic_dec32(&ctx->ref_cnt);
373   }
374   if(ctx->ref_cnt == 1) {
375     noit_http_response_end(ctx);
376     clear_realtime_context(rc);
377     if(ctx->conn.e) eventer_trigger(ctx->conn.e, EVENTER_WRITE);
378   }
379   return 0;
380 }
381 int
382 stratcon_request_dispatcher(noit_http_session_ctx *ctx) {
383   const char *key, *value;
384   realtime_context *rc = ctx->dispatcher_closure;
385   int klen;
386   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
387   noit_http_request *req = &ctx->req;
388
389   if(rc->setup == RC_INITIAL) {
390     eventer_t completion;
391     struct realtime_tracker *node;
392     char c[1024];
393     int num_interests;
394
395     num_interests = stratcon_realtime_uri_parse(rc, ctx->req.uri_str);
396     if(num_interests == 0) {
397       noit_http_response_status_set(ctx, 404, "OK");
398       noit_http_response_option_set(ctx, NOIT_HTTP_CLOSE);
399       noit_http_response_end(ctx);
400       return 0;
401     }
402
403     noitL(noit_error, "http: %s %s %s\n",
404           req->method_str, req->uri_str, req->protocol_str);
405     while(noit_hash_next_str(&req->headers, &iter, &key, &klen, &value)) {
406       noitL(noit_error, "http: [%s: %s]\n", key, value);
407     }
408     noit_http_response_status_set(ctx, 200, "OK");
409     noit_http_response_option_set(ctx, NOIT_HTTP_CHUNKED);
410     /*noit_http_response_option_set(ctx, NOIT_HTTP_GZIP);*/
411     /*noit_http_response_option_set(ctx, NOIT_HTTP_DEFLATE);*/
412     noit_http_response_header_set(ctx, "Content-Type", "text/html");
413
414     snprintf(c, sizeof(c),
415              "<html><head><script>document.domain='%s';</script></head><body>\n",
416              rc->document_domain);
417     noit_http_response_append(ctx, c, strlen(c));
418
419     /* this dumb crap is to make some browsers happy (Safari) */
420     memset(c, ' ', sizeof(c));
421     noit_http_response_append(ctx, c, sizeof(c));
422     noit_http_response_flush(ctx, noit_false);
423
424     rc->setup = RC_REQ_RECV;
425     /* Each interest references the ctx */
426     for(node = rc->checklist; node; node = node->next) {
427       noit_atomic_inc32(&ctx->ref_cnt);
428       stratcon_datastore_push(DS_OP_FIND, NULL, node);
429       noitL(noit_error, "Resolving sid: %d\n", node->sid);
430     }
431     completion = eventer_alloc();
432     completion->mask = EVENTER_TIMER;
433     completion->callback = stratcon_realtime_http_postresolve;
434     completion->closure = ctx;
435     gettimeofday(&completion->whence, NULL);
436     stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, completion);
437   }
438   return EVENTER_EXCEPTION;
439 }
440
441 int
442 stratcon_realtime_http_handler(eventer_t e, int mask, void *closure,
443                                struct timeval *now) {
444   acceptor_closure_t *ac = closure;
445   noit_http_session_ctx *http_ctx = ac->service_ctx;
446   if(!http_ctx) {
447     const char *document_domain = NULL;
448
449     if(!noit_hash_retr_str(ac->config,
450                            "document_domain", strlen("document_domain"),
451                            &document_domain)) {
452       noitL(noit_error, "Document domain not set!  Realtime streaming will be broken\n");
453       document_domain = "";
454     }
455
456     http_ctx = ac->service_ctx =
457       noit_http_session_ctx_new(stratcon_request_dispatcher,
458                                 alloc_realtime_context(document_domain),
459                                 e);
460   }
461   return http_ctx->drive(e, mask, http_ctx, now);
462 }
463
464 void
465 stratcon_realtime_http_init(const char *toplevel) {
466   eventer_name_callback("stratcon_realtime_http",
467                         stratcon_realtime_http_handler);
468 }
Note: See TracBrowser for help on using the browser.