root/src/modules/http.c

Revision a2e53dc0c9ff0d9e50eb095b926924850a6c8ff8, 26.7 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

implement staggered start skilz

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7
8 #include <stdio.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <assert.h>
12 #include <math.h>
13
14 #include <libxml/parser.h>
15 #include <libxml/tree.h>
16 #include <libxml/xpath.h>
17
18 #include "noit_module.h"
19 #include "noit_check.h"
20 #include "utils/noit_log.h"
21 #include "utils/noit_hash.h"
22
23 #include <apr_uri.h>
24 #include <apr_atomic.h>
25 #include <apr_strings.h>
26 #include "serf.h"
27
28 #define NOIT_HTTP_VERSION_STRING "0.1"
29
30 typedef struct {
31   noit_hash_table *options;
32   void (*results)(noit_module_t *, noit_check_t *);
33 } serf_module_conf_t;
34
35 typedef struct {
36   int using_ssl;
37   serf_ssl_context_t *ssl_ctx;
38   serf_bucket_alloc_t *bkt_alloc;
39 } app_baton_t;
40
41 typedef struct {
42   serf_response_acceptor_t acceptor;
43   app_baton_t *acceptor_baton;
44
45   serf_response_handler_t handler;
46   const char *host;
47   const char *method;
48   const char *path;
49   const char *authn;
50
51   noit_module_t *self;
52   noit_check_t *check;
53 } handler_baton_t;
54
/* Growable byte buffer; append_buf() keeps it NUL-terminated. */
typedef struct buf_t {
  /* Start of the data (NULL while nothing has been appended). */
  char *b;
  /* Number of data bytes, not counting the trailing NUL. */
  int l;
} buf_t;
59
60 typedef struct {
61   apr_pool_t *pool;
62   apr_sockaddr_t *address;
63   serf_context_t *context;
64   serf_connection_t *connection;
65   serf_request_t *request;
66   app_baton_t app_ctx;
67   handler_baton_t handler_ctx;
68   apr_uri_t url;
69   int timed_out;
70
71   serf_status_line status;
72   buf_t headers;
73   buf_t body;
74
75   struct timeval finish_time;
76   eventer_t fd_event;
77   eventer_t timeout_event;
78 } serf_check_info_t;
79
80 typedef struct {
81   serf_check_info_t serf;
82   struct timeval xml_doc_time;
83   char *xpathexpr;
84   xmlDocPtr xml_doc;
85   char *resmod;
86   char *resserv;
87 } resmon_check_info_t;
88
89 typedef struct {
90   noit_module_t *self;
91   noit_check_t *check;
92   void *serf_baton;
93   apr_socket_t *skt;
94 } serf_closure_t;
95
96 static noit_log_stream_t nlerr = NULL;
97 static noit_log_stream_t nldeb = NULL;
98 static int serf_handler(eventer_t e, int mask, void *closure,
99                         struct timeval *now);
100 static int serf_recur_handler(eventer_t e, int mask, void *closure,
101                               struct timeval *now);
102 static void serf_log_results(noit_module_t *self, noit_check_t *check);
103 static void resmon_log_results(noit_module_t *self, noit_check_t *check);
104 static void resmon_part_log_results(noit_module_t *self, noit_check_t *check,
105                                     noit_check_t *parent);
106
107 static int serf_config(noit_module_t *self, noit_hash_table *options) {
108   serf_module_conf_t *conf;
109   conf = calloc(1, sizeof(*conf));
110   conf->options = options;
111   conf->results = serf_log_results;
112   noit_module_set_userdata(self, conf);
113   return 0;
114 }
115 static int resmon_config(noit_module_t *self, noit_hash_table *options) {
116   serf_module_conf_t *conf;
117   conf = calloc(1, sizeof(*conf));
118   conf->options = options;
119   if(!conf->options) conf->options = calloc(1, sizeof(*conf->options));
120   noit_hash_store(conf->options, strdup("url"), strlen("url"),
121                   strdup("http://localhost:81/"));
122   conf->results = resmon_log_results;
123   noit_module_set_userdata(self, conf);
124   return 0;
125 }
126 static void generic_log_results(noit_module_t *self, noit_check_t *check) {
127   serf_module_conf_t *module_conf;
128   module_conf = noit_module_get_userdata(self);
129   module_conf->results(self, check);
130 }
131 static void serf_log_results(noit_module_t *self, noit_check_t *check) {
132   serf_check_info_t *ci = check->closure;
133   struct timeval duration;
134   stats_t current;
135   int expect_code = 200;
136   char *code_str;
137   char human_buffer[256], code[4], rt[14];
138
139   memset(&current, 0, sizeof(current));
140
141   if(noit_hash_retrieve(check->config, "code", strlen("code"),
142                         (void **)&code_str))
143     expect_code = atoi(code_str);
144
145   sub_timeval(ci->finish_time, check->last_fire_time, &duration);
146
147   snprintf(code, sizeof(code), "%3d", ci->status.code);
148   snprintf(rt, sizeof(rt), "%.3fms",
149            (float)duration.tv_sec + (float)duration.tv_usec / 1000000.0);
150   snprintf(human_buffer, sizeof(human_buffer),
151            "code=%s,rt=%s,bytes=%d",
152            ci->status.code ? code : "undefined",
153            ci->timed_out ? "timeout" : rt,
154            ci->body.l);
155   noitL(nldeb, "http(%s) [%s]\n", check->target, human_buffer);
156
157   memcpy(&current.whence, &ci->finish_time, sizeof(current.whence));
158   current.duration = duration.tv_sec * 1000 + duration.tv_usec / 1000;
159   current.available = (ci->timed_out || !ci->status.code) ? NP_UNAVAILABLE : NP_AVAILABLE;
160   current.state = (ci->status.code != 200) ? NP_BAD : NP_GOOD;
161   current.status = human_buffer;
162   if(current.available == NP_AVAILABLE) {
163     noit_stats_set_metric_int(&current, "code", &ci->status.code);
164     noit_stats_set_metric_int(&current, "bytes", &ci->body.l);
165   }
166   else {
167     noit_stats_set_metric_int(&current, "code", NULL);
168     noit_stats_set_metric_int(&current, "bytes", NULL);
169   }
170   noit_check_set_stats(self, check, &current);
171 }
172 static void resmon_part_log_results_xml(noit_module_t *self,
173                                         noit_check_t *check,
174                                         xmlDocPtr xml) {
175   resmon_check_info_t *rci = check->closure;
176   xmlXPathContextPtr xpath_ctxt = NULL;
177   stats_t current;
178
179   memset(&current, 0, sizeof(current));
180   current.available = NP_UNAVAILABLE;
181   current.state = NP_BAD;
182
183   if(xml && rci->xpathexpr) {
184     current.available = NP_AVAILABLE;
185     xpath_ctxt = xmlXPathNewContext(xml);
186     if(xpath_ctxt) {
187       xmlXPathObjectPtr pobj;
188       pobj = xmlXPathEval((xmlChar *)rci->xpathexpr, xpath_ctxt);
189       if(pobj) {
190         int i, cnt;
191         cnt = xmlXPathNodeSetGetLength(pobj->nodesetval);
192         for(i=0; i<cnt; i++) {
193           xmlNodePtr node;
194           char *value;
195           node = xmlXPathNodeSetItem(pobj->nodesetval, i);
196           value = (char *)xmlXPathCastNodeToString(node);
197           if(!strcmp((char *)node->name,"last_runtime_seconds")) {
198             float duration = atof(value) * 1000;
199             current.duration = (int) duration;
200           }
201           else if(!strcmp((char *)node->name, "message")) {
202             current.status = strdup(value);
203           }
204           else if(!strcmp((char *)node->name, "state")) {
205             current.state = strcmp(value,"OK") ? NP_BAD : NP_GOOD;
206           }
207         }
208       }
209     }
210   }
211   memcpy(&current.whence, &rci->serf.finish_time, sizeof(current.whence));
212   current.status = current.status ? current.status : strdup("unknown");
213   noitL(nldeb, "resmon_part(%s/%s/%s) [%s]\n", check->target,
214         rci->resmod, rci->resserv, current.status);
215   noit_check_set_stats(self, check, &current);
216 }
217 static void resmon_part_log_results(noit_module_t *self, noit_check_t *check,
218                                     noit_check_t *parent) {
219   resmon_check_info_t *rci = parent->closure;
220   resmon_part_log_results_xml(self, check, rci->xml_doc);
221 }
222 static void resmon_log_results(noit_module_t *self, noit_check_t *check) {
223   serf_check_info_t *ci = check->closure;
224   resmon_check_info_t *rci = check->closure;
225   struct timeval duration;
226   stats_t current;
227   int services = 0;
228   char human_buffer[256], rt[14];
229   xmlDocPtr resmon_results = NULL;
230   xmlXPathContextPtr xpath_ctxt = NULL;
231   xmlXPathObjectPtr pobj = NULL;
232
233   memset(&current, 0, sizeof(current));
234
235   if(ci->body.b) resmon_results = xmlParseMemory(ci->body.b, ci->body.l);
236   if(resmon_results) {
237     xpath_ctxt = xmlXPathNewContext(resmon_results);
238     pobj = xmlXPathEval((xmlChar *)"/ResmonResults/ResmonResult", xpath_ctxt);
239     if(pobj)
240       if(pobj->type == XPATH_NODESET)
241         services = xmlXPathNodeSetGetLength(pobj->nodesetval);
242   } else {
243     noitL(nlerr, "Error in resmon doc: %s\n", ci->body.b);
244   }
245
246   /* Save our results for future dependent checks */
247   memcpy(&current.whence, &ci->finish_time, sizeof(current.whence));
248   memcpy(&rci->xml_doc_time, &ci->finish_time, sizeof(ci->finish_time));
249   if(rci->xml_doc) xmlFreeDoc(rci->xml_doc);
250   rci->xml_doc = resmon_results;
251
252   if(rci->xpathexpr) {
253     /* This is actually a part check... we had to do all the work as
254      * it isn't being used as a causal firing from a generic resmon check
255      */
256     resmon_part_log_results_xml(self, check, rci->xml_doc);
257     goto out;
258   }
259
260   sub_timeval(ci->finish_time, check->last_fire_time, &duration);
261   snprintf(rt, sizeof(rt), "%.3fms",
262            (float)duration.tv_sec + (float)duration.tv_usec / 1000000.0);
263   snprintf(human_buffer, sizeof(human_buffer),
264            "services=%d,rt=%s",
265            services,
266            ci->timed_out ? "timeout" : rt);
267   noitL(nldeb, "resmon(%s) [%s]\n", check->target, human_buffer);
268
269   current.duration = duration.tv_sec * 1000 + duration.tv_usec / 1000;
270   current.available = (ci->timed_out || ci->status.code != 200) ?
271                           NP_UNAVAILABLE : NP_AVAILABLE;
272   current.state = services ? NP_GOOD : NP_BAD;
273   current.status = human_buffer;
274
275   noit_stats_set_metric_int(&current, "services", &services);
276   if(services) {
277     int i;
278     for(i=0; i<services; i++) {
279       xmlNodePtr node, attrnode;
280       node = xmlXPathNodeSetItem(pobj->nodesetval, i);
281       if(node) {
282         int a;
283         char *attrs[3] = { "last_runtime_seconds", "state", "message" };
284         char *resmod = NULL, *resserv = NULL, *value = NULL;
285         char attr[1024];
286         xmlXPathObjectPtr sobj;
287
288         xpath_ctxt->node = node;
289         sobj = xmlXPathEval((xmlChar *)"@module", xpath_ctxt);
290         resmod = (char *)xmlXPathCastNodeSetToString(sobj->nodesetval);
291         sobj = xmlXPathEval((xmlChar *)"@service", xpath_ctxt);
292         resserv = (char *)xmlXPathCastNodeSetToString(sobj->nodesetval);
293         if(!resmod && !resserv) continue;
294
295         for(a=0; a<3; a++) {
296           int intval;
297           sobj = xmlXPathEval((xmlChar *)attrs[a], xpath_ctxt);
298           attrnode = xmlXPathNodeSetItem(sobj->nodesetval, 0);
299           value = (char *)xmlXPathCastNodeToString(attrnode);
300           snprintf(attr, sizeof(attr), "%s`%s`%s",
301                    resmod, resserv, (char *)attrnode->name);
302           switch(a) {
303             case 0:
304               /* The first is integer */
305               intval = (int)(atof(value) * 1000.0);
306               noit_stats_set_metric_int(&current, attr, &intval);
307               break;
308             case 1:
309             case 2:
310               noit_stats_set_metric_string(&current, attr, (char *)value);
311               break;
312           }
313         }
314       }
315     }
316   }
317
318   noit_check_set_stats(self, check, &current);
319
320  out:
321   if(pobj) xmlXPathFreeObject(pobj);
322   if(xpath_ctxt) xmlXPathFreeContext(xpath_ctxt);
323 }
324 static int serf_complete(eventer_t e, int mask,
325                          void *closure, struct timeval *now) {
326   serf_closure_t *ccl = (serf_closure_t *)closure;
327   serf_check_info_t *ci = (serf_check_info_t *)ccl->check->closure;
328
329   noitLT(nldeb, now, "serf_complete(%s)\n", ccl->check->target);
330   generic_log_results(ccl->self, ccl->check);
331   if(ci->connection) {
332     serf_connection_close(ci->connection);
333     ci->connection = NULL;
334   }
335   if(ci->fd_event) {
336     eventer_remove_fd(ci->fd_event->fd);
337     eventer_free(ci->fd_event);
338     ci->fd_event = NULL;
339   }
340   ci->timeout_event = NULL;
341   apr_pool_destroy(ci->pool);
342   memset(ci, 0, sizeof(*ci));
343   ccl->check->flags &= ~NP_RUNNING;
344   free(ccl);
345   return 0;
346 }
347
348 static int serf_handler(eventer_t e, int mask,
349                         void *closure, struct timeval *now) {
350   apr_pollfd_t desc = { 0 };
351   serf_closure_t *sct = closure;
352   serf_check_info_t *ci = sct->check->closure;
353
354   desc.desc_type = APR_POLL_SOCKET;
355   desc.desc.s = sct->skt;
356
357   desc.rtnevents = 0;
358   if(mask & EVENTER_READ) desc.rtnevents |= APR_POLLIN;
359   if(mask & EVENTER_WRITE) desc.rtnevents |= APR_POLLOUT;
360   if(mask & EVENTER_EXCEPTION) desc.rtnevents |= APR_POLLERR;
361   serf_event_trigger(ci->context, sct->serf_baton, &desc);
362   serf_context_prerun(ci->context);
363
364   /* We're about to deschedule and free the event, drop our reference */
365   if(!e->mask)
366     ci->fd_event = NULL;
367
368   return e->mask;
369 }
370
371 static int serf_init(noit_module_t *self) {
372   return 0;
373 }
374 static void closed_connection(serf_connection_t *conn,
375                               void *closed_baton,
376                               apr_status_t why,
377                               apr_pool_t *pool) {
378 }
379
380 static serf_bucket_t* conn_setup(apr_socket_t *skt,
381                                 void *setup_baton,
382                                 apr_pool_t *pool) {
383   serf_bucket_t *c;
384   app_baton_t *ctx = setup_baton;
385
386   c = serf_bucket_socket_create(skt, ctx->bkt_alloc);
387   if (ctx->using_ssl) {
388       c = serf_bucket_ssl_decrypt_create(c, ctx->ssl_ctx, ctx->bkt_alloc);
389       if (!ctx->ssl_ctx) {
390           ctx->ssl_ctx = serf_bucket_ssl_decrypt_context_get(c);
391       }
392   }
393
394   return c;
395 }
396
397 static serf_bucket_t* accept_response(serf_request_t *request,
398                                       serf_bucket_t *stream,
399                                       void *acceptor_baton,
400                                       apr_pool_t *pool) {
401   serf_bucket_t *c;
402   serf_bucket_alloc_t *bkt_alloc;
403
404   /* get the per-request bucket allocator */
405   bkt_alloc = serf_request_get_alloc(request);
406
407   /* Create a barrier so the response doesn't eat us! */
408   c = serf_bucket_barrier_create(stream, bkt_alloc);
409
410   return serf_bucket_response_create(c, bkt_alloc);
411 }
412
413 static void append_buf(apr_pool_t *p, buf_t *b,
414                        const char *data, int len) {
415   char *n;
416   n = apr_palloc(p, b->l + len + 1);
417   if(b->l == 0)
418     b->b = n;
419   else {
420     memcpy(n, b->b, b->l);
421     b->b = n;
422   }
423   memcpy(b->b + b->l, data, len);
424   b->l += len;
425   b->b[b->l] = '\0';
426 }
427
428 static apr_status_t handle_response(serf_request_t *request,
429                                     serf_bucket_t *response,
430                                     void *handler_baton,
431                                     apr_pool_t *pool) {
432   const char *data;
433   apr_size_t len;
434   apr_status_t status;
435   handler_baton_t *ctx = handler_baton;
436   serf_check_info_t *ci = ctx->check->closure;
437
438   if(response == NULL) {
439     /* We were cancelled. */
440     goto finish;
441   }
442   status = serf_bucket_response_status(response, &ci->status);
443   if (status) {
444     if (APR_STATUS_IS_EAGAIN(status)) {
445       return status;
446     }
447     goto finish;
448   }
449
450   while (1) {
451     status = serf_bucket_read(response, 1024*32, &data, &len);
452     if (SERF_BUCKET_READ_ERROR(status))
453       return status;
454
455     append_buf(ci->pool, &ci->body, data, len);
456
457     /* are we done yet? */
458     if (APR_STATUS_IS_EOF(status)) {
459       serf_bucket_t *hdrs;
460       hdrs = serf_bucket_response_get_headers(response);
461       while (1) {
462         status = serf_bucket_read(hdrs, 2048, &data, &len);
463         if (SERF_BUCKET_READ_ERROR(status))
464           return status;
465
466         append_buf(ci->pool, &ci->headers, data, len);
467         if (APR_STATUS_IS_EOF(status)) {
468           break;
469         }
470       }
471
472       goto finish;
473     }
474
475     /* have we drained the response so far? */
476     if (APR_STATUS_IS_EAGAIN(status))
477       return status;
478
479     /* loop to read some more. */
480   }
481  finish:
482   gettimeofday(&ci->finish_time, NULL);
483   if(ci->timeout_event) {
484     eventer_remove(ci->timeout_event);
485     ci->timed_out = 0;
486     memcpy(&ci->timeout_event->whence, &ci->finish_time,
487            sizeof(&ci->finish_time));
488     eventer_add(ci->timeout_event);
489   }
490   return APR_EOF;
491 }
492
493 static apr_status_t setup_request(serf_request_t *request,
494                                   void *setup_baton,
495                                   serf_bucket_t **req_bkt,
496                                   serf_response_acceptor_t *acceptor,                                             void **acceptor_baton,
497                                   serf_response_handler_t *handler,
498                                   void **handler_baton,
499                                   apr_pool_t *pool) {
500   handler_baton_t *ctx = setup_baton;
501   serf_bucket_t *hdrs_bkt;
502   serf_bucket_t *body_bkt;
503
504   body_bkt = NULL;
505
506   *req_bkt = serf_bucket_request_create(ctx->method, ctx->path, body_bkt,
507                                         serf_request_get_alloc(request));
508
509   hdrs_bkt = serf_bucket_request_get_headers(*req_bkt);
510
511   serf_bucket_headers_setn(hdrs_bkt, "Host", ctx->host);
512   serf_bucket_headers_setn(hdrs_bkt, "User-Agent",
513                            "Noit/" NOIT_HTTP_VERSION_STRING);
514   /* Shouldn't serf do this for us? */
515   serf_bucket_headers_setn(hdrs_bkt, "Accept-Encoding", "gzip");
516
517   if (ctx->authn != NULL) {
518     serf_bucket_headers_setn(hdrs_bkt, "Authorization", ctx->authn);
519   }
520
521   if (ctx->acceptor_baton->using_ssl) {
522     serf_bucket_alloc_t *req_alloc;
523     app_baton_t *app_ctx = ctx->acceptor_baton;
524
525     req_alloc = serf_request_get_alloc(request);
526
527     if (app_ctx->ssl_ctx == NULL) {
528       *req_bkt =
529         serf_bucket_ssl_encrypt_create(*req_bkt, NULL,
530                                        app_ctx->bkt_alloc);
531       app_ctx->ssl_ctx =
532         serf_bucket_ssl_encrypt_context_get(*req_bkt);
533     }
534     else {
535       *req_bkt =
536         serf_bucket_ssl_encrypt_create(*req_bkt, app_ctx->ssl_ctx,
537                                        app_ctx->bkt_alloc);
538     }
539   }
540
541   *acceptor = ctx->acceptor;
542   *acceptor_baton = ctx->acceptor_baton;
543   *handler = ctx->handler;
544   *handler_baton = ctx;
545
546   return APR_SUCCESS;
547 }
548
549 struct __unix_apr_socket_t {
550   apr_pool_t *pool;
551   int socketdes;
552 };
553
554 static apr_status_t serf_eventer_add(void *user_baton,
555                                      apr_pollfd_t *pfd,
556                                      void *serf_baton) {
557   eventer_t e, newe = NULL;
558   serf_closure_t *sct = user_baton, *newsct;
559   assert(pfd->desc_type == APR_POLL_SOCKET);
560   struct __unix_apr_socket_t *hack = (struct __unix_apr_socket_t *)pfd->desc.s;
561
562   noitL(nldeb, "serf_eventer_add() => %d\n", hack->socketdes);
563   e = eventer_find_fd(hack->socketdes);
564   if(!e) {
565     newe = e = eventer_alloc();
566     e->fd = hack->socketdes;
567     e->callback = serf_handler;
568     e->closure = calloc(1, sizeof(serf_closure_t));
569   }
570   newsct = e->closure;
571   newsct->self = sct->self;
572   newsct->check = sct->check;
573   newsct->serf_baton = serf_baton;
574   newsct->skt = pfd->desc.s;
575   e->mask = 0;
576   if(pfd->reqevents & APR_POLLIN) e->mask |= EVENTER_READ;
577   if(pfd->reqevents & APR_POLLOUT) e->mask |= EVENTER_WRITE;
578   if(pfd->reqevents & APR_POLLERR) e->mask |= EVENTER_EXCEPTION;
579   if(newe) {
580     serf_check_info_t *ci = sct->check->closure;
581     eventer_add(newe);
582     ci->fd_event = newe;
583   }
584 /* ** Unneeded as this is called recursively **
585   else
586     eventer_update(e);
587 */
588   return APR_SUCCESS;
589 }
590 static apr_status_t serf_eventer_remove(void *user_baton,
591                                         apr_pollfd_t *pfd,
592                                         void *serf_baton) {
593   serf_closure_t *sct = user_baton;
594   serf_check_info_t *ci;
595   eventer_t e;
596
597   ci = sct->check->closure;
598   assert(pfd->desc_type == APR_POLL_SOCKET);
599   struct __unix_apr_socket_t *hack = (struct __unix_apr_socket_t *)pfd->desc.s;
600
601   noitL(nldeb, "serf_eventer_remove() => %d\n", hack->socketdes);
602   e = eventer_find_fd(hack->socketdes);
603   if(e) e->mask = 0;
604   return 0;
605 }
606
607 static int serf_initiate(noit_module_t *self, noit_check_t *check) {
608   serf_closure_t *ccl;
609   serf_check_info_t *ci;
610   struct timeval when, p_int;
611   apr_status_t status;
612   eventer_t newe;
613   serf_module_conf_t *mod_config;
614   char *config_url;
615
616   mod_config = noit_module_get_userdata(self);
617   ci = (serf_check_info_t *)check->closure;
618   /* We cannot be running */
619   assert(!(check->flags & NP_RUNNING));
620   check->flags |= NP_RUNNING;
621   noitL(nldeb, "serf_initiate(%p,%s)\n",
622         self, check->target);
623
624   /* remove a timeout if we still have one -- we should unless someone
625    * has set a lower timeout than the period.
626    */
627   ci->timed_out = 1;
628   if(ci->timeout_event) {
629     eventer_remove(ci->timeout_event);
630     free(ci->timeout_event->closure);
631     eventer_free(ci->timeout_event);
632     ci->timeout_event = NULL;
633   }
634   assert(!ci->pool);
635   apr_pool_create(&ci->pool, NULL);
636   apr_atomic_init(ci->pool);
637
638   gettimeofday(&when, NULL);
639   memcpy(&check->last_fire_time, &when, sizeof(when));
640
641   ccl = apr_pcalloc(ci->pool, sizeof(*ccl));
642   ccl->self = self;
643   ccl->check = check;
644
645   if(!noit_hash_retrieve(check->config, "url", strlen("url"),
646                         (void **)&config_url))
647     if(!mod_config->options ||
648        !noit_hash_retrieve(mod_config->options, "url", strlen("url"),
649                            (void **)&config_url))
650       config_url = "http://localhost/";
651   apr_uri_parse(ci->pool, config_url, &ci->url);
652
653   if (!ci->url.port) {
654     ci->url.port = apr_uri_port_of_scheme(ci->url.scheme);
655   }
656   if (!ci->url.path) {
657     ci->url.path = "/";
658   }
659
660   if (strcasecmp(ci->url.scheme, "https") == 0) {
661     ci->app_ctx.using_ssl = 1;
662   }
663   else {
664     ci->app_ctx.using_ssl = 0;
665   }
666
667   status = apr_sockaddr_info_get(&ci->address,
668                                  check->target, APR_UNSPEC, ci->url.port, 0,
669                                  ci->pool);
670   if (status) {
671     /* Handle error -- log failure */
672     apr_pool_destroy(ci->pool);
673     memset(ci, 0, sizeof(*ci));
674     check->flags &= ~NP_RUNNING;
675     return 0;
676   }
677
678   ci->context = serf_context_create_ex(ccl, serf_eventer_add,
679                                        serf_eventer_remove, ci->pool);
680
681   ci->app_ctx.bkt_alloc = serf_bucket_allocator_create(ci->pool, NULL, NULL);
682   ci->app_ctx.ssl_ctx = NULL;
683
684   ci->connection = serf_connection_create(ci->context, ci->address,
685                                           conn_setup, &ci->app_ctx,
686                                           closed_connection, &ci->app_ctx,
687                                           ci->pool);
688
689   ci->handler_ctx.method = apr_pstrdup(ci->pool, "GET");
690   ci->handler_ctx.host = apr_pstrdup(ci->pool, check->target);
691   ci->handler_ctx.path = ci->url.path;
692   ci->handler_ctx.authn = NULL;
693
694   ci->handler_ctx.acceptor = accept_response;
695   ci->handler_ctx.acceptor_baton = &ci->app_ctx;
696   ci->handler_ctx.handler = handle_response;
697   ci->handler_ctx.self = self;
698   ci->handler_ctx.check = check;
699
700   ci->request = serf_connection_request_create(ci->connection, setup_request,
701                                                &ci->handler_ctx);
702   serf_context_prerun(ci->context);
703
704   newe = eventer_alloc();
705   newe->mask = EVENTER_TIMER;
706   gettimeofday(&when, NULL);
707   p_int.tv_sec = check->timeout / 1000;
708   p_int.tv_usec = (check->timeout % 1000) * 1000;
709   add_timeval(when, p_int, &newe->whence);
710   ccl = calloc(1, sizeof(*ccl));
711   ccl->self = self;
712   ccl->check = check;
713   newe->closure = ccl;
714   newe->callback = serf_complete;
715   eventer_add(newe);
716   ci->timeout_event = newe;
717   return 0;
718 }
719 static int serf_schedule_next(noit_module_t *self,
720                               struct timeval *last_check, noit_check_t *check,
721                               struct timeval *now) {
722   eventer_t newe;
723   struct timeval period, earliest;
724   serf_closure_t *ccl;
725
726   if(check->period == 0) return 0;
727
728   /* If we have an event, we know when we intended it to fire.  This means
729    * we should schedule that point + period.
730    */
731   if(now)
732     memcpy(&earliest, now, sizeof(earliest));
733   else
734     gettimeofday(&earliest, NULL);
735   period.tv_sec = check->period / 1000;
736   period.tv_usec = (check->period % 1000) * 1000;
737
738   newe = eventer_alloc();
739   memcpy(&newe->whence, last_check, sizeof(*last_check));
740   add_timeval(newe->whence, period, &newe->whence);
741   if(compare_timeval(newe->whence, earliest) < 0)
742     memcpy(&newe->whence, &earliest, sizeof(earliest));
743   newe->mask = EVENTER_TIMER;
744   newe->callback = serf_recur_handler;
745   ccl = calloc(1, sizeof(*ccl));
746   ccl->self = self;
747   ccl->check = check;
748   newe->closure = ccl;
749
750   eventer_add(newe);
751   check->fire_event = newe;
752   return 0;
753 }
754 static int serf_recur_handler(eventer_t e, int mask, void *closure,
755                               struct timeval *now) {
756   serf_closure_t *cl = (serf_closure_t *)closure;
757   serf_schedule_next(cl->self, &e->whence, cl->check, now);
758   serf_initiate(cl->self, cl->check);
759   free(cl);
760   return 0;
761 }
762 static int serf_initiate_check(noit_module_t *self, noit_check_t *check,
763                                int once, noit_check_t *cause) {
764   if(!check->closure) check->closure = calloc(1, sizeof(serf_check_info_t));
765   if(once) {
766     serf_initiate(self, check);
767     return 0;
768   }
769   /* If check->fire_event, we're already scheduled... */
770   if(!check->fire_event) {
771     struct timeval epoch = { 0L, 0L };
772     noit_check_fake_last_check(check, &epoch, NULL);
773     serf_schedule_next(self, &epoch, check, NULL);
774   }
775   return 0;
776 }
777 static int resmon_initiate_check(noit_module_t *self, noit_check_t *check,
778                                  int once, noit_check_t *parent) {
779   /* resmon_check_info_t gives us a bit more space */
780   if(!check->closure) check->closure = calloc(1, sizeof(resmon_check_info_t));
781   if(once) {
782     serf_initiate(self, check);
783     return 0;
784   }
785   if(!check->fire_event) {
786     struct timeval epoch = { 0L, 0L };
787     noit_check_fake_last_check(check, &epoch, NULL);
788     serf_schedule_next(self, &epoch, check, NULL);
789   }
790   return 0;
791 }
792
793 static int resmon_part_initiate_check(noit_module_t *self, noit_check_t *check,
794                                       int once, noit_check_t *parent) {
795   char xpathexpr[1024];
796   const char *resmod, *resserv;
797   resmon_check_info_t *rci;
798
799   if(!check->closure) check->closure = calloc(1, sizeof(resmon_check_info_t));
800   rci = check->closure;
801   if(!rci->xpathexpr) {
802     if(!noit_hash_retrieve(check->config,
803                            "resmon_module", strlen("resmon_module"),
804                            (void **)&resmod)) {
805       resmod = "DUMMY_MODULE";
806     }
807     if(!noit_hash_retrieve(check->config,
808                            "resmon_service", strlen("resmon_service"),
809                            (void **)&resserv)) {
810       resserv = "DUMMY_SERVICE";
811     }
812     snprintf(xpathexpr, sizeof(xpathexpr),
813              "//ResmonResult[@module=\"%s\" and @service=\"%s\"]/*",
814              resmod, resserv);
815     rci->xpathexpr = strdup(xpathexpr);
816     rci->resmod = strdup(resmod);
817     rci->resserv = strdup(resserv);
818   }
819
820   if(parent && !strcmp(parent->module, "resmon")) {
821     /* Content is cached in the parent */
822     resmon_part_log_results(self, check, parent);
823     return 0;
824   }
825   if(once) {
826     serf_initiate(self, check);
827     return 0;
828   }
829   if(!check->fire_event)
830     serf_schedule_next(self, NULL, check, NULL);
831   return 0;
832 }
833
834
835
836 static int serf_onload(noit_module_t *self) {
837   apr_initialize();
838   atexit(apr_terminate);
839
840   nlerr = noit_log_stream_find("error/serf");
841   nldeb = noit_log_stream_find("debug/serf");
842   if(!nlerr) nlerr = noit_stderr;
843   if(!nldeb) nldeb = noit_debug;
844
845   eventer_name_callback("http/serf_handler", serf_handler);
846   eventer_name_callback("http/serf_complete", serf_complete);
847   eventer_name_callback("http/serf_recur_handler", serf_recur_handler);
848   return 0;
849 }
850 noit_module_t http = {
851   NOIT_MODULE_MAGIC,
852   NOIT_MODULE_ABI_VERSION,
853   "http",
854   "libserf-based HTTP and HTTPS resource checker",
855   serf_onload,
856   serf_config,
857   serf_init,
858   serf_initiate_check
859 };
860
861 noit_module_t resmon = {
862   NOIT_MODULE_MAGIC,
863   NOIT_MODULE_ABI_VERSION,
864   "resmon",
865   "libserf-based resmon resource checker",
866   serf_onload,
867   resmon_config,
868   serf_init,
869   resmon_initiate_check
870 };
871
872 noit_module_t resmon_part = {
873   NOIT_MODULE_MAGIC,
874   NOIT_MODULE_ABI_VERSION,
875   "resmon_part",
876   "resmon part resource checker",
877   serf_onload,
878   resmon_config,
879   serf_init,
880   resmon_part_initiate_check
881 };
882
Note: See TracBrowser for help on using the browser.