root/src/stratcon_realtime_http.c

Revision be05f9f62e0da14f7ee9e4be1482b2698b400ac4, 12.8 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

make http flush fail when things go south, refs #71

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "noit_conf.h"
9 #include "utils/noit_hash.h"
10 #include "utils/noit_log.h"
11 #include "jlog/jlog.h"
12 #include "noit_jlog_listener.h"
13 #include "noit_listener.h"
14 #include "noit_http.h"
15 #include "noit_livestream_listener.h"
16 #include "stratcon_realtime_http.h"
17 #include "stratcon_jlog_streamer.h"
18 #include "stratcon_datastore.h"
19
20 #include <unistd.h>
21 #include <assert.h>
22 #include <errno.h>
23 #include <sys/types.h>
24 #include <sys/socket.h>
25 #ifdef HAVE_SYS_FILIO_H
26 #include <sys/filio.h>
27 #endif
28 #include <netinet/in.h>
29 #include <sys/un.h>
30 #include <arpa/inet.h>
31
32
33 typedef struct realtime_recv_ctx_t {
34   int bytes_expected;
35   int bytes_read;
36   int bytes_written;
37   int body_len;
38   char *buffer;         /* These guys are for doing partial reads */
39
40   enum {
41     WANT_INITIATE = 0,
42     WANT_SEND_INTERVAL = 1,
43     WANT_SEND_UUID = 2,
44     WANT_HEADER = 3,
45     WANT_BODY = 4,
46   } state;
47   int count;            /* Number of jlog messages we need to read */
48   noit_http_session_ctx *ctx;
49   struct realtime_tracker *rt;
50 } realtime_recv_ctx_t;
51
/*
 * Per-HTTP-session dispatcher state: how far request setup has progressed
 * and the linked list of interests parsed from the request URI.
 */
