root/src/modules/statsd.c

Revision c385b68fdd537ec15c1467e6c8541b32d3838bf6, 14.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 2 years ago)

Add lookup by ip/module.

Make the counters into a rate like statsd does.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2012, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
31
32 #include "noit_defines.h"
33
34 #include <stdio.h>
35 #include <unistd.h>
36 #include <errno.h>
37 #include <assert.h>
38 #include <math.h>
39 #include <ctype.h>
40 #include <arpa/inet.h>
41
42 #include "noit_module.h"
43 #include "noit_check.h"
44 #include "noit_check_tools.h"
45 #include "utils/noit_log.h"
46 #include "utils/noit_hash.h"
47
/* Size of the per-packet check array built in statsd_handler(); the handler
 * asks the poller for at most MAX_CHECKS-1 matches and may append the
 * configured primary check in the remaining slot. */
#define MAX_CHECKS 3

/* Log streams, resolved in noit_statsd_onload(). */
static noit_log_stream_t nlerr = NULL;
static noit_log_stream_t nldeb = NULL;
/* Type string used by statsd_handle_payload() when a sample carries no
 * recognized gauge/timing type, i.e. it is treated as a counter. */
static const char *COUNTER_STRING = "c";
53
/* Module-wide configuration and listener state, stored as the module's
 * userdata (see noit_statsd_config() / noit_statsd_init()). */
