root/src/modules/http.c

Revision e3c8f105af17e80c10e566366bb11a9f2a510c20, 16.3 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 11 years ago)

oh my... http

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7
8 #include <stdio.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <assert.h>
12 #include <math.h>
13
14 #include "noit_module.h"
15 #include "noit_poller.h"
16 #include "utils/noit_log.h"
17
18 #include <apr_uri.h>
19 #include <apr_atomic.h>
20 #include <apr_strings.h>
21 #include "serf.h"
22
23 #define NOIT_HTTP_VERSION_STRING "0.1"
24
25 typedef struct {
26   int using_ssl;
27   serf_ssl_context_t *ssl_ctx;
28   serf_bucket_alloc_t *bkt_alloc;
29 } app_baton_t;
30
31 typedef struct {
32   serf_response_acceptor_t acceptor;
33   app_baton_t *acceptor_baton;
34
35   serf_response_handler_t handler;
36   const char *host;
37   const char *method;
38   const char *path;
39   const char *authn;
40
41   noit_module_t *self;
42   noit_check_t check;
43 } handler_baton_t;
44
/*
 * Simple accumulating byte buffer used for response headers and body.
 * append_buf keeps `b` NUL-terminated; `l` counts the bytes before the
 * terminator.
 */
typedef struct buf_t {
  char *b;  /* data; unset (zero) until the first append */
  int l;    /* number of bytes stored in b */
} buf_t;
49
50 typedef struct {
51   apr_pool_t *pool;
52   apr_sockaddr_t *address;
53   serf_context_t *context;
54   serf_connection_t *connection;
55   serf_request_t *request;
56   app_baton_t app_ctx;
57   handler_baton_t handler_ctx;
58   apr_uri_t url;
59   int timed_out;
60
61   serf_status_line status;
62   buf_t headers;
63   buf_t body;
64
65   struct timeval finish_time;
66   eventer_t fd_event;
67   eventer_t timeout_event;
68 } check_info_t;
69
70 typedef struct {
71   noit_module_t *self;
72   noit_check_t check;
73   void *serf_baton;
74   apr_socket_t *skt;
75 } serf_closure_t;
76
77 static noit_log_stream_t nlerr = NULL;
78 static noit_log_stream_t nldeb = NULL;
79 static int serf_handler(eventer_t e, int mask, void *closure,
80                         struct timeval *now);
81 static int serf_recur_handler(eventer_t e, int mask, void *closure,
82                               struct timeval *now);
83
84 static int serf_config(noit_module_t *self, noit_hash_table *options) {
85   return 0;
86 }
87 static void serf_log_results(noit_module_t *self, noit_check_t check) {
88   check_info_t *ci = check->closure;
89   struct timeval duration;
90   stats_t current;
91   char human_buffer[256];
92   char code[4];
93   char rt[14];
94
95   sub_timeval(ci->finish_time, check->last_fire_time, &duration);
96
97   snprintf(code, sizeof(code), "%3d", ci->status.code);
98   snprintf(rt, sizeof(rt), "%.3fms",
99            (float)duration.tv_sec + (float)duration.tv_usec / 1000000.0);
100   snprintf(human_buffer, sizeof(human_buffer),
101            "code=%s,rt=%s,bytes=%d",
102            ci->status.code ? code : "undefined",
103            ci->timed_out ? "timeout" : rt,
104            ci->body.l);
105   noit_log(nldeb, NULL, "http(%s) [%s]\n", check->target, human_buffer);
106
107   current.duration = duration.tv_sec * 1000 + duration.tv_usec / 1000;
108   current.available = (ci->timed_out || !ci->status.code) ? NP_UNAVAILABLE : NP_AVAILABLE;
109   current.state = (ci->status.code != 200) ? NP_BAD : NP_GOOD;
110   current.status = human_buffer;
111   noit_poller_set_state(check, &current);
112 }
113 static int serf_complete(eventer_t e, int mask,
114                          void *closure, struct timeval *now) {
115   serf_closure_t *ccl = (serf_closure_t *)closure;
116   check_info_t *ci = (check_info_t *)ccl->check->closure;
117
118   noit_log(nldeb, now, "serf_complete(%s)\n", ccl->check->target);
119   serf_log_results(ccl->self, ccl->check);
120   if(ci->connection) {
121     serf_connection_close(ci->connection);
122     ci->connection = NULL;
123   }
124   if(ci->fd_event) {
125     eventer_remove_fd(ci->fd_event->fd);
126     eventer_free(ci->fd_event);
127     ci->fd_event = NULL;
128   }
129   ci->timeout_event = NULL;
130   apr_pool_destroy(ci->pool);
131   memset(ci, 0, sizeof(*ci));
132   ccl->check->flags &= ~NP_RUNNING;
133   free(ccl);
134   return 0;
135 }
136
137 static int serf_handler(eventer_t e, int mask,
138                         void *closure, struct timeval *now) {
139   apr_pollfd_t desc = { 0 };
140   serf_closure_t *sct = closure;
141   check_info_t *ci = sct->check->closure;
142
143   desc.desc_type = APR_POLL_SOCKET;
144   desc.desc.s = sct->skt;
145
146   desc.rtnevents = 0;
147   if(mask & EVENTER_READ) desc.rtnevents |= APR_POLLIN;
148   if(mask & EVENTER_WRITE) desc.rtnevents |= APR_POLLOUT;
149   if(mask & EVENTER_EXCEPTION) desc.rtnevents |= APR_POLLERR;
150   serf_event_trigger(ci->context, sct->serf_baton, &desc);
151   serf_context_prerun(ci->context);
152   if(e->mask == 0) {
153     eventer_remove_fd(e->fd);
154     eventer_free(e);
155   }
156   return e->mask;
157 }
158
159 static int serf_init(noit_module_t *self) {
160   return 0;
161 }
162 static void closed_connection(serf_connection_t *conn,
163                               void *closed_baton,
164                               apr_status_t why,
165                               apr_pool_t *pool) {
166 }
167
168 static serf_bucket_t* conn_setup(apr_socket_t *skt,
169                                 void *setup_baton,
170                                 apr_pool_t *pool) {
171   serf_bucket_t *c;
172   app_baton_t *ctx = setup_baton;
173
174   c = serf_bucket_socket_create(skt, ctx->bkt_alloc);
175   if (ctx->using_ssl) {
176       c = serf_bucket_ssl_decrypt_create(c, ctx->ssl_ctx, ctx->bkt_alloc);
177       if (!ctx->ssl_ctx) {
178           ctx->ssl_ctx = serf_bucket_ssl_decrypt_context_get(c);
179       }
180   }
181
182   return c;
183 }
184
185 static serf_bucket_t* accept_response(serf_request_t *request,
186                                       serf_bucket_t *stream,
187                                       void *acceptor_baton,
188                                       apr_pool_t *pool) {
189   serf_bucket_t *c;
190   serf_bucket_alloc_t *bkt_alloc;
191
192   /* get the per-request bucket allocator */
193   bkt_alloc = serf_request_get_alloc(request);
194
195   /* Create a barrier so the response doesn't eat us! */
196   c = serf_bucket_barrier_create(stream, bkt_alloc);
197
198   return serf_bucket_response_create(c, bkt_alloc);
199 }
200
201 static void append_buf(apr_pool_t *p, buf_t *b,
202                        const char *data, int len) {
203   char *n;
204   n = apr_palloc(p, b->l + len + 1);
205   if(b->l == 0)
206     b->b = n;
207   else {
208     memcpy(b->b, n, b->l);
209     b->b = n;
210   }
211   memcpy(b->b + b->l, data, len);
212   b->l += len;
213   b->b[b->l] = '\0';
214 }
215
216 static apr_status_t handle_response(serf_request_t *request,
217                                     serf_bucket_t *response,
218                                     void *handler_baton,
219                                     apr_pool_t *pool) {
220   const char *data;
221   apr_size_t len;
222   apr_status_t status;
223   handler_baton_t *ctx = handler_baton;
224   check_info_t *ci = ctx->check->closure;
225
226   if(response == NULL) {
227     /* We were cancelled. */
228     goto finish;
229   }
230   status = serf_bucket_response_status(response, &ci->status);
231   if (status) {
232     if (APR_STATUS_IS_EAGAIN(status)) {
233       return status;
234     }
235     goto finish;
236   }
237
238   while (1) {
239     status = serf_bucket_read(response, 1024*32, &data, &len);
240     if (SERF_BUCKET_READ_ERROR(status))
241       return status;
242
243     append_buf(ci->pool, &ci->body, data, len);
244
245     /* are we done yet? */
246     if (APR_STATUS_IS_EOF(status)) {
247       serf_bucket_t *hdrs;
248       hdrs = serf_bucket_response_get_headers(response);
249       while (1) {
250         status = serf_bucket_read(hdrs, 2048, &data, &len);
251         if (SERF_BUCKET_READ_ERROR(status))
252           return status;
253
254         append_buf(ci->pool, &ci->headers, data, len);
255         if (APR_STATUS_IS_EOF(status)) {
256           break;
257         }
258       }
259
260       goto finish;
261     }
262
263     /* have we drained the response so far? */
264     if (APR_STATUS_IS_EAGAIN(status))
265       return status;
266
267     /* loop to read some more. */
268   }
269  finish:
270   gettimeofday(&ci->finish_time, NULL);
271   if(ci->timeout_event) {
272     eventer_remove(ci->timeout_event);
273     ci->timed_out = 0;
274     memcpy(&ci->timeout_event->whence, &ci->finish_time,
275            sizeof(&ci->finish_time));
276     eventer_add(ci->timeout_event);
277   }
278   return APR_EOF;
279 }
280
281 static apr_status_t setup_request(serf_request_t *request,
282                                   void *setup_baton,
283                                   serf_bucket_t **req_bkt,
284                                   serf_response_acceptor_t *acceptor,                                             void **acceptor_baton,
285                                   serf_response_handler_t *handler,
286                                   void **handler_baton,
287                                   apr_pool_t *pool) {
288   handler_baton_t *ctx = setup_baton;
289   serf_bucket_t *hdrs_bkt;
290   serf_bucket_t *body_bkt;
291
292   body_bkt = NULL;
293
294   *req_bkt = serf_bucket_request_create(ctx->method, ctx->path, body_bkt,
295                                         serf_request_get_alloc(request));
296
297   hdrs_bkt = serf_bucket_request_get_headers(*req_bkt);
298
299   serf_bucket_headers_setn(hdrs_bkt, "Host", ctx->host);
300   serf_bucket_headers_setn(hdrs_bkt, "User-Agent",
301                            "Noit/" NOIT_HTTP_VERSION_STRING);
302   /* Shouldn't serf do this for us? */
303   serf_bucket_headers_setn(hdrs_bkt, "Accept-Encoding", "gzip");
304
305   if (ctx->authn != NULL) {
306     serf_bucket_headers_setn(hdrs_bkt, "Authorization", ctx->authn);
307   }
308
309   if (ctx->acceptor_baton->using_ssl) {
310     serf_bucket_alloc_t *req_alloc;
311     app_baton_t *app_ctx = ctx->acceptor_baton;
312
313     req_alloc = serf_request_get_alloc(request);
314
315     if (app_ctx->ssl_ctx == NULL) {
316       *req_bkt =
317         serf_bucket_ssl_encrypt_create(*req_bkt, NULL,
318                                        app_ctx->bkt_alloc);
319       app_ctx->ssl_ctx =
320         serf_bucket_ssl_encrypt_context_get(*req_bkt);
321     }
322     else {
323       *req_bkt =
324         serf_bucket_ssl_encrypt_create(*req_bkt, app_ctx->ssl_ctx,
325                                        app_ctx->bkt_alloc);
326     }
327   }
328
329   *acceptor = ctx->acceptor;
330   *acceptor_baton = ctx->acceptor_baton;
331   *handler = ctx->handler;
332   *handler_baton = ctx;
333
334   return APR_SUCCESS;
335 }
336
337 struct __unix_apr_socket_t {
338   apr_pool_t *pool;
339   int socketdes;
340 };
341
342 static apr_status_t serf_eventer_add(void *user_baton,
343                                      apr_pollfd_t *pfd,
344                                      void *serf_baton) {
345   eventer_t e, newe = NULL;
346   serf_closure_t *sct = user_baton, *newsct;
347   assert(pfd->desc_type == APR_POLL_SOCKET);
348   struct __unix_apr_socket_t *hack = (struct __unix_apr_socket_t *)pfd->desc.s;
349
350   noit_log(nldeb, NULL, "serf_eventer_add() => %d\n",
351            hack->socketdes);
352   e = eventer_find_fd(hack->socketdes);
353   if(!e) {
354     newe = e = eventer_alloc();
355     e->fd = hack->socketdes;
356     e->callback = serf_handler;
357     e->closure = calloc(1, sizeof(serf_closure_t));
358   }
359   newsct = e->closure;
360   newsct->self = sct->self;
361   newsct->check = sct->check;
362   newsct->serf_baton = serf_baton;
363   newsct->skt = pfd->desc.s;
364   e->mask = 0;
365   if(pfd->reqevents & APR_POLLIN) e->mask |= EVENTER_READ;
366   if(pfd->reqevents & APR_POLLOUT) e->mask |= EVENTER_WRITE;
367   if(pfd->reqevents & APR_POLLERR) e->mask |= EVENTER_EXCEPTION;
368   if(newe) {
369     check_info_t *ci = sct->check->closure;
370     eventer_add(newe);
371     ci->fd_event = newe;
372   }
373 /* ** Unneeded as this is called recursively **
374   else
375     eventer_update(e);
376 */
377   return APR_SUCCESS;
378 }
379 static apr_status_t serf_eventer_remove(void *user_baton,
380                                         apr_pollfd_t *pfd,
381                                         void *serf_baton) {
382   serf_closure_t *sct = user_baton;
383   check_info_t *ci;
384   eventer_t e;
385
386   ci = sct->check->closure;
387   assert(pfd->desc_type == APR_POLL_SOCKET);
388   struct __unix_apr_socket_t *hack = (struct __unix_apr_socket_t *)pfd->desc.s;
389
390   noit_log(nldeb, NULL, "serf_eventer_remove() => %d\n",
391            hack->socketdes);
392   e = eventer_find_fd(hack->socketdes);
393   if(e) e->mask = 0;
394   return 0;
395 }
396
397 static int serf_initiate(noit_module_t *self, noit_check_t check) {
398   serf_closure_t *ccl;
399   check_info_t *ci;
400   struct timeval when, p_int;
401   apr_status_t status;
402   eventer_t newe;
403
404   ci = (check_info_t *)check->closure;
405   /* We cannot be running */
406   assert(!(check->flags & NP_RUNNING));
407   check->flags |= NP_RUNNING;
408   noit_log(nldeb, NULL, "serf_initiate(%p,%s)\n",
409            self, check->target);
410
411   /* remove a timeout if we still have one -- we should unless someone
412    * has set a lower timeout than the period.
413    */
414   ci->timed_out = 1;
415   if(ci->timeout_event) {
416     eventer_remove(ci->timeout_event);
417     free(ci->timeout_event->closure);
418     eventer_free(ci->timeout_event);
419     ci->timeout_event = NULL;
420   }
421   assert(!ci->pool);
422   apr_pool_create(&ci->pool, NULL);
423   apr_atomic_init(ci->pool);
424
425   gettimeofday(&when, NULL);
426   memcpy(&check->last_fire_time, &when, sizeof(when));
427
428   ccl = apr_pcalloc(ci->pool, sizeof(*ccl));
429   ccl->self = self;
430   ccl->check = check;
431
432   apr_uri_parse(ci->pool, "http://localhost/", &ci->url);
433   if (!ci->url.port) {
434     ci->url.port = apr_uri_port_of_scheme(ci->url.scheme);
435   }
436   if (!ci->url.path) {
437     ci->url.path = "/";
438   }
439
440   if (strcasecmp(ci->url.scheme, "https") == 0) {
441     ci->app_ctx.using_ssl = 1;
442   }
443   else {
444     ci->app_ctx.using_ssl = 0;
445   }
446
447   status = apr_sockaddr_info_get(&ci->address,
448                                  check->target, APR_UNSPEC, ci->url.port, 0,
449                                  ci->pool);
450   if (status) {
451     /* Handle error -- log failure */
452     apr_pool_destroy(ci->pool);
453     memset(ci, 0, sizeof(*ci));
454     check->flags &= ~NP_RUNNING;
455     return 0;
456   }
457
458   ci->context = serf_context_create_ex(ccl, serf_eventer_add,
459                                        serf_eventer_remove, ci->pool);
460
461   ci->app_ctx.bkt_alloc = serf_bucket_allocator_create(ci->pool, NULL, NULL);
462   ci->app_ctx.ssl_ctx = NULL;
463
464   ci->connection = serf_connection_create(ci->context, ci->address,
465                                           conn_setup, &ci->app_ctx,
466                                           closed_connection, &ci->app_ctx,
467                                           ci->pool);
468
469   ci->handler_ctx.method = apr_pstrdup(ci->pool, "GET");
470   ci->handler_ctx.host = apr_pstrdup(ci->pool, check->target);
471   ci->handler_ctx.path = ci->url.path;
472   ci->handler_ctx.authn = NULL;
473
474   ci->handler_ctx.acceptor = accept_response;
475   ci->handler_ctx.acceptor_baton = &ci->app_ctx;
476   ci->handler_ctx.handler = handle_response;
477   ci->handler_ctx.self = self;
478   ci->handler_ctx.check = check;
479
480   ci->request = serf_connection_request_create(ci->connection, setup_request,
481                                                &ci->handler_ctx);
482   serf_context_prerun(ci->context);
483
484   newe = eventer_alloc();
485   newe->mask = EVENTER_TIMER;
486   gettimeofday(&when, NULL);
487   p_int.tv_sec = check->timeout / 1000;
488   p_int.tv_usec = (check->timeout % 1000) * 1000;
489   add_timeval(when, p_int, &newe->whence);
490   ccl = calloc(1, sizeof(*ccl));
491   ccl->self = self;
492   ccl->check = check;
493   newe->closure = ccl;
494   newe->callback = serf_complete;
495   eventer_add(newe);
496   ci->timeout_event = newe;
497   return 0;
498 }
499 static int serf_schedule_next(noit_module_t *self,
500                               eventer_t e, noit_check_t check,
501                               struct timeval *now) {
502   eventer_t newe;
503   struct timeval last_check = { 0L, 0L };
504   struct timeval period, earliest;
505   serf_closure_t *ccl;
506
507   /* If we have an event, we know when we intended it to fire.  This means
508    * we should schedule that point + period.
509    */
510   if(now)
511     memcpy(&earliest, now, sizeof(earliest));
512   else
513     gettimeofday(&earliest, NULL);
514   if(e) memcpy(&last_check, &e->whence, sizeof(last_check));
515   period.tv_sec = check->period / 1000;
516   period.tv_usec = (check->period % 1000) * 1000;
517
518   newe = eventer_alloc();
519   memcpy(&newe->whence, &last_check, sizeof(last_check));
520   add_timeval(newe->whence, period, &newe->whence);
521   if(compare_timeval(newe->whence, earliest) < 0)
522     memcpy(&newe->whence, &earliest, sizeof(earliest));
523   newe->mask = EVENTER_TIMER;
524   newe->callback = serf_recur_handler;
525   ccl = calloc(1, sizeof(*ccl));
526   ccl->self = self;
527   ccl->check = check;
528   newe->closure = ccl;
529
530   eventer_add(newe);
531   check->fire_event = newe;
532   return 0;
533 }
534 static int serf_recur_handler(eventer_t e, int mask, void *closure,
535                               struct timeval *now) {
536   serf_closure_t *cl = (serf_closure_t *)closure;
537   serf_schedule_next(cl->self, e, cl->check, now);
538   serf_initiate(cl->self, cl->check);
539   free(cl);
540   return 0;
541 }
542 static int serf_initiate_check(noit_module_t *self, noit_check_t check) {
543   check->closure = calloc(1, sizeof(check_info_t));
544   serf_schedule_next(self, NULL, check, NULL);
545   return 0;
546 }
547
548 static int serf_onload(noit_module_t *self) {
549   apr_initialize();
550   atexit(apr_terminate);
551
552   nlerr = noit_log_stream_find("error/serf");
553   nldeb = noit_log_stream_find("debug/serf");
554   if(!nlerr) nlerr = noit_stderr;
555   if(!nldeb) nldeb = noit_debug;
556
557   eventer_name_callback("http/serf_handler", serf_handler);
558   eventer_name_callback("http/serf_complete", serf_complete);
559   eventer_name_callback("http/serf_recur_handler", serf_recur_handler);
560   return 0;
561 }
562 noit_module_t http = {
563   NOIT_MODULE_MAGIC,
564   NOIT_MODULE_ABI_VERSION,
565   "http",
566   "libserf-based HTTP and HTTPS resource checker",
567   serf_onload,
568   serf_config,
569   serf_init,
570   serf_initiate_check
571 };
572
Note: See TracBrowser for help on using the browser.