typedef struct realtime_context {
  enum {
    RC_INITIAL = 0,             /* nothing has been processed yet */
    RC_REQ_RECV = 1,            /* request accepted, lookups queued */
    RC_INTERESTS_RESOLVED = 2,
    RC_FEEDING = 3
  } setup;
  /* Head of the singly linked list of requested interests. */
  struct realtime_tracker *checklist;
} realtime_context;
56
57 static realtime_context *alloc_realtime_context() {
58   realtime_context *ctx;
59   return calloc(sizeof(*ctx), 1);
60 }
61 static void free_realtime_tracker(struct realtime_tracker *rt) {
62   if(rt->noit) free(rt->noit);
63   free(rt);
64 }
65 static void clear_realtime_context(realtime_context *rc) {
66   while(rc->checklist) {
67     struct realtime_tracker *tofree;
68     tofree = rc->checklist;
69     rc->checklist = tofree->next;
70     free_realtime_tracker(tofree);
71   }
72 }
73 int
74 stratcon_line_to_javascript(noit_http_session_ctx *ctx, char *buff) {
75   char buffer[1024];
76   char *scp, *ecp, *token;
77   int len;
78
79 #define BAIL_HTTP_WRITE do { \
80   noitL(noit_error, "javascript emit failed: %s:%s:%d\n", \
81         __FILE__, __FUNCTION__, __LINE__); \
82   return -1; \
83 } while(0)
84
85 #define PROCESS_NEXT_FIELD(t,l) do { \
86   if(!*scp) goto bad_row; \
87   ecp = strchr(scp, '\t'); \
88   if(!ecp) goto bad_row; \
89   t = scp; \
90   l = (ecp-scp); \
91   scp = ecp + 1; \
92 } while(0)
93 #define PROCESS_LAST_FIELD(t,l) do { \
94   if(!*scp) ecp = scp; \
95   else { \
96     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
97     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
98   } \
99   t = scp; \
100   l = (ecp-scp); \
101 } while(0)
102
103   scp = buff;
104   PROCESS_NEXT_FIELD(token,len); /* Skip the leader */
105   if(buff[0] == 'M') {
106     snprintf(buffer, sizeof(buffer), "<script>window.parent.plot_iframe_data('");
107     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
108
109     /* Time */
110     PROCESS_NEXT_FIELD(token,len);
111     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
112
113     snprintf(buffer, sizeof(buffer), "', '");
114     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
115
116     /* UUID */
117     PROCESS_NEXT_FIELD(token,len);
118     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
119
120     snprintf(buffer, sizeof(buffer), "', '");
121     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
122
123     /* name */
124     PROCESS_NEXT_FIELD(token,len);
125     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
126
127     snprintf(buffer, sizeof(buffer), "', '");
128     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
129
130     PROCESS_NEXT_FIELD(token,len); /* skip type */
131     PROCESS_LAST_FIELD(token,len); /* value */
132     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
133
134     snprintf(buffer, sizeof(buffer), "');</script>\n");
135     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
136
137     if(noit_http_response_flush(ctx, noit_false) == noit_false) BAIL_HTTP_WRITE;
138   }
139
140   return 0;
141
142  bad_row:
143   BAIL_HTTP_WRITE;
144   if(0) {
145     noit_http_response_end(ctx);
146     memset(ctx->dispatcher_closure, 0, sizeof(realtime_context));
147     if(ctx->conn.e) eventer_trigger(ctx->conn.e, EVENTER_WRITE);
148     return 0;
149   }
150 }
151 int
152 stratcon_realtime_uri_parse(realtime_context *rc, char *uri) {
153   int len, cnt = 0;
154   char *cp, *copy, *interest, *brk;
155   if(strncmp(uri, "/data/", 6)) return 0;
156   cp = uri + 6;
157   len = strlen(cp);
158   copy = alloca(len + 1);
159   if(!copy) return 0;
160   memcpy(copy, cp, len);
161   copy[len] = '\0';
162
163   for (interest = strtok_r(copy, "/", &brk);
164        interest;
165        interest = strtok_r(NULL, "/", &brk)) {
166     struct realtime_tracker *node;
167     char *interval;
168
169     interval = strchr(interest, '@');
170     if(!interval) return 0;
171     *interval++ = '\0';
172     node = calloc(1, sizeof(*node));
173     node->rc = rc;
174     node->sid = atoi(interest);
175     node->interval = atoi(interval);
176     node->next = rc->checklist;
177     rc->checklist = node;
178     cnt++;
179   }
180   return cnt;
181 }
182 static void
183 free_realtime_recv_ctx(void *vctx) {
184   realtime_recv_ctx_t *rrctx = vctx;
185   noit_atomic_dec32(&rrctx->ctx->ref_cnt);
186   free(rrctx);
187 }
188 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
189 static int
190 __read_on_ctx(eventer_t e, realtime_recv_ctx_t *ctx, int *newmask) {
191   int len, mask;
192   while(ctx->bytes_read < ctx->bytes_expected) {
193     len = Eread(ctx->buffer + ctx->bytes_read,
194                 ctx->bytes_expected - ctx->bytes_read);
195     if(len < 0) {
196       *newmask = mask;
197       return -1;
198     }
199     /* if we get 0 inside SSL, and there was a real error, we
200      * will actually get a -1 here.
201      * if(len == 0) return ctx->bytes_read;
202      */
203     ctx->bytes_read += len;
204   }
205   assert(ctx->bytes_read == ctx->bytes_expected);
206   return ctx->bytes_read;
207 }
/*
 * FULLREAD(e, ctx, size): resumable read of exactly `size` bytes into
 * ctx->buffer (allocated here with one extra byte for a trailing NUL).
 *
 * Must be expanded inside an eventer callback that returns a mask and has
 * a `socket_error` label:
 *   - if the read would block (errno == EAGAIN) the macro RETURNS
 *     mask | EVENTER_EXCEPTION from the enclosing function; re-expanding it
 *     later resumes where it left off because ctx->bytes_expected is still
 *     non-zero;
 *   - on any other failure (bad size, allocation failure, read error, short
 *     read) it jumps to socket_error;
 *   - on success it falls through with ctx->buffer holding `size` bytes and
 *     bytes_read/bytes_expected reset to 0.
 *
 * `size` is evaluated once and rejected if negative, so a hostile length
 * can never index before the start of the buffer.
 */
