root/src/modules/http.c

Revision 0dff7e9e489238bfe0e8f15a5021453ac9fde2b0, 28.7 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

set the finish time when we are causally induced

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7
8 #include <stdio.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <assert.h>
12 #include <math.h>
13
14 #include <libxml/parser.h>
15 #include <libxml/tree.h>
16 #include <libxml/xpath.h>
17
18 #include "noit_module.h"
19 #include "noit_check.h"
20 #include "utils/noit_log.h"
21 #include "utils/noit_hash.h"
22
23 #include <apr_uri.h>
24 #include <apr_atomic.h>
25 #include <apr_strings.h>
26 #include "serf.h"
27
28 #define NOIT_HTTP_VERSION_STRING "0.1"
29
30 typedef struct {  /* Per-module configuration, stored as the module's userdata by the config hooks. */
31   noit_hash_table *options;  /* option table handed to the config hook; may be NULL for the http module */
32   void (*results)(noit_module_t *, noit_check_t *);  /* result reporter invoked by generic_log_results() */
33 } serf_module_conf_t;
34
35 typedef struct {  /* Connection-level state: baton for conn_setup() and the response acceptor. */
36   int using_ssl;  /* serf_initiate() sets this to 1 when the URL scheme is https, else 0 */
37   serf_ssl_context_t *ssl_ctx;  /* NULL until the first SSL encrypt/decrypt bucket is created, then reused */
38   serf_bucket_alloc_t *bkt_alloc;  /* bucket allocator created from the check's pool in serf_initiate() */
39 } app_baton_t;
40
41 typedef struct {  /* Per-request state handed to setup_request() and handle_response(). */
42   serf_response_acceptor_t acceptor;  /* set to accept_response by serf_initiate() */
43   app_baton_t *acceptor_baton;  /* points at the owning check's app_ctx */
44
45   serf_response_handler_t handler;  /* set to handle_response by serf_initiate() */
46   const char *host;  /* value sent as the Host header */
47   const char *method;  /* HTTP method; serf_initiate() always uses "GET" */
48   const char *path;  /* request path taken from the parsed URL */
49   const char *authn;  /* Authorization header value, or NULL to send none */
50
51   noit_module_t *self;  /* module that started the request */
52   noit_check_t *check;  /* check being run; its closure is the serf_check_info_t */
53 } handler_baton_t;
54
55 typedef struct buf_t {  /* Growable byte buffer filled by append_buf(). */
56   char *b;  /* data; append_buf() keeps it NUL-terminated */
57   int32_t l;  /* number of data bytes, not counting the terminator */
58 } buf_t;
59
60 typedef struct {  /* Per-check state for one HTTP fetch; lives in check->closure. */
61   apr_pool_t *pool;  /* created per run in serf_initiate(), destroyed in serf_check_info_cleanup() */
62   apr_sockaddr_t *address;  /* resolved from check->target and the URL port */
63   serf_context_t *context;
64   serf_connection_t *connection;
65   serf_request_t *request;
66   app_baton_t app_ctx;
67   handler_baton_t handler_ctx;
68   apr_uri_t url;  /* parsed form of the configured "url" */
69   int timed_out;  /* set to 1 when a run starts; handle_response() clears it on completion */
70
71   serf_status_line status;  /* HTTP status line of the response */
72   buf_t headers;  /* response headers, allocated from pool */
73   buf_t body;  /* response body, allocated from pool */
74
75   struct timeval finish_time;  /* when the response finished (or the causal firing happened) */
76   eventer_t fd_event;  /* socket event registered by serf_eventer_add(), or NULL */
77   eventer_t timeout_event;  /* timer that runs serf_complete(), or NULL */
78 } serf_check_info_t;
79
80 typedef struct {  /* Closure for resmon and resmon_part checks. */
81   serf_check_info_t serf;  /* must stay first: the same closure is also read through serf_check_info_t * */
82   struct timeval xml_doc_time;  /* finish time of the fetch that produced xml_doc */
83   char *xpathexpr;  /* heap string; non-NULL only for part checks (selects one ResmonResult) */
84   xmlDocPtr xml_doc;  /* last parsed resmon document, kept for dependent part checks */
85   char *resmod;  /* heap copy of the configured resmon module name */
86   char *resserv;  /* heap copy of the configured resmon service name */
87 } resmon_check_info_t;
88
89 typedef struct {  /* Closure attached to eventer events and used as serf's user baton. */
90   noit_module_t *self;  /* module owning the check */
91   noit_check_t *check;  /* check the event belongs to */
92   void *serf_baton;  /* opaque baton serf supplied in serf_eventer_add(); passed back to serf_event_trigger() */
93   apr_socket_t *skt;  /* APR socket the fd event watches */
94 } serf_closure_t;
95
96 static noit_log_stream_t nlerr = NULL;  /* error log stream; set in serf_onload(), falls back to noit_stderr */
97 static noit_log_stream_t nldeb = NULL;  /* debug log stream; set in serf_onload(), falls back to noit_debug */
98 static int serf_handler(eventer_t e, int mask, void *closure,
99                         struct timeval *now);
100 static int serf_recur_handler(eventer_t e, int mask, void *closure,
101                               struct timeval *now);
102 static void serf_log_results(noit_module_t *self, noit_check_t *check);
103 static void resmon_log_results(noit_module_t *self, noit_check_t *check);
104 static void resmon_part_log_results(noit_module_t *self, noit_check_t *check,
105                                     noit_check_t *parent);
106
107 static int serf_config(noit_module_t *self, noit_hash_table *options) {
108   serf_module_conf_t *conf;
109   conf = noit_module_get_userdata(self);
110   if(conf) {
111     if(conf->options) {
112       noit_hash_destroy(conf->options, free, free);
113       free(conf->options);
114     }
115   }
116   else
117     conf = calloc(1, sizeof(*conf));
118   conf->options = options;
119   conf->results = serf_log_results;
120   noit_module_set_userdata(self, conf);
121   return 0;
122 }
123 static int resmon_config(noit_module_t *self, noit_hash_table *options) {
124   serf_module_conf_t *conf;
125   conf = noit_module_get_userdata(self);
126   if(conf) {
127     if(conf->options) {
128       noit_hash_destroy(conf->options, free, free);
129       free(conf->options);
130     }
131   }
132   else
133     conf = calloc(1, sizeof(*conf));
134   conf->options = options;
135   if(!conf->options) conf->options = calloc(1, sizeof(*conf->options));
136   noit_hash_store(conf->options, strdup("url"), strlen("url"),
137                   strdup("http://localhost:81/"));
138   conf->results = resmon_log_results;
139   noit_module_set_userdata(self, conf);
140   return 0;
141 }
142 static void generic_log_results(noit_module_t *self, noit_check_t *check) {
143   serf_module_conf_t *module_conf;
144   module_conf = noit_module_get_userdata(self);
145   module_conf->results(self, check);
146 }
147 static void serf_log_results(noit_module_t *self, noit_check_t *check) {
148   serf_check_info_t *ci = check->closure;
149   struct timeval duration;
150   stats_t current;
151   int expect_code = 200;
152   char *code_str;
153   char human_buffer[256], code[4], rt[14];
154
155   noit_check_stats_clear(&current);
156
157   if(noit_hash_retrieve(check->config, "code", strlen("code"),
158                         (void **)&code_str))
159     expect_code = atoi(code_str);
160
161   sub_timeval(ci->finish_time, check->last_fire_time, &duration);
162
163   snprintf(code, sizeof(code), "%3d", ci->status.code);
164   snprintf(rt, sizeof(rt), "%.3fs",
165            (float)duration.tv_sec + (float)duration.tv_usec / 1000000.0);
166   snprintf(human_buffer, sizeof(human_buffer),
167            "code=%s,rt=%s,bytes=%d",
168            ci->status.code ? code : "undefined",
169            ci->timed_out ? "timeout" : rt,
170            ci->body.l);
171   noitL(nldeb, "http(%s) [%s]\n", check->target, human_buffer);
172
173   memcpy(&current.whence, &ci->finish_time, sizeof(current.whence));
174   current.duration = duration.tv_sec * 1000 + duration.tv_usec / 1000;
175   current.available = (ci->timed_out || !ci->status.code) ? NP_UNAVAILABLE : NP_AVAILABLE;
176   current.state = (ci->status.code != 200) ? NP_BAD : NP_GOOD;
177   current.status = human_buffer;
178   if(current.available == NP_AVAILABLE) {
179     noit_stats_set_metric(&current, "code",
180                           METRIC_STRING, ci->status.code?code:NULL);
181     noit_stats_set_metric(&current, "bytes",
182                           METRIC_INT32, &ci->body.l);
183   }
184   else {
185     noit_stats_set_metric(&current, "code", METRIC_STRING, NULL);
186     noit_stats_set_metric(&current, "bytes", METRIC_INT32, NULL);
187   }
188   noit_check_set_stats(self, check, &current);
189 }
190 static void resmon_part_log_results_xml(noit_module_t *self,
191                                         noit_check_t *check,
192                                         xmlDocPtr xml) {
193   serf_check_info_t *ci = check->closure;
194   resmon_check_info_t *rci = check->closure;
195   xmlXPathContextPtr xpath_ctxt = NULL;
196   stats_t current;
197
198   noit_check_stats_clear(&current);
199   memcpy(&current.whence, &ci->finish_time, sizeof(current.whence));
200   current.available = NP_UNAVAILABLE;
201   current.state = NP_BAD;
202
203   if(xml && rci->xpathexpr) {
204     current.available = NP_AVAILABLE;
205     xpath_ctxt = xmlXPathNewContext(xml);
206     if(xpath_ctxt) {
207       xmlXPathObjectPtr pobj;
208       pobj = xmlXPathEval((xmlChar *)rci->xpathexpr, xpath_ctxt);
209       if(pobj) {
210         int i, cnt;
211         cnt = xmlXPathNodeSetGetLength(pobj->nodesetval);
212         for(i=0; i<cnt; i++) {
213           xmlNodePtr node;
214           char *value;
215           node = xmlXPathNodeSetItem(pobj->nodesetval, i);
216           value = (char *)xmlXPathCastNodeToString(node);
217           if(!strcmp((char *)node->name,"last_runtime_seconds")) {
218             float duration = atof(value) * 1000;
219             current.duration = (int) duration;
220           }
221           else if(!strcmp((char *)node->name, "message")) {
222             current.status = strdup(value);
223           }
224           else if(!strcmp((char *)node->name, "state")) {
225             current.state = strcmp(value,"OK") ? NP_BAD : NP_GOOD;
226           }
227         }
228       }
229     }
230   }
231   memcpy(&current.whence, &rci->serf.finish_time, sizeof(current.whence));
232   current.status = current.status ? current.status : strdup("unknown");
233   noitL(nldeb, "resmon_part(%s/%s/%s) [%s]\n", check->target,
234         rci->resmod, rci->resserv, current.status);
235   noit_check_set_stats(self, check, &current);
236 }
237 static void resmon_part_log_results(noit_module_t *self, noit_check_t *check,
238                                     noit_check_t *parent) {
239   resmon_check_info_t *rci = parent->closure;
240   resmon_part_log_results_xml(self, check, rci->xml_doc);
241 }
242 static void resmon_log_results(noit_module_t *self, noit_check_t *check) {
243   serf_check_info_t *ci = check->closure;
244   resmon_check_info_t *rci = check->closure;
245   struct timeval duration;
246   stats_t current;
247   int32_t services = 0;
248   char human_buffer[256], rt[14];
249   xmlDocPtr resmon_results = NULL;
250   xmlXPathContextPtr xpath_ctxt = NULL;
251   xmlXPathObjectPtr pobj = NULL;
252
253   noit_check_stats_clear(&current);
254
255   if(ci->body.b) resmon_results = xmlParseMemory(ci->body.b, ci->body.l);
256   if(resmon_results) {
257     xpath_ctxt = xmlXPathNewContext(resmon_results);
258     pobj = xmlXPathEval((xmlChar *)"/ResmonResults/ResmonResult", xpath_ctxt);
259     if(pobj)
260       if(pobj->type == XPATH_NODESET)
261         services = xmlXPathNodeSetGetLength(pobj->nodesetval);
262   } else {
263     if(ci->body.l)
264       noitL(nlerr, "Error in resmon doc: %s\n", ci->body.b);
265   }
266
267   /* Save our results for future dependent checks */
268   memcpy(&current.whence, &ci->finish_time, sizeof(current.whence));
269   memcpy(&rci->xml_doc_time, &ci->finish_time, sizeof(ci->finish_time));
270   if(rci->xml_doc) xmlFreeDoc(rci->xml_doc);
271   rci->xml_doc = resmon_results;
272
273   if(rci->xpathexpr) {
274     /* This is actually a part check... we had to do all the work as
275      * it isn't being used as a causal firing from a generic resmon check
276      */
277     resmon_part_log_results_xml(self, check, rci->xml_doc);
278     goto out;
279   }
280
281   sub_timeval(ci->finish_time, check->last_fire_time, &duration);
282   snprintf(rt, sizeof(rt), "%.3fs",
283            (float)duration.tv_sec + (float)duration.tv_usec / 1000000.0);
284   snprintf(human_buffer, sizeof(human_buffer),
285            "services=%d,rt=%s",
286            services,
287            ci->timed_out ? "timeout" : rt);
288   noitL(nldeb, "resmon(%s) [%s]\n", check->target, human_buffer);
289
290   current.duration = duration.tv_sec * 1000 + duration.tv_usec / 1000;
291   current.available = (ci->timed_out || ci->status.code != 200) ?
292                           NP_UNAVAILABLE : NP_AVAILABLE;
293   current.state = services ? NP_GOOD : NP_BAD;
294   current.status = human_buffer;
295
296   noit_stats_set_metric(&current, "services", METRIC_INT32, &services);
297   if(services) {
298     int i;
299     for(i=0; i<services; i++) {
300       xmlNodePtr node, attrnode;
301       node = xmlXPathNodeSetItem(pobj->nodesetval, i);
302       if(node) {
303         int a;
304         char *attrs[3] = { "last_runtime_seconds", "state", "message" };
305         char *resmod = NULL, *resserv = NULL, *value = NULL;
306         char attr[1024];
307         xmlXPathObjectPtr sobj;
308
309         xpath_ctxt->node = node;
310         sobj = xmlXPathEval((xmlChar *)"@module", xpath_ctxt);
311         resmod = (char *)xmlXPathCastNodeSetToString(sobj->nodesetval);
312         sobj = xmlXPathEval((xmlChar *)"@service", xpath_ctxt);
313         resserv = (char *)xmlXPathCastNodeSetToString(sobj->nodesetval);
314         if(!resmod && !resserv) continue;
315
316         for(a=0; a<3; a++) {
317           int32_t intval;
318           sobj = xmlXPathEval((xmlChar *)attrs[a], xpath_ctxt);
319           attrnode = xmlXPathNodeSetItem(sobj->nodesetval, 0);
320           value = (char *)xmlXPathCastNodeToString(attrnode);
321           snprintf(attr, sizeof(attr), "%s`%s`%s",
322                    resmod, resserv, (char *)attrnode->name);
323           switch(a) {
324             case 0:
325               /* The first is integer */
326               intval = (int)(atof(value) * 1000.0);
327               noit_stats_set_metric(&current, attr, METRIC_INT32, &intval);
328               break;
329             case 1:
330               noit_stats_set_metric(&current, attr, METRIC_STRING, value);
331               break;
332             case 2:
333               noit_stats_set_metric(&current, attr, METRIC_GUESS, value);
334               break;
335           }
336         }
337       }
338     }
339   }
340
341   noit_check_set_stats(self, check, &current);
342
343  out:
344   if(pobj) xmlXPathFreeObject(pobj);
345   if(xpath_ctxt) xmlXPathFreeContext(xpath_ctxt);
346 }
347 static void serf_check_info_cleanup(serf_check_info_t *ci) {
348   if(ci->connection) {
349     serf_connection_close(ci->connection);
350     ci->connection = NULL;
351   }
352   if(ci->fd_event) {
353     eventer_remove_fd(ci->fd_event->fd);
354     eventer_free(ci->fd_event);
355     ci->fd_event = NULL;
356   }
357   ci->timeout_event = NULL;
358   if(ci->pool) apr_pool_destroy(ci->pool);
359   memset(ci, 0, sizeof(*ci));
360 }
361 static int serf_complete(eventer_t e, int mask,
362                          void *closure, struct timeval *now) {
363   serf_closure_t *ccl = (serf_closure_t *)closure;
364   serf_check_info_t *ci = (serf_check_info_t *)ccl->check->closure;
365
366   noitLT(nldeb, now, "serf_complete(%s)\n", ccl->check->target);
367   if(!NOIT_CHECK_DISABLED(ccl->check) && !NOIT_CHECK_KILLED(ccl->check)) {
368     generic_log_results(ccl->self, ccl->check);
369   }
370   serf_check_info_cleanup(ci);
371   ccl->check->flags &= ~NP_RUNNING;
372   free(ccl);
373   return 0;
374 }
375
376 static int serf_handler(eventer_t e, int mask,
377                         void *closure, struct timeval *now) {
378   apr_pollfd_t desc = { 0 };
379   serf_closure_t *sct = closure;
380   serf_check_info_t *ci = sct->check->closure;
381
382   desc.desc_type = APR_POLL_SOCKET;
383   desc.desc.s = sct->skt;
384
385   desc.rtnevents = 0;
386   if(mask & EVENTER_READ) desc.rtnevents |= APR_POLLIN;
387   if(mask & EVENTER_WRITE) desc.rtnevents |= APR_POLLOUT;
388   if(mask & EVENTER_EXCEPTION) desc.rtnevents |= APR_POLLERR;
389   serf_event_trigger(ci->context, sct->serf_baton, &desc);
390   serf_context_prerun(ci->context);
391
392   /* We're about to deschedule and free the event, drop our reference */
393   if(!e->mask)
394     ci->fd_event = NULL;
395
396   return e->mask;
397 }
398
399 static int serf_init(noit_module_t *self) {
400   return 0;
401 }
402 static void closed_connection(serf_connection_t *conn,
403                               void *closed_baton,
404                               apr_status_t why,
405                               apr_pool_t *pool) {
406 }
407
408 static serf_bucket_t* conn_setup(apr_socket_t *skt,
409                                 void *setup_baton,
410                                 apr_pool_t *pool) {
411   serf_bucket_t *c;
412   app_baton_t *ctx = setup_baton;
413
414   c = serf_bucket_socket_create(skt, ctx->bkt_alloc);
415   if (ctx->using_ssl) {
416       c = serf_bucket_ssl_decrypt_create(c, ctx->ssl_ctx, ctx->bkt_alloc);
417       if (!ctx->ssl_ctx) {
418           ctx->ssl_ctx = serf_bucket_ssl_decrypt_context_get(c);
419       }
420   }
421
422   return c;
423 }
424
425 static serf_bucket_t* accept_response(serf_request_t *request,
426                                       serf_bucket_t *stream,
427                                       void *acceptor_baton,
428                                       apr_pool_t *pool) {
429   serf_bucket_t *c;
430   serf_bucket_alloc_t *bkt_alloc;
431
432   /* get the per-request bucket allocator */
433   bkt_alloc = serf_request_get_alloc(request);
434
435   /* Create a barrier so the response doesn't eat us! */
436   c = serf_bucket_barrier_create(stream, bkt_alloc);
437
438   return serf_bucket_response_create(c, bkt_alloc);
439 }
440
441 static void append_buf(apr_pool_t *p, buf_t *b,
442                        const char *data, int len) {
443   char *n;
444   n = apr_palloc(p, b->l + len + 1);
445   if(b->l == 0)
446     b->b = n;
447   else {
448     memcpy(n, b->b, b->l);
449     b->b = n;
450   }
451   memcpy(b->b + b->l, data, len);
452   b->l += len;
453   b->b[b->l] = '\0';
454 }
455
456 static apr_status_t handle_response(serf_request_t *request,
457                                     serf_bucket_t *response,
458                                     void *handler_baton,
459                                     apr_pool_t *pool) {
460   const char *data;
461   apr_size_t len;
462   apr_status_t status;
463   handler_baton_t *ctx = handler_baton;
464   serf_check_info_t *ci = ctx->check->closure;
465
466   if(response == NULL) {
467     /* We were cancelled. */
468     goto finish;
469   }
470   status = serf_bucket_response_status(response, &ci->status);
471   if (status) {
472     if (APR_STATUS_IS_EAGAIN(status)) {
473       return status;
474     }
475     goto finish;
476   }
477
478   while (1) {
479     status = serf_bucket_read(response, 1024*32, &data, &len);
480     if (SERF_BUCKET_READ_ERROR(status))
481       return status;
482
483     append_buf(ci->pool, &ci->body, data, len);
484
485     /* are we done yet? */
486     if (APR_STATUS_IS_EOF(status)) {
487       serf_bucket_t *hdrs;
488       hdrs = serf_bucket_response_get_headers(response);
489       while (1) {
490         status = serf_bucket_read(hdrs, 2048, &data, &len);
491         if (SERF_BUCKET_READ_ERROR(status))
492           return status;
493
494         append_buf(ci->pool, &ci->headers, data, len);
495         if (APR_STATUS_IS_EOF(status)) {
496           break;
497         }
498       }
499
500       goto finish;
501     }
502
503     /* have we drained the response so far? */
504     if (APR_STATUS_IS_EAGAIN(status))
505       return status;
506
507     /* loop to read some more. */
508   }
509  finish:
510   gettimeofday(&ci->finish_time, NULL);
511   if(ci->timeout_event) {
512     eventer_remove(ci->timeout_event);
513     ci->timed_out = 0;
514     memcpy(&ci->timeout_event->whence, &ci->finish_time,
515            sizeof(&ci->finish_time));
516     eventer_add(ci->timeout_event);
517   }
518   return APR_EOF;
519 }
520
521 static apr_status_t setup_request(serf_request_t *request,
522                                   void *setup_baton,
523                                   serf_bucket_t **req_bkt,
524                                   serf_response_acceptor_t *acceptor,                                             void **acceptor_baton,
525                                   serf_response_handler_t *handler,
526                                   void **handler_baton,
527                                   apr_pool_t *pool) {
528   handler_baton_t *ctx = setup_baton;
529   serf_bucket_t *hdrs_bkt;
530   serf_bucket_t *body_bkt;
531
532   body_bkt = NULL;
533
534   *req_bkt = serf_bucket_request_create(ctx->method, ctx->path, body_bkt,
535                                         serf_request_get_alloc(request));
536
537   hdrs_bkt = serf_bucket_request_get_headers(*req_bkt);
538
539   serf_bucket_headers_setn(hdrs_bkt, "Host", ctx->host);
540   serf_bucket_headers_setn(hdrs_bkt, "User-Agent",
541                            "Noit/" NOIT_HTTP_VERSION_STRING);
542   /* Shouldn't serf do this for us? */
543   serf_bucket_headers_setn(hdrs_bkt, "Accept-Encoding", "gzip");
544
545   if (ctx->authn != NULL) {
546     serf_bucket_headers_setn(hdrs_bkt, "Authorization", ctx->authn);
547   }
548
549   if (ctx->acceptor_baton->using_ssl) {
550     serf_bucket_alloc_t *req_alloc;
551     app_baton_t *app_ctx = ctx->acceptor_baton;
552
553     req_alloc = serf_request_get_alloc(request);
554
555     if (app_ctx->ssl_ctx == NULL) {
556       *req_bkt =
557         serf_bucket_ssl_encrypt_create(*req_bkt, NULL,
558                                        app_ctx->bkt_alloc);
559       app_ctx->ssl_ctx =
560         serf_bucket_ssl_encrypt_context_get(*req_bkt);
561     }
562     else {
563       *req_bkt =
564         serf_bucket_ssl_encrypt_create(*req_bkt, app_ctx->ssl_ctx,
565                                        app_ctx->bkt_alloc);
566     }
567   }
568
569   *acceptor = ctx->acceptor;
570   *acceptor_baton = ctx->acceptor_baton;
571   *handler = ctx->handler;
572   *handler_baton = ctx;
573
574   return APR_SUCCESS;
575 }
576
577 struct __unix_apr_socket_t {  /* Overlay cast onto apr_socket_t to read its descriptor. NOTE(review): presumably mirrors the leading fields of APR's private unix socket struct -- confirm against the APR version in use. */
578   apr_pool_t *pool;
579   int socketdes;  /* file descriptor handed to the eventer */
580 };
581
582 static apr_status_t serf_eventer_add(void *user_baton,
583                                      apr_pollfd_t *pfd,
584                                      void *serf_baton) {
585   eventer_t e, newe = NULL;
586   serf_closure_t *sct = user_baton, *newsct;
587   assert(pfd->desc_type == APR_POLL_SOCKET);
588   struct __unix_apr_socket_t *hack = (struct __unix_apr_socket_t *)pfd->desc.s;
589
590   noitL(nldeb, "serf_eventer_add() => %d\n", hack->socketdes);
591   e = eventer_find_fd(hack->socketdes);
592   if(!e) {
593     newe = e = eventer_alloc();
594     e->fd = hack->socketdes;
595     e->callback = serf_handler;
596     e->closure = calloc(1, sizeof(serf_closure_t));
597   }
598   newsct = e->closure;
599   newsct->self = sct->self;
600   newsct->check = sct->check;
601   newsct->serf_baton = serf_baton;
602   newsct->skt = pfd->desc.s;
603   e->mask = 0;
604   if(pfd->reqevents & APR_POLLIN) e->mask |= EVENTER_READ;
605   if(pfd->reqevents & APR_POLLOUT) e->mask |= EVENTER_WRITE;
606   if(pfd->reqevents & APR_POLLERR) e->mask |= EVENTER_EXCEPTION;
607   if(newe) {
608     serf_check_info_t *ci = sct->check->closure;
609     eventer_add(newe);
610     ci->fd_event = newe;
611   }
612 /* ** Unneeded as this is called recursively **
613   else
614     eventer_update(e);
615 */
616   return APR_SUCCESS;
617 }
618 static apr_status_t serf_eventer_remove(void *user_baton,
619                                         apr_pollfd_t *pfd,
620                                         void *serf_baton) {
621   serf_closure_t *sct = user_baton;
622   serf_check_info_t *ci;
623   eventer_t e;
624
625   ci = sct->check->closure;
626   assert(pfd->desc_type == APR_POLL_SOCKET);
627   struct __unix_apr_socket_t *hack = (struct __unix_apr_socket_t *)pfd->desc.s;
628
629   noitL(nldeb, "serf_eventer_remove() => %d\n", hack->socketdes);
630   e = eventer_find_fd(hack->socketdes);
631   if(e) e->mask = 0;
632   return 0;
633 }
634
635 static int serf_initiate(noit_module_t *self, noit_check_t *check) {
636   serf_closure_t *ccl;
637   serf_check_info_t *ci;
638   struct timeval when, p_int;
639   apr_status_t status;
640   eventer_t newe;
641   serf_module_conf_t *mod_config;
642   char *config_url;
643
644   mod_config = noit_module_get_userdata(self);
645   ci = (serf_check_info_t *)check->closure;
646   /* We cannot be running */
647   assert(!(check->flags & NP_RUNNING));
648   check->flags |= NP_RUNNING;
649   noitL(nldeb, "serf_initiate(%p,%s)\n",
650         self, check->target);
651
652   /* remove a timeout if we still have one -- we should unless someone
653    * has set a lower timeout than the period.
654    */
655   ci->timed_out = 1;
656   if(ci->timeout_event) {
657     eventer_remove(ci->timeout_event);
658     free(ci->timeout_event->closure);
659     eventer_free(ci->timeout_event);
660     ci->timeout_event = NULL;
661   }
662   assert(!ci->pool);
663   apr_pool_create(&ci->pool, NULL);
664   apr_atomic_init(ci->pool);
665
666   gettimeofday(&when, NULL);
667   memcpy(&check->last_fire_time, &when, sizeof(when));
668
669   ccl = apr_pcalloc(ci->pool, sizeof(*ccl));
670   ccl->self = self;
671   ccl->check = check;
672
673   if(!noit_hash_retrieve(check->config, "url", strlen("url"),
674                         (void **)&config_url))
675     if(!mod_config->options ||
676        !noit_hash_retrieve(mod_config->options, "url", strlen("url"),
677                            (void **)&config_url))
678       config_url = "http://localhost/";
679   apr_uri_parse(ci->pool, config_url, &ci->url);
680
681   if (!ci->url.port) {
682     ci->url.port = apr_uri_port_of_scheme(ci->url.scheme);
683   }
684   if (!ci->url.path) {
685     ci->url.path = "/";
686   }
687
688   if (strcasecmp(ci->url.scheme, "https") == 0) {
689     ci->app_ctx.using_ssl = 1;
690   }
691   else {
692     ci->app_ctx.using_ssl = 0;
693   }
694
695   status = apr_sockaddr_info_get(&ci->address,
696                                  check->target, APR_UNSPEC, ci->url.port, 0,
697                                  ci->pool);
698   if (status) {
699     /* Handle error -- log failure */
700     apr_pool_destroy(ci->pool);
701     memset(ci, 0, sizeof(*ci));
702     check->flags &= ~NP_RUNNING;
703     return 0;
704   }
705
706   ci->context = serf_context_create_ex(ccl, serf_eventer_add,
707                                        serf_eventer_remove, ci->pool);
708
709   ci->app_ctx.bkt_alloc = serf_bucket_allocator_create(ci->pool, NULL, NULL);
710   ci->app_ctx.ssl_ctx = NULL;
711
712   ci->connection = serf_connection_create(ci->context, ci->address,
713                                           conn_setup, &ci->app_ctx,
714                                           closed_connection, &ci->app_ctx,
715                                           ci->pool);
716
717   ci->handler_ctx.method = apr_pstrdup(ci->pool, "GET");
718   ci->handler_ctx.host = apr_pstrdup(ci->pool, ci->url.hostname);
719   ci->handler_ctx.path = ci->url.path;
720   ci->handler_ctx.authn = NULL;
721
722   ci->handler_ctx.acceptor = accept_response;
723   ci->handler_ctx.acceptor_baton = &ci->app_ctx;
724   ci->handler_ctx.handler = handle_response;
725   ci->handler_ctx.self = self;
726   ci->handler_ctx.check = check;
727
728   ci->request = serf_connection_request_create(ci->connection, setup_request,
729                                                &ci->handler_ctx);
730   serf_context_prerun(ci->context);
731
732   newe = eventer_alloc();
733   newe->mask = EVENTER_TIMER;
734   gettimeofday(&when, NULL);
735   p_int.tv_sec = check->timeout / 1000;
736   p_int.tv_usec = (check->timeout % 1000) * 1000;
737   add_timeval(when, p_int, &newe->whence);
738   ccl = calloc(1, sizeof(*ccl));
739   ccl->self = self;
740   ccl->check = check;
741   newe->closure = ccl;
742   newe->callback = serf_complete;
743   eventer_add(newe);
744   ci->timeout_event = newe;
745   return 0;
746 }
747 static int serf_schedule_next(noit_module_t *self,
748                               struct timeval *last_check, noit_check_t *check,
749                               struct timeval *now) {
750   eventer_t newe;
751   struct timeval period, earliest;
752   serf_closure_t *ccl;
753
754   if(check->period == 0) return 0;
755
756   /* If we have an event, we know when we intended it to fire.  This means
757    * we should schedule that point + period.
758    */
759   if(now)
760     memcpy(&earliest, now, sizeof(earliest));
761   else
762     gettimeofday(&earliest, NULL);
763   period.tv_sec = check->period / 1000;
764   period.tv_usec = (check->period % 1000) * 1000;
765
766   newe = eventer_alloc();
767   memcpy(&newe->whence, last_check, sizeof(*last_check));
768   add_timeval(newe->whence, period, &newe->whence);
769   if(compare_timeval(newe->whence, earliest) < 0)
770     memcpy(&newe->whence, &earliest, sizeof(earliest));
771   newe->mask = EVENTER_TIMER;
772   newe->callback = serf_recur_handler;
773   ccl = calloc(1, sizeof(*ccl));
774   ccl->self = self;
775   ccl->check = check;
776   newe->closure = ccl;
777
778   eventer_add(newe);
779   check->fire_event = newe;
780   return 0;
781 }
782 static int serf_recur_handler(eventer_t e, int mask, void *closure,
783                               struct timeval *now) {
784   serf_closure_t *cl = (serf_closure_t *)closure;
785   serf_schedule_next(cl->self, &e->whence, cl->check, now);
786   serf_initiate(cl->self, cl->check);
787   free(cl);
788   return 0;
789 }
790 static void serf_cleanup(noit_module_t *self, noit_check_t *check) {
791   serf_check_info_t *sci;
792   if(check->fire_event) {
793     eventer_remove(check->fire_event);
794     free(check->fire_event->closure);
795     eventer_free(check->fire_event);
796     check->fire_event = NULL;
797   }
798   sci = check->closure;
799   if(sci) {
800     serf_check_info_cleanup(sci);
801     free(sci);
802   }
803 }
804 static int serf_initiate_check(noit_module_t *self, noit_check_t *check,
805                                int once, noit_check_t *cause) {
806   if(!check->closure) check->closure = calloc(1, sizeof(serf_check_info_t));
807   if(once) {
808     serf_initiate(self, check);
809     return 0;
810   }
811   /* If check->fire_event, we're already scheduled... */
812   if(!check->fire_event) {
813     struct timeval epoch = { 0L, 0L };
814     noit_check_fake_last_check(check, &epoch, NULL);
815     serf_schedule_next(self, &epoch, check, NULL);
816   }
817   return 0;
818 }
819 static int resmon_initiate_check(noit_module_t *self, noit_check_t *check,
820                                  int once, noit_check_t *parent) {
821   /* resmon_check_info_t gives us a bit more space */
822   if(!check->closure) check->closure = calloc(1, sizeof(resmon_check_info_t));
823   if(once) {
824     serf_initiate(self, check);
825     return 0;
826   }
827   if(!check->fire_event) {
828     struct timeval epoch = { 0L, 0L };
829     noit_check_fake_last_check(check, &epoch, NULL);
830     serf_schedule_next(self, &epoch, check, NULL);
831   }
832   return 0;
833 }
834
835 static void resmon_cleanup(noit_module_t *self, noit_check_t *check) {
836   resmon_check_info_t *rci;
837   if(check->fire_event) {
838     eventer_remove(check->fire_event);
839     free(check->fire_event->closure);
840     eventer_free(check->fire_event);
841     check->fire_event = NULL;
842   }
843   rci = check->closure;
844   if(rci) {
845     if(rci->xpathexpr) free(rci->xpathexpr);
846     if(rci->resmod) free(rci->resmod);
847     if(rci->resserv) free(rci->resserv);
848     if(rci->xml_doc) xmlFreeDoc(rci->xml_doc);
849     serf_check_info_cleanup(&rci->serf);
850     free(rci);
851   }
852 }
853 static int resmon_part_initiate_check(noit_module_t *self, noit_check_t *check,
854                                       int once, noit_check_t *parent) {
855   char xpathexpr[1024];
856   const char *resmod, *resserv;
857   resmon_check_info_t *rci;
858
859   if(NOIT_CHECK_DISABLED(check) || NOIT_CHECK_KILLED(check)) return 0;
860
861   if(!check->closure) check->closure = calloc(1, sizeof(resmon_check_info_t));
862   rci = check->closure;
863   if(!rci->xpathexpr) {
864     if(!noit_hash_retrieve(check->config,
865                            "resmon_module", strlen("resmon_module"),
866                            (void **)&resmod)) {
867       resmod = "DUMMY_MODULE";
868     }
869     if(!noit_hash_retrieve(check->config,
870                            "resmon_service", strlen("resmon_service"),
871                            (void **)&resserv)) {
872       resserv = "DUMMY_SERVICE";
873     }
874     snprintf(xpathexpr, sizeof(xpathexpr),
875              "//ResmonResult[@module=\"%s\" and @service=\"%s\"]/*",
876              resmod, resserv);
877     rci->xpathexpr = strdup(xpathexpr);
878     rci->resmod = strdup(resmod);
879     rci->resserv = strdup(resserv);
880   }
881
882   if(parent && !strcmp(parent->module, "resmon")) {
883     /* Content is cached in the parent */
884     serf_check_info_t *ci = (serf_check_info_t *)rci;
885     gettimeofday(&ci->finish_time, NULL);
886     resmon_part_log_results(self, check, parent);
887     return 0;
888   }
889   if(once) {
890     serf_initiate(self, check);
891     return 0;
892   }
893   if(!check->fire_event)
894     serf_schedule_next(self, NULL, check, NULL);
895   return 0;
896 }
897
898
899
900 static int serf_onload(noit_module_t *self) {
901   apr_initialize();
902   atexit(apr_terminate);
903
904   nlerr = noit_log_stream_find("error/serf");
905   nldeb = noit_log_stream_find("debug/serf");
906   if(!nlerr) nlerr = noit_stderr;
907   if(!nldeb) nldeb = noit_debug;
908
909   eventer_name_callback("http/serf_handler", serf_handler);
910   eventer_name_callback("http/serf_complete", serf_complete);
911   eventer_name_callback("http/serf_recur_handler", serf_recur_handler);
912   return 0;
913 }
914 noit_module_t http = {  /* Module descriptor for plain HTTP/HTTPS checks. */
915   NOIT_MODULE_MAGIC,
916   NOIT_MODULE_ABI_VERSION,
917   "http",  /* module name */
918   "libserf-based HTTP and HTTPS resource checker",  /* description */
919   serf_onload,
920   serf_config,  /* installs serf_log_results() as the result reporter */
921   serf_init,
922   serf_initiate_check,  /* closure is a serf_check_info_t */
923   serf_cleanup
924 };
925
926 noit_module_t resmon = {  /* Module descriptor for whole-document resmon checks. */
927   NOIT_MODULE_MAGIC,
928   NOIT_MODULE_ABI_VERSION,
929   "resmon",  /* module name; resmon_part_initiate_check() compares parent->module against it */
930   "libserf-based resmon resource checker",  /* description */
931   serf_onload,
932   resmon_config,  /* installs resmon_log_results() as the result reporter */
933   serf_init,
934   resmon_initiate_check,  /* closure is a resmon_check_info_t */
935   resmon_cleanup
936 };
937
938 noit_module_t resmon_part = {  /* Module descriptor for single module/service resmon checks. */
939   NOIT_MODULE_MAGIC,
940   NOIT_MODULE_ABI_VERSION,
941   "resmon_part",  /* module name */
942   "resmon part resource checker",  /* description */
943   serf_onload,
944   resmon_config,  /* same config hook as the resmon module */
945   serf_init,
946   resmon_part_initiate_check,  /* reuses the parent's cached document when fired causally */
947   resmon_cleanup
948 };
949
Note: See TracBrowser for help on using the browser.