root/src/stratcon_realtime_http.c

Revision 21b0c6c1011f78327925b8a1914d98cdd5cd43d5, 12.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

first whack at feeding actual data. refs #71

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "noit_conf.h"
9 #include "utils/noit_hash.h"
10 #include "utils/noit_log.h"
11 #include "jlog/jlog.h"
12 #include "noit_jlog_listener.h"
13 #include "noit_listener.h"
14 #include "noit_http.h"
15 #include "noit_livestream_listener.h"
16 #include "stratcon_realtime_http.h"
17 #include "stratcon_jlog_streamer.h"
18 #include "stratcon_datastore.h"
19
20 #include <unistd.h>
21 #include <assert.h>
22 #include <errno.h>
23 #include <sys/types.h>
24 #include <sys/socket.h>
25 #ifdef HAVE_SYS_FILIO_H
26 #include <sys/filio.h>
27 #endif
28 #include <netinet/in.h>
29 #include <sys/un.h>
30 #include <arpa/inet.h>
31
32
33 typedef struct realtime_recv_ctx_t {
34   int bytes_expected;
35   int bytes_read;
36   char *buffer;         /* These guys are for doing partial reads */
37
38   enum {
39     WANT_INITIATE = 0,
40     WANT_SEND_INTERVAL = 1,
41     WANT_SEND_UUID = 2,
42     WANT_HEADER = 3,
43     WANT_BODY = 4,
44   } state;
45   int count;            /* Number of jlog messages we need to read */
46   noit_http_session_ctx *ctx;
47   struct realtime_tracker *rt;
48 } realtime_recv_ctx_t;
49
50 typedef struct realtime_context {
51   enum { RC_INITIAL = 0, RC_REQ_RECV, RC_INTERESTS_RESOLVED, RC_FEEDING } setup;
52   struct realtime_tracker *checklist;
53 } realtime_context;
54
55 static realtime_context *alloc_realtime_context() {
56   realtime_context *ctx;
57   return calloc(sizeof(*ctx), 1);
58 }
59 static void free_realtime_tracker(struct realtime_tracker *rt) {
60   if(rt->noit) free(rt->noit);
61   free(rt);
62 }
63 static void clear_realtime_context(realtime_context *rc) {
64   while(rc->checklist) {
65     struct realtime_tracker *tofree;
66     tofree = rc->checklist;
67     rc->checklist = tofree->next;
68     free_realtime_tracker(tofree);
69   }
70 }
71 int
72 stratcon_line_to_javascript(noit_http_session_ctx *ctx, char *buff) {
73   char buffer[1024];
74   char *scp, *ecp, *token;
75   int len;
76
77 #define PROCESS_NEXT_FIELD(t,l) do { \
78   if(!*scp) goto bad_row; \
79   ecp = strchr(scp, '\t'); \
80   if(!ecp) goto bad_row; \
81   t = scp; \
82   l = (ecp-scp); \
83   scp = ecp + 1; \
84 } while(0)
85 #define PROCESS_LAST_FIELD(t,l) do { \
86   if(!*scp) ecp = scp; \
87   else { \
88     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
89     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
90   } \
91   t = scp; \
92   l = (ecp-scp); \
93 } while(0)
94
95   PROCESS_NEXT_FIELD(token,len); /* Skip the leader */
96   if(buff[0] == 'M') {
97     scp = buff;
98     snprintf(buffer, sizeof(buffer), "<script>window.parent.plot_iframe_data('");
99     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) return -1;
100
101     /* Time */
102     PROCESS_NEXT_FIELD(token,len);
103     if(noit_http_response_append(ctx, token, len) == noit_false) return -1;
104
105     snprintf(buffer, sizeof(buffer), "', '");
106     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) return -1;
107
108     /* UUID */
109     PROCESS_NEXT_FIELD(token,len);
110     if(noit_http_response_append(ctx, token, len) == noit_false) return -1;
111
112     snprintf(buffer, sizeof(buffer), "', '");
113     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) return -1;
114
115     /* name */
116     PROCESS_NEXT_FIELD(token,len);
117     if(noit_http_response_append(ctx, token, len) == noit_false) return -1;
118
119     snprintf(buffer, sizeof(buffer), "', '");
120     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) return -1;
121
122     PROCESS_NEXT_FIELD(token,len); /* skip type */
123     PROCESS_LAST_FIELD(token,len); /* value */
124     if(noit_http_response_append(ctx, token, len) == noit_false) return -1;
125
126     snprintf(buffer, sizeof(buffer), "');</script>\n");
127     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) return -1;
128
129     if(noit_http_response_flush(ctx, noit_false) == noit_false) return -1;
130   }
131
132   return 0;
133
134  bad_row:
135   return -1;
136   if(0) {
137     noit_http_response_end(ctx);
138     memset(ctx->dispatcher_closure, 0, sizeof(realtime_context));
139     if(ctx->conn.e) eventer_trigger(ctx->conn.e, EVENTER_WRITE);
140     return 0;
141   }
142 }
143 int
144 stratcon_realtime_uri_parse(realtime_context *rc, char *uri) {
145   int len, cnt = 0;
146   char *cp, *copy, *interest, *brk;
147   if(strncmp(uri, "/data/", 6)) return 0;
148   cp = uri + 6;
149   len = strlen(cp);
150   copy = alloca(len + 1);
151   if(!copy) return 0;
152   memcpy(copy, cp, len);
153   copy[len] = '\0';
154
155   for (interest = strtok_r(copy, "/", &brk);
156        interest;
157        interest = strtok_r(NULL, "/", &brk)) {
158     struct realtime_tracker *node;
159     char *interval;
160
161     interval = strchr(interest, '@');
162     if(!interval) return 0;
163     *interval++ = '\0';
164     node = calloc(1, sizeof(*node));
165     node->rc = rc;
166     node->sid = atoi(interest);
167     node->interval = atoi(interval);
168     node->next = rc->checklist;
169     rc->checklist = node;
170     cnt++;
171   }
172   return cnt;
173 }
174 static void
175 free_realtime_recv_ctx(void *vctx) {
176   realtime_recv_ctx_t *rrctx = vctx;
177   noit_atomic_dec32(&rrctx->ctx->ref_cnt);
178   free(rrctx);
179 }
180 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
181 static int
182 __read_on_ctx(eventer_t e, realtime_recv_ctx_t *ctx, int *newmask) {
183   int len, mask;
184   while(ctx->bytes_read < ctx->bytes_expected) {
185     len = Eread(ctx->buffer + ctx->bytes_read,
186                 ctx->bytes_expected - ctx->bytes_read);
187     if(len < 0) {
188       *newmask = mask;
189       return -1;
190     }
191     /* if we get 0 inside SSL, and there was a real error, we
192      * will actually get a -1 here.
193      * if(len == 0) return ctx->bytes_read;
194      */
195     ctx->bytes_read += len;
196   }
197   assert(ctx->bytes_read == ctx->bytes_expected);
198   return ctx->bytes_read;
199 }
200 #define FULLREAD(e,ctx,size) do { \
201   int mask, len; \
202   if(!ctx->bytes_expected) { \
203     ctx->bytes_expected = size; \
204     if(ctx->buffer) free(ctx->buffer); \
205     ctx->buffer = malloc(size + 1); \
206     if(ctx->buffer == NULL) { \
207       noitL(noit_error, "malloc(%lu) failed.\n", (unsigned long)size + 1); \
208       goto socket_error; \
209     } \
210     ctx->buffer[size] = '\0'; \
211   } \
212   len = __read_on_ctx(e, ctx, &mask); \
213   if(len < 0) { \
214     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
215     noitL(noit_error, "SSL read error: %s\n", strerror(errno)); \
216     goto socket_error; \
217   } \
218   ctx->bytes_read = 0; \
219   ctx->bytes_expected = 0; \
220   if(len != size) { \
221     noitL(noit_error, "SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
222           ctx->state, len, (unsigned long)size); \
223     goto socket_error; \
224   } \
225 } while(0)
226
227 int
228 stratcon_realtime_recv_handler(eventer_t e, int mask, void *closure,
229                                struct timeval *now) {
230   static u_int32_t livestream_cmd = 0;
231   noit_connection_ctx_t *nctx = closure;
232   realtime_recv_ctx_t *ctx = nctx->consumer_ctx;
233   int len;
234   u_int32_t nint;
235   char uuid_str[37];
236
237   if(!livestream_cmd) livestream_cmd = htonl(NOIT_LIVESTREAM_DATA_FEED);
238
239   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
240  socket_error:
241     ctx->state = WANT_INITIATE;
242     ctx->count = 0;
243     ctx->bytes_read = 0;
244     ctx->bytes_expected = 0;
245     if(ctx->buffer) free(ctx->buffer);
246     ctx->buffer = NULL;
247     eventer_remove_fd(e->fd);
248     e->opset->close(e->fd, &mask, e);
249     return 0;
250   }
251
252 #define full_nb_write(data, wlen) do { \
253   len = e->opset->write(e->fd, data, wlen, \
254                         &mask, e); \
255   if(len < 0) { \
256     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
257     goto socket_error; \
258   } \
259   if(len != sizeof(livestream_cmd)) { \
260     noitL(noit_error, "short write on initiating stream.\n"); \
261     goto socket_error; \
262   } \
263 } while(0)
264
265   while(1) {
266     switch(ctx->state) {
267       case WANT_INITIATE:
268         full_nb_write(&livestream_cmd, sizeof(livestream_cmd));
269         ctx->state = WANT_SEND_INTERVAL;
270       case WANT_SEND_INTERVAL:
271         nint = htonl(ctx->rt->interval);
272         full_nb_write(&nint, sizeof(nint));
273         ctx->state = WANT_SEND_UUID;
274       case WANT_SEND_UUID:
275         uuid_unparse_lower(ctx->rt->checkid, uuid_str);
276         full_nb_write(uuid_str, 36);
277         ctx->state = WANT_HEADER;
278       case WANT_HEADER:
279         FULLREAD(e, ctx, sizeof(u_int32_t));
280         memcpy(&ctx->bytes_expected, ctx->buffer, sizeof(u_int32_t));
281         ctx->bytes_expected = ntohl(ctx->bytes_expected);
282         free(ctx->buffer); ctx->buffer = NULL;
283         ctx->state = WANT_BODY;
284         break;
285       case WANT_BODY:
286         FULLREAD(e, ctx, ctx->bytes_expected);
287         noitL(noit_error, "Read: '%s'\n", ctx->buffer);
288         if(stratcon_line_to_javascript(ctx->ctx, ctx->buffer)) goto socket_error;
289         free(ctx->buffer); ctx->buffer = NULL;
290         ctx->state = WANT_BODY;
291         break;
292     }
293   }
294
295 }
296
297 int
298 stratcon_realtime_http_postresolve(eventer_t e, int mask, void *closure,
299                                    struct timeval *now) {
300   noit_http_session_ctx *ctx = closure;
301   realtime_context *rc = ctx->dispatcher_closure;
302   struct realtime_tracker *node;
303
304   for(node = rc->checklist; node; node = node->next) {
305     if(node->noit) {
306       realtime_recv_ctx_t *rrctx;
307       rrctx = calloc(1, sizeof(*rrctx));
308       rrctx->ctx = ctx;
309       rrctx->rt = node;
310       stratcon_streamer_connection(NULL, node->noit,
311                                    stratcon_realtime_recv_handler,
312                                    NULL, rrctx,
313                                    free_realtime_recv_ctx);
314     }
315     else
316       noit_atomic_dec32(&ctx->ref_cnt);
317   }
318   if(ctx->ref_cnt == 1) {
319     noit_http_response_end(ctx);
320     clear_realtime_context(rc);
321     memset(ctx->dispatcher_closure, 0, sizeof(realtime_context));
322     if(ctx->conn.e) eventer_trigger(ctx->conn.e, EVENTER_WRITE);
323   }
324   return 0;
325 }
326 int
327 stratcon_request_dispatcher(noit_http_session_ctx *ctx) {
328   const char *key, *value;
329   realtime_context *rc = ctx->dispatcher_closure;
330   int klen;
331   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
332   noit_http_request *req = &ctx->req;
333
334   if(rc->setup == RC_INITIAL) {
335     eventer_t completion;
336     struct realtime_tracker *node;
337     char c[1024];
338     int num_interests;
339
340     num_interests = stratcon_realtime_uri_parse(rc, ctx->req.uri_str);
341     if(num_interests == 0) {
342       noit_http_response_status_set(ctx, 404, "OK");
343       noit_http_response_option_set(ctx, NOIT_HTTP_CLOSE);
344       noit_http_response_end(ctx);
345       return 0;
346     }
347
348     noitL(noit_error, "http: %s %s %s\n",
349           req->method_str, req->uri_str, req->protocol_str);
350     while(noit_hash_next(&req->headers, &iter, &key, &klen, (void **)&value)) {
351       noitL(noit_error, "http: [%s: %s]\n", key, value);
352     }
353     noit_http_response_status_set(ctx, 200, "OK");
354     noit_http_response_option_set(ctx, NOIT_HTTP_CHUNKED);
355     noit_http_response_option_set(ctx, NOIT_HTTP_DEFLATE);
356     noit_http_response_header_set(ctx, "Content-Type", "text/html");
357
358     snprintf(c, sizeof(c), "<html><head><script>document.domain='omniti.com';</script></head><body>\n");
359     noit_http_response_append(ctx, c, strlen(c));
360
361     /* this dumb crap is to make some browsers happy (Safari) */
362     memset(c, ' ', sizeof(c));
363     noit_http_response_append(ctx, c, sizeof(c));
364     noit_http_response_flush(ctx, noit_false);
365
366     rc->setup = RC_REQ_RECV;
367     /* Each interest references the ctx */
368     for(node = rc->checklist; node; node = node->next) {
369       noit_atomic_inc32(&ctx->ref_cnt);
370       stratcon_datastore_push(DS_OP_FIND, NULL, node);
371       noitL(noit_error, "Resolving sid: %d\n", node->sid);
372     }
373     completion = eventer_alloc();
374     completion->mask = EVENTER_TIMER;
375     completion->callback = stratcon_realtime_http_postresolve;
376     completion->closure = ctx;
377     gettimeofday(&completion->whence, NULL);
378     stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, completion);
379   }
380   return EVENTER_EXCEPTION;
381 }
382
383 int
384 stratcon_realtime_http_handler(eventer_t e, int mask, void *closure,
385                                struct timeval *now) {
386   acceptor_closure_t *ac = closure;
387   noit_http_session_ctx *http_ctx = ac->service_ctx;
388   if(!http_ctx) {
389     http_ctx = ac->service_ctx =
390       noit_http_session_ctx_new(stratcon_request_dispatcher,
391                                 alloc_realtime_context(),
392                                 e);
393   }
394   return http_ctx->drive(e, mask, http_ctx, now);
395 }
396
397 void
398 stratcon_realtime_http_init(const char *toplevel) {
399   eventer_name_callback("stratcon_realtime_http",
400                         stratcon_realtime_http_handler);
401 }
Note: See TracBrowser for help on using the browser.