#define FULLREAD(e,ctx,size) do { \
  int mask, len; \
  const int fr_want = (int)(size); \
  if(fr_want < 0) { \
    noitL(noit_error, "refusing read of negative length %d.\n", fr_want); \
    goto socket_error; \
  } \
  if(!(ctx)->bytes_expected) { \
    (ctx)->bytes_expected = fr_want; \
    free((ctx)->buffer); \
    (ctx)->buffer = malloc((size_t)fr_want + 1); \
    if((ctx)->buffer == NULL) { \
      noitL(noit_error, "malloc(%lu) failed.\n", (unsigned long)fr_want + 1); \
      goto socket_error; \
    } \
    (ctx)->buffer[fr_want] = '\0'; \
  } \
  len = __read_on_ctx((e), (ctx), &mask); \
  if(len < 0) { \
    if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
    noitL(noit_error, "SSL read error: %s\n", strerror(errno)); \
    goto socket_error; \
  } \
  (ctx)->bytes_read = 0; \
  (ctx)->bytes_expected = 0; \
  if(len != fr_want) { \
    noitL(noit_error, "SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
          (ctx)->state, len, (unsigned long)fr_want); \
    goto socket_error; \
  } \
} while(0)
234
235 int
236 stratcon_realtime_recv_handler(eventer_t e, int mask, void *closure,
237                                struct timeval *now) {
238   static u_int32_t livestream_cmd = 0;
239   noit_connection_ctx_t *nctx = closure;
240   realtime_recv_ctx_t *ctx = nctx->consumer_ctx;
241   int len;
242   u_int32_t nint;
243   char uuid_str[37];
244
245   if(!livestream_cmd) livestream_cmd = htonl(NOIT_LIVESTREAM_DATA_FEED);
246
247   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
248  socket_error:
249     ctx->state = WANT_INITIATE;
250     ctx->count = 0;
251     ctx->bytes_read = 0;
252     ctx->bytes_written = 0;
253     ctx->bytes_expected = 0;
254     if(ctx->buffer) free(ctx->buffer);
255     ctx->buffer = NULL;
256     free_realtime_recv_ctx(ctx);
257     eventer_remove_fd(e->fd);
258     e->opset->close(e->fd, &mask, e);
259     return 0;
260   }
261
262 #define full_nb_write(data, wlen) do { \
263   if(!ctx->bytes_expected) { \
264     ctx->bytes_written = 0; \
265     ctx->bytes_expected = wlen; \
266   } \
267   while(ctx->bytes_written < ctx->bytes_expected) { \
268     while(-1 == (len = e->opset->write(e->fd, ((char *)data) + ctx->bytes_written, \
269                                        ctx->bytes_expected - ctx->bytes_written, \
270                                        &mask, e)) && errno == EINTR); \
271     if(len < 0) { \
272       if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
273       goto socket_error; \
274     } \
275     ctx->bytes_written += len; \
276   } \
277   if(ctx->bytes_written != ctx->bytes_expected) { \
278     noitL(noit_error, "short write on initiating stream [%d != %d].\n", \
279           ctx->bytes_written, ctx->bytes_expected); \
280     goto socket_error; \
281   } \
282   ctx->bytes_expected = 0; \
283 } while(0)
284
285   while(1) {
286     u_int32_t net_body_len;
287
288     switch(ctx->state) {
289       case WANT_INITIATE:
290         full_nb_write(&livestream_cmd, sizeof(livestream_cmd));
291         ctx->state = WANT_SEND_INTERVAL;
292       case WANT_SEND_INTERVAL:
293         nint = htonl(ctx->rt->interval);
294         full_nb_write(&nint, sizeof(nint));
295         ctx->state = WANT_SEND_UUID;
296       case WANT_SEND_UUID:
297         uuid_unparse_lower(ctx->rt->checkid, uuid_str);
298         full_nb_write(uuid_str, 36);
299         ctx->state = WANT_HEADER;
300       case WANT_HEADER:
301         FULLREAD(e, ctx, sizeof(u_int32_t));
302         memcpy(&net_body_len, ctx->buffer, sizeof(u_int32_t));
303         ctx->body_len = ntohl(net_body_len);
304         free(ctx->buffer); ctx->buffer = NULL;
305         ctx->state = WANT_BODY;
306         break;
307       case WANT_BODY:
308         FULLREAD(e, ctx, ctx->body_len);
309         noitL(noit_error, "Read: '%s'\n", ctx->buffer);
310         if(stratcon_line_to_javascript(ctx->ctx, ctx->buffer)) goto socket_error;
311         free(ctx->buffer); ctx->buffer = NULL;
312         ctx->state = WANT_HEADER;
313         break;
314     }
315   }
316
317 }
318
319 int
320 stratcon_realtime_http_postresolve(eventer_t e, int mask, void *closure,
321                                    struct timeval *now) {
322   noit_http_session_ctx *ctx = closure;
323   realtime_context *rc = ctx->dispatcher_closure;
324   struct realtime_tracker *node;
325
326   for(node = rc->checklist; node; node = node->next) {
327     if(node->noit) {
328       realtime_recv_ctx_t *rrctx;
329       rrctx = calloc(1, sizeof(*rrctx));
330       rrctx->ctx = ctx;
331       rrctx->rt = node;
332       stratcon_streamer_connection(NULL, node->noit,
333                                    stratcon_realtime_recv_handler,
334                                    NULL, rrctx,
335                                    free_realtime_recv_ctx);
336     }
337     else
338       noit_atomic_dec32(&ctx->ref_cnt);
339   }
340   if(ctx->ref_cnt == 1) {
341     noit_http_response_end(ctx);
342     clear_realtime_context(rc);
343     memset(ctx->dispatcher_closure, 0, sizeof(realtime_context));
344     if(ctx->conn.e) eventer_trigger(ctx->conn.e, EVENTER_WRITE);
345   }
346   return 0;
347 }
348 int
349 stratcon_request_dispatcher(noit_http_session_ctx *ctx) {
350   const char *key, *value;
351   realtime_context *rc = ctx->dispatcher_closure;
352   int klen;
353   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
354   noit_http_request *req = &ctx->req;
355
356   if(rc->setup == RC_INITIAL) {
357     eventer_t completion;
358     struct realtime_tracker *node;
359     char c[1024];
360     int num_interests;
361
362     num_interests = stratcon_realtime_uri_parse(rc, ctx->req.uri_str);
363     if(num_interests == 0) {
364       noit_http_response_status_set(ctx, 404, "OK");
365       noit_http_response_option_set(ctx, NOIT_HTTP_CLOSE);
366       noit_http_response_end(ctx);
367       return 0;
368     }
369
370     noitL(noit_error, "http: %s %s %s\n",
371           req->method_str, req->uri_str, req->protocol_str);
372     while(noit_hash_next(&req->headers, &iter, &key, &klen, (void **)&value)) {
373       noitL(noit_error, "http: [%s: %s]\n", key, value);
374     }
375     noit_http_response_status_set(ctx, 200, "OK");
376     noit_http_response_option_set(ctx, NOIT_HTTP_CHUNKED);
377     noit_http_response_option_set(ctx, NOIT_HTTP_DEFLATE);
378     noit_http_response_header_set(ctx, "Content-Type", "text/html");
379
380     snprintf(c, sizeof(c), "<html><head><script>document.domain='omniti.com';</script></head><body>\n");
381     noit_http_response_append(ctx, c, strlen(c));
382
383     /* this dumb crap is to make some browsers happy (Safari) */
384     memset(c, ' ', sizeof(c));
385     noit_http_response_append(ctx, c, sizeof(c));
386     noit_http_response_flush(ctx, noit_false);
387
388     rc->setup = RC_REQ_RECV;
389     /* Each interest references the ctx */
390     for(node = rc->checklist; node; node = node->next) {
391       noit_atomic_inc32(&ctx->ref_cnt);
392       stratcon_datastore_push(DS_OP_FIND, NULL, node);
393       noitL(noit_error, "Resolving sid: %d\n", node->sid);
394     }
395     completion = eventer_alloc();
396     completion->mask = EVENTER_TIMER;
397     completion->callback = stratcon_realtime_http_postresolve;
398     completion->closure = ctx;
399     gettimeofday(&completion->whence, NULL);
400     stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, completion);
401   }
402   return EVENTER_EXCEPTION;
403 }
404
405 int
406 stratcon_realtime_http_handler(eventer_t e, int mask, void *closure,
407                                struct timeval *now) {
408   acceptor_closure_t *ac = closure;
409   noit_http_session_ctx *http_ctx = ac->service_ctx;
410   if(!http_ctx) {
411     http_ctx = ac->service_ctx =
412       noit_http_session_ctx_new(stratcon_request_dispatcher,
413                                 alloc_realtime_context(),
414                                 e);
415   }
416   return http_ctx->drive(e, mask, http_ctx, now);
417 }
418
419 void
420 stratcon_realtime_http_init(const char *toplevel) {
421   eventer_name_callback("stratcon_realtime_http",
422                         stratcon_realtime_http_handler);
423 }
Note: See TracBrowser for help on using the browser.