root/src/modules/ganglia.c

Revision 94e88e17012a8f3d984b2ead91e9d3bd606c1e92, 13.9 kB (checked in by gallison <gallison@circonus.com>, 5 years ago)

Updated with postwait's changes

  • Property mode set to 100644
Line 
1 #include "noit_defines.h"
2 #include "utils/noit_str.h"
3
4 #include <stdlib.h>
5 #include <errno.h>
6 #include <inttypes.h>
7 #include <netinet/in.h>
8 #include <arpa/inet.h>
9 #include <sys/time.h>
10 #include <unistd.h>
11 #include <string.h>
12
13 #include "noit_module.h"
14 #include "noit_check.h"
15 #include "noit_check_tools.h"
16 #include "noit_rest.h"
17 #include "utils/noit_log.h"
18 #include "utils/noit_hash.h"
19 #include "utils/noit_b64.h"
20
/* IPv4 multicast group joined when the module has no "multiaddr" option. */
#define GANGLIA_DEFAULT_MCAST_ADDR "239.2.11.71"
/* UDP port used when the "port" (IPv4) / "port6" (IPv6) options are unset. */
enum { GANGLIA_DEFAULT_MCAST_PORT = 8649 };
23
24 typedef struct _mod_config {
25   noit_hash_table *options;
26   noit_boolean asynch_metrics;
27   int ipv4_fd;
28   int ipv6_fd;
29 } ganglia_mod_config_t;
30
31 typedef struct ganglia_closure_s {
32   stats_t current;
33   int stats_count;
34   int ntfy_count;
35 } ganglia_closure_t;
36
/* Ganglia message ids, based on ganglia monitor-core lib/gm_protocol.x
 * (version 3.2, 2009-12-13 15:38:58 -0500).  The ids are consecutive from
 * 128; they are written out here so the on-the-wire values are explicit. */
enum Ganglia_msg_formats {
  gmetadata_full    = 128, /* this one refers to metadataref */
  gmetric_ushort    = 129,
  gmetric_short     = 130,
  gmetric_int       = 131,
  gmetric_uint      = 132,
  gmetric_string    = 133,
  gmetric_float     = 134,
  gmetric_double    = 135,
  gmetadata_request = 136
};
50
51 struct ganglia_dgram {
52   noit_module_t *self;
53   void *payload;
54   int len;
55   int type;
56   char *name;
57 };
58
/* Overlapping int / float / double views of the same bytes, for
 * reinterpreting decoded numeric words. */