typedef struct _mod_config {
  noit_hash_table *options;   /* option table handed to noit_statsd_config();
                               * destroyed and freed there on reconfiguration */
  int packets_per_cycle;      /* datagrams read per statsd_handler() call
                               * ("packets_per_cycle" option, default 100) */
  unsigned short port;        /* UDP port to bind ("port" option, default 8125) */
  uuid_t primary;             /* UUID parsed from the "check" option */
  int primary_active;         /* non-zero once a "check" option was seen */
  noit_check_t *check;        /* written by statsd_submit() when the submitting
                               * check's id equals `primary` */
  char *payload;              /* receive buffer for incoming datagrams */
  int payload_len;            /* size of `payload` in bytes */
  int ipv4_fd;                /* IPv4 UDP socket */
  int ipv6_fd;                /* IPv6 UDP socket, or -1 when unavailable */
} statsd_mod_config_t;
66
/* Per-check state hung off check->closure. */
typedef struct {
  noit_module_t *self;   /* module that owns the check */
  stats_t current;       /* metrics accumulated since the last submit */
  int stats_count;       /* bumped in update_check() each time a key's
                          * `count metric is not yet present in `current` */
} statsd_closure_t;
72
73 static void
74 clear_closure(noit_check_t *check, statsd_closure_t *ccl) {
75   ccl->stats_count = 0;
76   noit_check_stats_clear(check, &ccl->current);
77 }
78
79 static int
80 statsd_submit(noit_module_t *self, noit_check_t *check,
81               noit_check_t *cause) {
82   statsd_closure_t *ccl;
83   struct timeval duration;
84   statsd_mod_config_t *conf;
85
86   conf = noit_module_get_userdata(self);
87   if(!conf->primary_active) conf->check = NULL;
88   if(0 == memcmp(conf->primary, check->checkid, sizeof(uuid_t))) {
89     conf->check = check;
90     if(NOIT_CHECK_DISABLED(check) || NOIT_CHECK_KILLED(check)) {
91       conf->check = NULL;
92       return 0;
93     }
94   }
95
96   /* We are passive, so we don't do anything for transient checks */
97   if(check->flags & NP_TRANSIENT) return 0;
98
99   if(!check->closure) {
100     ccl = check->closure = calloc(1, sizeof(*ccl));
101     ccl->self = self;
102     memset(&ccl->current, 0, sizeof(ccl->current));
103   } else {
104     // Don't count the first run
105     char human_buffer[256];
106     ccl = (statsd_closure_t*)check->closure;
107     gettimeofday(&ccl->current.whence, NULL);
108     sub_timeval(ccl->current.whence, check->last_fire_time, &duration);
109     ccl->current.duration = duration.tv_sec * 1000 + duration.tv_usec / 1000;
110
111     snprintf(human_buffer, sizeof(human_buffer),
112              "dur=%d,run=%d,stats=%d", ccl->current.duration,
113              check->generation, ccl->stats_count);
114     noitL(nldeb, "statsd(%s) [%s]\n", check->target, human_buffer);
115
116     // Not sure what to do here
117     ccl->current.available = (ccl->stats_count > 0) ?
118         NP_AVAILABLE : NP_UNAVAILABLE;
119     ccl->current.state = (ccl->stats_count > 0) ?
120         NP_GOOD : NP_BAD;
121     ccl->current.status = human_buffer;
122     if(check->last_fire_time.tv_sec)
123       noit_check_passive_set_stats(check, &ccl->current);
124
125     memcpy(&check->last_fire_time, &ccl->current.whence, sizeof(duration));
126   }
127   clear_closure(check, ccl);
128   return 0;
129 }
130
131 static void
132 update_check(noit_check_t *check, const char *key, char type,
133              double diff, double sample) {
134   u_int32_t one = 1;
135   char buff[256];
136   statsd_closure_t *ccl;
137   metric_t *m;
138
139   if (check->closure == NULL) return;
140   ccl = check->closure;
141
142   /* First key counts */
143   snprintf(buff, sizeof(buff), "%s`count", key);
144   m = noit_stats_get_metric(check, &ccl->current, buff);
145   if(!m) ccl->stats_count++;
146   if(m && m->metric_type == METRIC_UINT32 && m->metric_value.I != NULL) {
147     (*m->metric_value.I)++;
148     check_stats_set_metric_hook_invoke(check, &ccl->current, m);
149   }
150   else
151     noit_stats_set_metric(check, &ccl->current, buff, METRIC_UINT32, &one);
152
153   /* Next the actual data */
154   snprintf(buff, sizeof(buff), "%s`%s", key,
155            (type == 'c') ? "rate" : (type == 'g') ? "gauge" : "timing");
156   m = noit_stats_get_metric(check, &ccl->current, buff);
157   if(type == 'c') {
158     double v = diff * (1.0 / sample) / (check->period / 1000.0);
159     if(m && m->metric_type == METRIC_DOUBLE && m->metric_value.n != NULL) {
160       (*m->metric_value.n) += v;
161       check_stats_set_metric_hook_invoke(check, &ccl->current, m);
162     }
163     else
164       noit_stats_set_metric(check, &ccl->current, buff, METRIC_DOUBLE, &v);
165   }
166   else if(type == 'g' || type == 'm') {
167     double v = diff;
168     if(m && m->metric_type == METRIC_DOUBLE && m->metric_value.n != NULL) {
169       (*m->metric_value.n) = v;
170       check_stats_set_metric_hook_invoke(check, &ccl->current, m);
171     }
172     else
173       noit_stats_set_metric(check, &ccl->current, buff, METRIC_DOUBLE, &v);
174   }
175 }
176
177 static void
178 statsd_handle_payload(noit_check_t **checks, int nchecks,
179                       char *payload, int len) {
180   char *cp, *ecp, *endptr;
181   cp = ecp = payload;
182   endptr = payload + len - 1;
183   while(ecp != NULL && ecp < endptr) {
184     int i, idx = 0, last_space = 0;
185     char key[256], *value;
186     const char *type = NULL;
187     ecp = memchr(ecp, '\n', len - (ecp - payload));
188     if(ecp) *ecp++ = '\0';
189     while(idx < sizeof(key) - 2 && *cp != '\0' && *cp != ':') {
190       if(isspace(*cp)) {
191         if(!last_space) key[idx++] = '_';
192         cp++;
193         last_space = 1;
194         continue;
195       }
196       else if(*cp == '/') key[idx++] = '-';
197       else if((*cp >= 'a' && *cp <= 'z') ||
198               (*cp >= 'A' && *cp <= 'Z') ||
199               (*cp >= '0' && *cp <= '9') ||
200               *cp == '.' || *cp == '_' || *cp == '-') {
201         key[idx++] = *cp;
202       }
203       last_space = 0;
204       cp++;
205     }
206     key[idx] = '\0';
207
208     while((NULL != cp) && NULL != (value = strchr(cp, ':'))) {
209       double sampleRate = 1.0;
210       double diff = 1.0;
211       if(value) {
212         *value++ = '\0';
213         cp = strchr(value, '|');
214         if(cp) {
215           char *sample_string;
216           *cp++ = '\0';
217           type = cp;
218           sample_string = strchr(type, '|');
219           if(sample_string) {
220             *sample_string++ = '\0';
221             if(*sample_string == '@')
222               sampleRate = strtod(sample_string + 1, NULL);
223           }
224           if(*type == 'g') {
225             diff = 0.0;
226           }
227           else if(0 == strcmp(type, "ms")) {
228             diff = 0.0;
229           }
230           else {
231             type = NULL;
232           }
233         }
234         diff = strtod(value, NULL);
235       }
236       if(type == NULL) type = COUNTER_STRING;
237
238       switch(*type) {
239         case 'g':
240         case 'c':
241         case 'm':
242           for(i=0;i<nchecks;i++)
243             update_check(checks[i], key, *type, diff, sampleRate);
244           break;
245         default:
246           break;
247       }
248       cp = value;
249     }
250
251     cp = ecp;
252   }
253 }
254
255 static int
256 statsd_handler(eventer_t e, int mask, void *closure,
257                struct timeval *now) {
258   noit_module_t *self = (noit_module_t *)closure;
259   int packets_per_cycle;
260   statsd_mod_config_t *conf;
261   noit_check_t *parent = NULL;
262
263   conf = noit_module_get_userdata(self);
264   if(conf->primary_active) parent = noit_poller_lookup(conf->primary);
265
266   packets_per_cycle = MAX(conf->packets_per_cycle, 1);
267   for( ; packets_per_cycle > 0; packets_per_cycle--) {
268     noit_check_t *checks[MAX_CHECKS];
269     int nchecks = 0;
270     char ip[INET6_ADDRSTRLEN];
271     union {
272       struct sockaddr_in in;
273       struct sockaddr_in6 in6;
274     } addr;
275     socklen_t addrlen = sizeof(addr);
276     ssize_t len;
277     uuid_t check_id;
278     len = recvfrom(e->fd, conf->payload, conf->payload_len-1, 0,
279                    (struct sockaddr *)&addr, &addrlen);
280     if(len < 0) {
281       if(errno != EAGAIN)
282         noitL(nlerr, "statsd: recvfrom() -> %s\n", strerror(errno));
283       break;
284     }
285     switch(addr.in.sin_family) {
286       case AF_INET:
287         addrlen = sizeof(struct sockaddr_in);
288         inet_ntop(AF_INET, &((struct sockaddr_in *)&addr)->sin_addr, ip, addrlen);
289         break;
290       case AF_INET6:
291         addrlen = sizeof(struct sockaddr_in6);
292         inet_ntop(AF_INET6, &((struct sockaddr_in6 *)&addr)->sin6_addr, ip, addrlen);
293         break;
294       default:
295         ip[0] = '\0';
296     }
297     conf->payload[len] = '\0';
298     nchecks = 0;
299     if(*ip)
300       nchecks = noit_poller_lookup_by_ip_module(ip, self->hdr.name,
301                                                 checks, MAX_CHECKS-1);
302     if(parent) checks[nchecks++] = parent;
303     if(nchecks)
304       statsd_handle_payload(checks, nchecks, conf->payload, len);
305   }
306   return EVENTER_READ | EVENTER_EXCEPTION;
307 }
308
309 static int noit_statsd_initiate_check(noit_module_t *self,
310                                         noit_check_t *check,
311                                         int once, noit_check_t *cause) {
312   if (check->closure == NULL) {
313     statsd_closure_t *ccl;
314     ccl = check->closure = (void *)calloc(1, sizeof(statsd_closure_t));
315     ccl->self = self;
316   }
317   INITIATE_CHECK(statsd_submit, self, check, cause);
318   return 0;
319 }
320
321 static int noit_statsd_config(noit_module_t *self, noit_hash_table *options) {
322   statsd_mod_config_t *conf;
323   conf = noit_module_get_userdata(self);
324   if(conf) {
325     if(conf->options) {
326       noit_hash_destroy(conf->options, free, free);
327       free(conf->options);
328     }
329   }
330   else
331     conf = calloc(1, sizeof(*conf));
332   conf->options = options;
333   noit_module_set_userdata(self, conf);
334   return 1;
335 }
336
337 static int noit_statsd_onload(noit_image_t *self) {
338   if(!nlerr) nlerr = noit_log_stream_find("error/statsd");
339   if(!nldeb) nldeb = noit_log_stream_find("debug/statsd");
340   if(!nlerr) nlerr = noit_error;
341   if(!nldeb) nldeb = noit_debug;
342   return 0;
343 }
344
345 static int noit_statsd_init(noit_module_t *self) {
346   unsigned short port = 8125;
347   int packets_per_cycle = 100;
348   int payload_len = 256*1024;
349   struct sockaddr_in skaddr;
350   int sockaddr_len;
351   const char *config_val;
352   statsd_mod_config_t *conf;
353   conf = noit_module_get_userdata(self);
354
355   if(noit_hash_retr_str(conf->options, "check", strlen("check"),
356                         (const char **)&config_val)) {
357     if(uuid_parse(config_val, conf->primary) != 0)
358       noitL(noit_error, "statsd check isn't a UUID\n");
359     conf->primary_active = 1;
360     conf->check = NULL;
361   }
362   if(noit_hash_retr_str(conf->options, "port", strlen("port"),
363                         (const char **)&config_val)) {
364     port = atoi(config_val);
365   }
366   conf->port = port;
367
368   if(noit_hash_retr_str(conf->options, "packets_per_cycle",
369                         strlen("packets_per_cycle"),
370                         (const char **)&config_val)) {
371     packets_per_cycle = atoi(config_val);
372   }
373   conf->packets_per_cycle = packets_per_cycle;
374
375   conf->payload_len = payload_len;
376   conf->payload = malloc(conf->payload_len);
377   if(!conf->payload) {
378     noitL(noit_error, "statsd malloc() failed\n");
379     return -1;
380   }
381
382   conf->ipv4_fd = socket(PF_INET, SOCK_DGRAM, 0);
383   if(conf->ipv4_fd < 0) {
384     noitL(noit_error, "statsd: socket failed: %s\n", strerror(errno));
385     return -1;
386   }
387   else {
388     if(eventer_set_fd_nonblocking(conf->ipv4_fd)) {
389       close(conf->ipv4_fd);
390       conf->ipv4_fd = -1;
391       noitL(noit_error,
392             "collectd: could not set socket non-blocking: %s\n",
393             strerror(errno));
394       return -1;
395     }
396   }
397   skaddr.sin_family = AF_INET;
398   skaddr.sin_addr.s_addr = htonl(INADDR_ANY);
399   skaddr.sin_port = htons(conf->port);
400   sockaddr_len = sizeof(skaddr);
401   if(bind(conf->ipv4_fd, (struct sockaddr *)&skaddr, sockaddr_len) < 0) {
402     noitL(noit_error, "bind failed[%d]: %s\n", conf->port, strerror(errno));
403     close(conf->ipv4_fd);
404     return -1;
405   }
406
407   if(conf->ipv4_fd >= 0) {
408     eventer_t newe;
409     newe = eventer_alloc();
410     newe->fd = conf->ipv4_fd;
411     newe->mask = EVENTER_READ | EVENTER_EXCEPTION;
412     newe->callback = statsd_handler;
413     newe->closure = self;
414     eventer_add(newe);
415   }
416
417   conf->ipv6_fd = socket(AF_INET6, SOCK_DGRAM, 0);
418   if(conf->ipv6_fd < 0) {
419     noitL(noit_error, "statsd: IPv6 socket failed: %s\n",
420           strerror(errno));
421   }
422   else {
423     if(eventer_set_fd_nonblocking(conf->ipv6_fd)) {
424       close(conf->ipv6_fd);
425       conf->ipv6_fd = -1;
426       noitL(noit_error,
427             "statsd: could not set socket non-blocking: %s\n",
428             strerror(errno));
429     }
430     else {
431       struct sockaddr_in6 skaddr6;
432       struct in6_addr in6addr_any;
433       sockaddr_len = sizeof(skaddr6);
434       memset(&skaddr6, 0, sizeof(skaddr6));
435       skaddr6.sin6_family = AF_INET6;
436       memset(&in6addr_any, 0, sizeof(in6addr_any));
437       skaddr6.sin6_addr = in6addr_any;
438       skaddr6.sin6_port = htons(conf->port);
439
440       if(bind(conf->ipv6_fd, (struct sockaddr *)&skaddr6, sockaddr_len) < 0) {
441         noitL(noit_error, "bind(IPv6) failed[%d]: %s\n",
442               conf->port, strerror(errno));
443         close(conf->ipv6_fd);
444         conf->ipv6_fd = -1;
445       }
446     }
447   }
448
449   if(conf->ipv6_fd >= 0) {
450     eventer_t newe;
451     newe = eventer_alloc();
452     newe->fd = conf->ipv6_fd;
453     newe->mask = EVENTER_READ | EVENTER_EXCEPTION;
454     newe->callback = statsd_handler;
455     newe->closure = self;
456     eventer_add(newe);
457   }
458
459   noit_module_set_userdata(self, conf);
460   return 0;
461 }
462
463 #include "statsd.xmlh"
/* Module descriptor exported to the noit module loader. */
noit_module_t statsd = {
  {
    NOIT_MODULE_MAGIC,
    NOIT_MODULE_ABI_VERSION,
    "statsd",                 /* presumably the module name (hdr.name) */
    "statsd collection",      /* presumably the human-readable description */
    statsd_xml_description,   /* from the generated statsd.xmlh */
    noit_statsd_onload
  },
  noit_statsd_config,
  noit_statsd_init,
  noit_statsd_initiate_check,
  NULL                        /* final hook left unset */
};
Note: See TracBrowser for help on using the browser.