root/src/stratcon_realtime_http.c

Revision 6210da7ee0e2ed143d71a8e00b709f16e71059f8, 12.8 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

various changes to avoid dereferencing type-punned pointers and breaking strict-aliasing rules, refs #34

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "noit_conf.h"
9 #include "utils/noit_hash.h"
10 #include "utils/noit_log.h"
11 #include "jlog/jlog.h"
12 #include "noit_jlog_listener.h"
13 #include "noit_listener.h"
14 #include "noit_http.h"
15 #include "noit_livestream_listener.h"
16 #include "stratcon_realtime_http.h"
17 #include "stratcon_jlog_streamer.h"
18 #include "stratcon_datastore.h"
19
20 #include <unistd.h>
21 #include <assert.h>
22 #include <errno.h>
23 #include <sys/types.h>
24 #include <sys/socket.h>
25 #ifdef HAVE_SYS_FILIO_H
26 #include <sys/filio.h>
27 #endif
28 #include <netinet/in.h>
29 #include <sys/un.h>
30 #include <arpa/inet.h>
31
32
33 typedef struct realtime_recv_ctx_t {
34   int bytes_expected;
35   int bytes_read;
36   int bytes_written;
37   int body_len;
38   char *buffer;         /* These guys are for doing partial reads */
39
40   enum {
41     WANT_INITIATE = 0,
42     WANT_SEND_INTERVAL = 1,
43     WANT_SEND_UUID = 2,
44     WANT_HEADER = 3,
45     WANT_BODY = 4,
46   } state;
47   int count;            /* Number of jlog messages we need to read */
48   noit_http_session_ctx *ctx;
49   struct realtime_tracker *rt;
50 } realtime_recv_ctx_t;
51
52 typedef struct realtime_context {
53   enum { RC_INITIAL = 0, RC_REQ_RECV, RC_INTERESTS_RESOLVED, RC_FEEDING } setup;
54   struct realtime_tracker *checklist;
55 } realtime_context;
56
57 static realtime_context *alloc_realtime_context() {
58   realtime_context *ctx;
59   return calloc(sizeof(*ctx), 1);
60 }
61 static void free_realtime_tracker(struct realtime_tracker *rt) {
62   if(rt->noit) free(rt->noit);
63   free(rt);
64 }
65 static void clear_realtime_context(realtime_context *rc) {
66   while(rc->checklist) {
67     struct realtime_tracker *tofree;
68     tofree = rc->checklist;
69     rc->checklist = tofree->next;
70     free_realtime_tracker(tofree);
71   }
72 }
73 int
74 stratcon_line_to_javascript(noit_http_session_ctx *ctx, char *buff) {
75   char buffer[1024];
76   char *scp, *ecp, *token;
77   int len;
78
79 #define BAIL_HTTP_WRITE do { \
80   noitL(noit_error, "javascript emit failed: %s:%s:%d\n", \
81         __FILE__, __FUNCTION__, __LINE__); \
82   return -1; \
83 } while(0)
84
85 #define PROCESS_NEXT_FIELD(t,l) do { \
86   if(!*scp) goto bad_row; \
87   ecp = strchr(scp, '\t'); \
88   if(!ecp) goto bad_row; \
89   t = scp; \
90   l = (ecp-scp); \
91   scp = ecp + 1; \
92 } while(0)
93 #define PROCESS_LAST_FIELD(t,l) do { \
94   if(!*scp) ecp = scp; \
95   else { \
96     ecp = scp + strlen(scp); /* Puts us at the '\0' */ \
97     if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \
98   } \
99   t = scp; \
100   l = (ecp-scp); \
101 } while(0)
102
103   scp = buff;
104   PROCESS_NEXT_FIELD(token,len); /* Skip the leader */
105   if(buff[0] == 'M') {
106     snprintf(buffer, sizeof(buffer), "<script>window.parent.plot_iframe_data('");
107     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
108
109     /* Time */
110     PROCESS_NEXT_FIELD(token,len);
111     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
112
113     snprintf(buffer, sizeof(buffer), "', '");
114     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
115
116     /* UUID */
117     PROCESS_NEXT_FIELD(token,len);
118     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
119
120     snprintf(buffer, sizeof(buffer), "', '");
121     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
122
123     /* name */
124     PROCESS_NEXT_FIELD(token,len);
125     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
126
127     snprintf(buffer, sizeof(buffer), "', '");
128     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
129
130     PROCESS_NEXT_FIELD(token,len); /* skip type */
131     PROCESS_LAST_FIELD(token,len); /* value */
132     if(noit_http_response_append(ctx, token, len) == noit_false) BAIL_HTTP_WRITE;
133
134     snprintf(buffer, sizeof(buffer), "');</script>\n");
135     if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) BAIL_HTTP_WRITE;
136
137     if(noit_http_response_flush(ctx, noit_false) == noit_false) BAIL_HTTP_WRITE;
138   }
139
140   return 0;
141
142  bad_row:
143   BAIL_HTTP_WRITE;
144   if(0) {
145     noit_http_response_end(ctx);
146     memset(ctx->dispatcher_closure, 0, sizeof(realtime_context));
147     if(ctx->conn.e) eventer_trigger(ctx->conn.e, EVENTER_WRITE);
148     return 0;
149   }
150 }
151 int
152 stratcon_realtime_uri_parse(realtime_context *rc, char *uri) {
153   int len, cnt = 0;
154   char *cp, *copy, *interest, *brk;
155   if(strncmp(uri, "/data/", 6)) return 0;
156   cp = uri + 6;
157   len = strlen(cp);
158   copy = alloca(len + 1);
159   if(!copy) return 0;
160   memcpy(copy, cp, len);
161   copy[len] = '\0';
162
163   for (interest = strtok_r(copy, "/", &brk);
164        interest;
165        interest = strtok_r(NULL, "/", &brk)) {
166     struct realtime_tracker *node;
167     char *interval;
168
169     interval = strchr(interest, '@');
170     if(!interval)
171       interval = "5000";
172     else
173       *interval++ = '\0';
174     node = calloc(1, sizeof(*node));
175     node->rc = rc;
176     node->sid = atoi(interest);
177     node->interval = atoi(interval);
178     node->next = rc->checklist;
179     rc->checklist = node;
180     cnt++;
181   }
182   return cnt;
183 }
184 static void
185 free_realtime_recv_ctx(void *vctx) {
186   realtime_recv_ctx_t *rrctx = vctx;
187   noit_atomic_dec32(&rrctx->ctx->ref_cnt);
188   free(rrctx);
189 }
190 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
191 static int
192 __read_on_ctx(eventer_t e, realtime_recv_ctx_t *ctx, int *newmask) {
193   int len, mask;
194   while(ctx->bytes_read < ctx->bytes_expected) {
195     len = Eread(ctx->buffer + ctx->bytes_read,
196                 ctx->bytes_expected - ctx->bytes_read);
197     if(len < 0) {
198       *newmask = mask;
199       return -1;
200     }
201     /* if we get 0 inside SSL, and there was a real error, we
202      * will actually get a -1 here.
203      * if(len == 0) return ctx->bytes_read;
204      */
205     ctx->bytes_read += len;
206   }
207   assert(ctx->bytes_read == ctx->bytes_expected);
208   return ctx->bytes_read;
209 }
210 #define FULLREAD(e,ctx,size) do { \
211   int mask, len; \
212   if(!ctx->bytes_expected) { \
213     ctx->bytes_expected = size; \
214     if(ctx->buffer) free(ctx->buffer); \
215     ctx->buffer = malloc(size + 1); \
216     if(ctx->buffer == NULL) { \
217       noitL(noit_error, "malloc(%lu) failed.\n", (unsigned long)size + 1); \
218       goto socket_error; \
219     } \
220     ctx->buffer[size] = '\0'; \
221   } \
222   len = __read_on_ctx(e, ctx, &mask); \
223   if(len < 0) { \
224     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
225     noitL(noit_error, "SSL read error: %s\n", strerror(errno)); \
226     goto socket_error; \
227   } \
228   ctx->bytes_read = 0; \
229   ctx->bytes_expected = 0; \
230   if(len != size) { \
231     noitL(noit_error, "SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
232           ctx->state, len, (unsigned long)size); \
233     goto socket_error; \
234   } \
235 } while(0)
236
237 int
238 stratcon_realtime_recv_handler(eventer_t e, int mask, void *closure,
239                                struct timeval *now) {
240   static u_int32_t livestream_cmd = 0;
241   noit_connection_ctx_t *nctx = closure;
242   realtime_recv_ctx_t *ctx = nctx->consumer_ctx;
243   int len;
244   u_int32_t nint;
245   char uuid_str[37];
246
247   if(!livestream_cmd) livestream_cmd = htonl(NOIT_LIVESTREAM_DATA_FEED);
248
249   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
250  socket_error:
251     ctx->state = WANT_INITIATE;
252     ctx->count = 0;
253     ctx->bytes_read = 0;
254     ctx->bytes_written = 0;
255     ctx->bytes_expected = 0;
256     if(ctx->buffer) free(ctx->buffer);
257     ctx->buffer = NULL;
258     free_realtime_recv_ctx(ctx);
259     eventer_remove_fd(e->fd);
260     e->opset->close(e->fd, &mask, e);
261     return 0;
262   }
263
264 #define full_nb_write(data, wlen) do { \
265   if(!ctx->bytes_expected) { \
266     ctx->bytes_written = 0; \
267     ctx->bytes_expected = wlen; \
268   } \
269   while(ctx->bytes_written < ctx->bytes_expected) { \
270     while(-1 == (len = e->opset->write(e->fd, ((char *)data) + ctx->bytes_written, \
271                                        ctx->bytes_expected - ctx->bytes_written, \
272                                        &mask, e)) && errno == EINTR); \
273     if(len < 0) { \
274       if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
275       goto socket_error; \
276     } \
277     ctx->bytes_written += len; \
278   } \
279   if(ctx->bytes_written != ctx->bytes_expected) { \
280     noitL(noit_error, "short write on initiating stream [%d != %d].\n", \
281           ctx->bytes_written, ctx->bytes_expected); \
282     goto socket_error; \
283   } \
284   ctx->bytes_expected = 0; \
285 } while(0)
286
287   while(1) {
288     u_int32_t net_body_len;
289
290     switch(ctx->state) {
291       case WANT_INITIATE:
292         full_nb_write(&livestream_cmd, sizeof(livestream_cmd));
293         ctx->state = WANT_SEND_INTERVAL;
294       case WANT_SEND_INTERVAL:
295         nint = htonl(ctx->rt->interval);
296         full_nb_write(&nint, sizeof(nint));
297         ctx->state = WANT_SEND_UUID;
298       case WANT_SEND_UUID:
299         uuid_unparse_lower(ctx->rt->checkid, uuid_str);
300         full_nb_write(uuid_str, 36);
301         ctx->state = WANT_HEADER;
302       case WANT_HEADER:
303         FULLREAD(e, ctx, sizeof(u_int32_t));
304         memcpy(&net_body_len, ctx->buffer, sizeof(u_int32_t));
305         ctx->body_len = ntohl(net_body_len);
306         free(ctx->buffer); ctx->buffer = NULL;
307         ctx->state = WANT_BODY;
308         break;
309       case WANT_BODY:
310         FULLREAD(e, ctx, ctx->body_len);
311         if(stratcon_line_to_javascript(ctx->ctx, ctx->buffer)) goto socket_error;
312         free(ctx->buffer); ctx->buffer = NULL;
313         ctx->state = WANT_HEADER;
314         break;
315     }
316   }
317
318 }
319
320 int
321 stratcon_realtime_http_postresolve(eventer_t e, int mask, void *closure,
322                                    struct timeval *now) {
323   noit_http_session_ctx *ctx = closure;
324   realtime_context *rc = ctx->dispatcher_closure;
325   struct realtime_tracker *node;
326
327   for(node = rc->checklist; node; node = node->next) {
328     if(node->noit) {
329       realtime_recv_ctx_t *rrctx;
330       rrctx = calloc(1, sizeof(*rrctx));
331       rrctx->ctx = ctx;
332       rrctx->rt = node;
333       stratcon_streamer_connection(NULL, node->noit,
334                                    stratcon_realtime_recv_handler,
335                                    NULL, rrctx,
336                                    free_realtime_recv_ctx);
337     }
338     else
339       noit_atomic_dec32(&ctx->ref_cnt);
340   }
341   if(ctx->ref_cnt == 1) {
342     noit_http_response_end(ctx);
343     clear_realtime_context(rc);
344     memset(ctx->dispatcher_closure, 0, sizeof(realtime_context));
345     if(ctx->conn.e) eventer_trigger(ctx->conn.e, EVENTER_WRITE);
346   }
347   return 0;
348 }
349 int
350 stratcon_request_dispatcher(noit_http_session_ctx *ctx) {
351   const char *key, *value;
352   realtime_context *rc = ctx->dispatcher_closure;
353   int klen;
354   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
355   noit_http_request *req = &ctx->req;
356
357   if(rc->setup == RC_INITIAL) {
358     eventer_t completion;
359     struct realtime_tracker *node;
360     char c[1024];
361     int num_interests;
362
363     num_interests = stratcon_realtime_uri_parse(rc, ctx->req.uri_str);
364     if(num_interests == 0) {
365       noit_http_response_status_set(ctx, 404, "OK");
366       noit_http_response_option_set(ctx, NOIT_HTTP_CLOSE);
367       noit_http_response_end(ctx);
368       return 0;
369     }
370
371     noitL(noit_error, "http: %s %s %s\n",
372           req->method_str, req->uri_str, req->protocol_str);
373     while(noit_hash_next_str(&req->headers, &iter, &key, &klen, &value)) {
374       noitL(noit_error, "http: [%s: %s]\n", key, value);
375     }
376     noit_http_response_status_set(ctx, 200, "OK");
377     noit_http_response_option_set(ctx, NOIT_HTTP_CHUNKED);
378     /*noit_http_response_option_set(ctx, NOIT_HTTP_GZIP);*/
379     /*noit_http_response_option_set(ctx, NOIT_HTTP_DEFLATE);*/
380     noit_http_response_header_set(ctx, "Content-Type", "text/html");
381
382     snprintf(c, sizeof(c), "<html><head><script>document.domain='omniti.com';</script></head><body>\n");
383     noit_http_response_append(ctx, c, strlen(c));
384
385     /* this dumb crap is to make some browsers happy (Safari) */
386     memset(c, ' ', sizeof(c));
387     noit_http_response_append(ctx, c, sizeof(c));
388     noit_http_response_flush(ctx, noit_false);
389
390     rc->setup = RC_REQ_RECV;
391     /* Each interest references the ctx */
392     for(node = rc->checklist; node; node = node->next) {
393       noit_atomic_inc32(&ctx->ref_cnt);
394       stratcon_datastore_push(DS_OP_FIND, NULL, node);
395       noitL(noit_error, "Resolving sid: %d\n", node->sid);
396     }
397     completion = eventer_alloc();
398     completion->mask = EVENTER_TIMER;
399     completion->callback = stratcon_realtime_http_postresolve;
400     completion->closure = ctx;
401     gettimeofday(&completion->whence, NULL);
402     stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, completion);
403   }
404   return EVENTER_EXCEPTION;
405 }
406
407 int
408 stratcon_realtime_http_handler(eventer_t e, int mask, void *closure,
409                                struct timeval *now) {
410   acceptor_closure_t *ac = closure;
411   noit_http_session_ctx *http_ctx = ac->service_ctx;
412   if(!http_ctx) {
413     http_ctx = ac->service_ctx =
414       noit_http_session_ctx_new(stratcon_request_dispatcher,
415                                 alloc_realtime_context(),
416                                 e);
417   }
418   return http_ctx->drive(e, mask, http_ctx, now);
419 }
420
421 void
422 stratcon_realtime_http_init(const char *toplevel) {
423   eventer_name_callback("stratcon_realtime_http",
424                         stratcon_realtime_http_handler);
425 }
Note: See TracBrowser for help on using the browser.