union ifd {
  double d;  /* 64-bit view */
  float f;   /* 32-bit floating view */
  int i;     /* 32-bit integer view */
};
64
65 static noit_boolean
66 noit_collects_check_aynsch(noit_module_t *self,
67                            noit_check_t *check) {
68   const char *config_val;
69   ganglia_mod_config_t *conf = noit_module_get_userdata(self);
70   noit_boolean is_asynch = conf->asynch_metrics;
71   if(noit_hash_retr_str(check->config,
72                         "asynch_metrics", strlen("asynch_metrics"),
73                         (const char **)&config_val)) {
74     if(!strcasecmp(config_val, "false") || !strcasecmp(config_val, "off"))
75       is_asynch = noit_false;
76   }
77
78   if(is_asynch) check->flags |= NP_SUPPRESS_METRICS;
79   else check->flags &= ~NP_SUPPRESS_METRICS;
80   return is_asynch;
81 }
82
83 static void clear_closure(noit_check_t *check, ganglia_closure_t *gcl) {
84   gcl->stats_count = 0;
85   gcl->ntfy_count = 0;
86   noit_check_stats_clear(check, &gcl->current);
87 }
88
89 static int
90 ganglia_process_dgram(noit_check_t *check, void *closure) {
91   struct ganglia_dgram *pkt = closure;
92   noit_boolean immediate;
93   ganglia_closure_t *gcl;
94
95   if (!check || strcmp(check->module, "ganglia")) return 0;
96
97   immediate = noit_collects_check_aynsch(pkt->self, check);
98   if(check->closure)
99     gcl = check->closure;
100   else
101    gcl = check->closure = calloc(1, sizeof(ganglia_closure_t));
102
103   switch(pkt->type) {
104     case gmetadata_full:
105       /* nothing of value to get from the metadata */
106       break;
107     case gmetadata_request:
108       break;
109     case gmetric_short:
110     case gmetric_int:{
111       int *val = pkt->payload;
112       *val = ntohl(*val);
113       noit_stats_set_metric(check, &gcl->current, pkt->name, METRIC_INT32, val);
114       if(immediate) noit_stats_log_immediate_metric(check, pkt->name, METRIC_INT32, val);
115       break;
116                      }
117     case gmetric_ushort:
118     case gmetric_uint:{
119       uint32_t *val = pkt->payload;
120       *val = ntohl(*val);
121       noit_stats_set_metric(check, &gcl->current, pkt->name, METRIC_UINT32, val);
122       if(immediate) noit_stats_log_immediate_metric(check, pkt->name, METRIC_UINT32, val);
123       break;
124                       }
125     case gmetric_string:{
126       uint32_t *len = pkt->payload;
127       *len = ntohl(*len);
128       pkt->payload += 4;
129       char *str = pkt->payload;
130       noit_stats_set_metric(check, &gcl->current, pkt->name, METRIC_STRING, str);
131       if(immediate) noit_stats_log_immediate_metric(check, pkt->name, METRIC_STRING, str);
132       break;
133                         }
134     case gmetric_float:{
135       double val = 0;
136       union ifd *ptr = pkt->payload;
137       ptr->i = ntohl(ptr->i);
138       val = (double) ptr->f;
139       noit_stats_set_metric(check, &gcl->current, pkt->name, METRIC_DOUBLE, &val);
140       if(immediate) noit_stats_log_immediate_metric(check, pkt->name, METRIC_DOUBLE, &val);
141       break;   
142                        }
143     case gmetric_double:{
144       double val = 0;
145       union ifd *ptr = pkt->payload;
146       ptr->i = ntohl(ptr->i);
147       (ptr+4)->i = ntohl((ptr+4)->i);
148       val = ptr->d;
149       noit_stats_set_metric(check, &gcl->current, pkt->name, METRIC_DOUBLE, &val);
150       if(immediate) noit_stats_log_immediate_metric(check, pkt->name, METRIC_DOUBLE, &val);
151       break;
152                         }
153     default:
154       noitL(noit_error, "ganglia: unknown packet type received\n");
155       return 0;
156   }
157
158   gcl->stats_count++;
159
160   return 1;
161 }
162
163 static int noit_ganglia_handler(eventer_t e, int mask, void *closure,
164                              struct timeval *now) {
165   char packet[1500]; /* 1500 is correct; see __GANGLIA_MTU */
166   int packet_len = sizeof(packet);
167   noit_module_t *self = (noit_module_t *)closure;
168
169   while(1) {
170     struct ganglia_dgram pkt;
171     int inlen;
172     void *payload = &packet;
173     char *host, *name;
174     int *len;
175     uint32_t *type;
176
177     inlen = recvfrom(e->fd, packet, packet_len, 0, NULL, 0);
178
179     if(inlen < 0) {
180       if(errno == EAGAIN) break; /* out of data to read, hand it back to eventer
181                                     and wait to be scheduled again */
182       noitLT(noit_error, now, "ganglia: recvfrom: %s\n", strerror(errno));
183       break;
184     }
185
186     type = payload;
187     *type = ntohl(*type);
188     payload += 4;
189
190     len = payload;
191     *len = ntohl(*len);
192     if(!*len) {
193       noitL(noit_error, "ganglia: empty host\n");
194       return -1;
195     }
196     payload += 4;
197     host = payload;
198     payload += (*len + 3) & ~0x03;
199
200     len = payload;
201     *len = ntohl(*len);
202     if(!*len) {
203       noitL(noit_error, "ganglia: empty name\n");
204       return -1;
205     }
206     payload += 4;
207     name = payload;
208     payload += (*len + 3) & ~0x03;
209     *len = 0; /* add null char to end of host string */
210
211     /* skip the spoof boolean */
212     payload += 4;
213
214     /* skip the format string */
215     len = payload;
216     *len = ntohl(*len);
217     payload += 4;
218     payload += (*len + 3) & ~0x03;
219     *len = 0; /* add null char to end of name string */
220
221     pkt.self = self;
222     pkt.payload = payload;
223     pkt.len = inlen;
224     pkt.type = *type;
225     pkt.name = name;
226
227     if(!noit_poller_target_do(host, ganglia_process_dgram, &pkt))
228       noitL(noit_error, "ganglia: no checks from host: %s\n", host);
229   }
230   return EVENTER_READ | EVENTER_EXCEPTION;
231 }
232
233 static int
234 ganglia_submit(noit_module_t *self, noit_check_t *check,
235                          noit_check_t *cause) {
236   /* almost entirely pulled from collectd.c:collectd_submit_internal() */
237   ganglia_closure_t *gcl;
238   struct timeval now, duration, age;
239   noit_boolean immediate;
240   /* We are passive, so we don't do anything for transient checks */
241   if(check->flags & NP_TRANSIENT) return 0;
242
243   gettimeofday(&now, NULL);
244
245   /* If we're immediately logging things and we've done so within the
246    * check's period... we've no reason to passively log now.
247    */
248   immediate = noit_collects_check_aynsch(self, check);
249   sub_timeval(now, check->stats.current.whence, &age);
250   if(immediate && (age.tv_sec * 1000 + age.tv_usec / 1000) < check->period)
251     return 0;
252
253   if(!check->closure) {
254     gcl = check->closure = (void *)calloc(1, sizeof(ganglia_closure_t));
255     memset(gcl, 0, sizeof(ganglia_closure_t));
256     memcpy(&gcl->current.whence, &now, sizeof(now));
257   } else {
258     /*  Don't count the first run */
259     char human_buffer[256];
260     gcl = (ganglia_closure_t*)check->closure;
261     memcpy(&gcl->current.whence, &now, sizeof(now));
262     sub_timeval(gcl->current.whence, check->last_fire_time, &duration);
263     gcl->current.duration = duration.tv_sec; /*  + duration.tv_usec / (1000 * 1000); */
264
265     snprintf(human_buffer, sizeof(human_buffer),
266              "dur=%d,run=%d,stats=%d,ntfy=%d", gcl->current.duration,
267              check->generation, gcl->stats_count, gcl->ntfy_count);
268     noitL(noit_debug, "ganglia(%s) [%s]\n", check->target, human_buffer);
269
270     gcl->current.available = (gcl->ntfy_count > 0 || gcl->stats_count > 0) ?
271         NP_AVAILABLE : NP_UNAVAILABLE;
272     gcl->current.state = (gcl->ntfy_count > 0 || gcl->stats_count > 0) ?
273         NP_GOOD : NP_BAD;
274     gcl->current.status = human_buffer;
275     noit_check_passive_set_stats(check, &gcl->current);
276
277     memcpy(&check->last_fire_time, &gcl->current.whence, sizeof(duration));
278   }
279   clear_closure(check, gcl);
280   return 0;
281 }
282
283 static int noit_ganglia_initiate_check(noit_module_t *self,
284                                         noit_check_t *check,
285                                         int once, noit_check_t *cause) {
286   check->flags |= NP_PASSIVE_COLLECTION;
287   INITIATE_CHECK(ganglia_submit, self, check, cause);
288   return 0;
289 }
290
291 static int noit_ganglia_config(noit_module_t *self, noit_hash_table *options) {
292   ganglia_mod_config_t *conf = noit_module_get_userdata(self);
293   if(conf) {
294     if(conf->options) {
295       noit_hash_destroy(conf->options, free, free);
296       free(conf->options);
297     }
298   }
299   else
300     conf = calloc(1, sizeof(*conf));
301   conf->options = options;
302   noit_module_set_userdata(self, conf);
303   return 1;
304 }
305
306 static int noit_ganglia_onload(noit_image_t *self) {
307   eventer_name_callback("noit_ganglia/handler", noit_ganglia_handler);
308   return 0;
309 }
310
311 static int noit_ganglia_init(noit_module_t *self) {
312   const char *config_val;
313   ganglia_mod_config_t *conf;
314   conf = noit_module_get_userdata(self);
315   struct ip_mreq mreq;
316   struct ipv6_mreq mreqv6;
317   struct sockaddr_in skaddr;
318   struct sockaddr_in6 skaddr6;
319   const char *multiaddr, *multiaddr6;
320   int portint=0;
321   unsigned short port;
322
323   conf->asynch_metrics = noit_true;
324   if(noit_hash_retr_str(conf->options,
325                         "asynch_metrics", strlen("asynch_metrics"),
326                         (const char **)&config_val)) {
327     if(!strcasecmp(config_val, "false") || !strcasecmp(config_val, "off"))
328       conf->asynch_metrics = noit_false;
329   }
330
331   /* Default Collectd port */
332   portint = GANGLIA_DEFAULT_MCAST_PORT;
333   if(noit_hash_retr_str(conf->options,
334                          "port", strlen("port"),
335                          (const char**)&config_val))
336     portint = atoi(config_val);
337   port = (unsigned short) portint;
338
339   if(!noit_hash_retr_str(conf->options,
340                          "multiaddr", strlen("multiaddr"),
341                          (const char**)&multiaddr))
342     multiaddr = GANGLIA_DEFAULT_MCAST_ADDR;
343
344
345   conf->ipv4_fd = conf->ipv6_fd = -1;
346
347   /* ipv4 socket and binding */
348   conf->ipv4_fd = socket(PF_INET, NE_SOCK_CLOEXEC|SOCK_DGRAM, IPPROTO_UDP);
349   if(conf->ipv4_fd < 0) {
350     close(conf->ipv4_fd);
351     noitL(noit_error, "ganglia: ipv4 socket failed: %s\n", strerror(errno));
352     return -1;
353   }
354
355   if(eventer_set_fd_nonblocking(conf->ipv4_fd)) {
356     close(conf->ipv4_fd);
357     noitL(noit_error, "ganglia: could not set ipv4 socket non-blocking: %s\n", strerror(errno));
358     return -1;
359   }
360
361   /* ipv4 binding */
362   memset(&skaddr, 0, sizeof(skaddr));
363   skaddr.sin_family = AF_INET;
364   skaddr.sin_addr.s_addr = htonl(INADDR_ANY);
365   skaddr.sin_port = htons(port);
366
367   if(bind(conf->ipv4_fd, (struct sockaddr *)&skaddr, sizeof(skaddr))) {
368     close(conf->ipv4_fd);
369     noitL(noit_error, "ganglia: ipv4 binding failed: %s\n", strerror(errno));
370     return -1;
371   }
372
373   /* join ipv4 multicast */
374   memset(&mreq, 0, sizeof(mreq));
375
376   inet_pton(AF_INET, multiaddr, &mreq.imr_multiaddr.s_addr);
377
378   mreq.imr_interface.s_addr = htonl(INADDR_ANY);
379
380   if(setsockopt(conf->ipv4_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof(mreq))) {
381     close(conf->ipv4_fd);
382     noitL(noit_error, "ganglia: ipv4 multicast join failed: %s\n", strerror(errno));
383     return -1;
384   }
385
386   eventer_t newe;
387   newe = eventer_alloc();
388   newe->fd = conf->ipv4_fd;
389   newe->mask = EVENTER_READ | EVENTER_EXCEPTION;
390   newe->callback = noit_ganglia_handler;
391   newe->closure = self;
392   eventer_add(newe);
393   noitL(noit_debug, "ganglia: Added ipv4 handler!\n");
394
395   portint = GANGLIA_DEFAULT_MCAST_PORT;
396   if(noit_hash_retr_str(conf->options,
397                          "port6", strlen("port6"),
398                          (const char**)&config_val))
399     portint = atoi(config_val);
400   port = (unsigned short) portint;
401
402   if(!noit_hash_retr_str(conf->options,
403                          "multiaddr6", strlen("multiaddr6"),
404                          (const char**)&multiaddr6))
405     /* ganglia doesn't have a default ipv6 multicast */
406     multiaddr6 = NULL;
407
408   /* ipv6 socket, nonblocking */
409   if(multiaddr6) {
410     conf->ipv6_fd = socket(AF_INET6, NE_SOCK_CLOEXEC|SOCK_DGRAM, IPPROTO_UDP);
411     if(conf->ipv6_fd < 0) {
412       noitL(noit_error, "ganglia: IPv6 socket creation failed: %s\n", strerror(errno));
413     }
414     else if(eventer_set_fd_nonblocking(conf->ipv6_fd)) {
415       noitL(noit_error, "ganglia: could not set socket non-blocking: %s\n", strerror(errno));
416       close(conf->ipv6_fd);
417       conf->ipv6_fd =  -1;
418     }
419   }
420
421   /* ipv6 binding */
422   if(conf->ipv6_fd > 0) {
423     memset(&skaddr6, 0, sizeof(skaddr6));
424     skaddr6.sin6_family = AF_INET6;
425     skaddr6.sin6_addr = in6addr_any;
426     skaddr6.sin6_port = htons(port);
427     if(bind(conf->ipv6_fd, (struct sockaddr *)&skaddr6, sizeof(skaddr6))) {
428       noitL(noit_error, "ganglia: ipv6 binding failed: %s\n", strerror(errno));
429       close(conf->ipv6_fd);
430       conf->ipv6_fd = -1;
431     }
432   }
433
434   /* join ipv6 multicast */
435   if(conf->ipv6_fd > 0) {
436     memset(&mreqv6, 0, sizeof(mreqv6));
437
438     inet_pton(AF_INET6, multiaddr6, &mreqv6.ipv6mr_multiaddr.s6_addr);
439
440     mreqv6.ipv6mr_interface = 0;
441
442     if(setsockopt(conf->ipv6_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreqv6, sizeof(mreqv6))) {
443       noitL(noit_error, "ganglia: ipv6 multicast join failed: %s\n", strerror(errno));
444       close(conf->ipv6_fd);
445       conf->ipv6_fd = -1;
446     }
447   }
448
449   if(conf->ipv6_fd > 0) {
450     eventer_t newe;
451     newe = eventer_alloc();
452     newe->fd = conf->ipv6_fd;
453     newe->mask = EVENTER_READ;
454     newe->callback = noit_ganglia_handler;
455     newe->closure = self;
456     eventer_add(newe);
457     noitL(noit_debug, "ganglia: Added ipv6 handler!\n");
458   }
459
460   noit_module_set_userdata(self, conf);
461
462   return 0;
463 }
464
465 #include "ganglia.xmlh"
466 noit_module_t ganglia = {
467   {
468     NOIT_MODULE_MAGIC,
469     NOIT_MODULE_ABI_VERSION,
470     "ganglia",
471     "ganglia collection",
472     ganglia_xml_description,
473     noit_ganglia_onload
474   },
475   noit_ganglia_config,
476   noit_ganglia_init,
477   noit_ganglia_initiate_check,
478   NULL
479 };
Note: See TracBrowser for help on using the browser.