root/src/modules/collectd.c

Revision 4496cc4e7850184a250647f0a956c335c15f5f18, 46.9 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 2 years ago)

Allow checks to note that they are passive.

If a check doesn't "do" anything to collect metrics and instead
passively allows metrics to arrive, then when you clone the check
to a transient copy, the metrics will always be blank. By
allowing modules to note that the check acts this way, we can
be smarter elsewhere. We should be able to use this to allow
transient streaming of things like statsd, collectd and traps.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2005-2009  Florian Forster
3  * Copyright (c) 2009, OmniTI Computer Consulting, Inc.
4  * Copyright (c) 2009, Dan Di Spaltro
5  * All rights reserved.
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library, in file named COPYING.LGPL; if not,
19  * write to the Free Software Foundation, Inc., 59 Temple Place,
20  * Suite 330, Boston, MA  02111-1307  USA
21  *
22  */
23
24 #include "noit_defines.h"
25
26 #include <stdio.h>
27 #include <unistd.h>
28 #include <errno.h>
29 #include <assert.h>
30 #include <math.h>
31 #include <ctype.h>
32 #ifdef HAVE_SYS_FILIO_H
33 #include <sys/filio.h>
34 #endif
35 #include <openssl/evp.h>
36 #include <openssl/hmac.h>
37 #include <inttypes.h>
38 #include <netinet/in.h>
39 #include <arpa/inet.h>
40
41 #include "noit_module.h"
42 #include "noit_check.h"
43 #include "noit_check_tools.h"
44 #include "utils/noit_log.h"
45 #include "utils/noit_hash.h"
46
47
48 static noit_log_stream_t nlerr = NULL;
49 static noit_log_stream_t nldeb = NULL;
50
51 typedef struct _mod_config {
52   noit_hash_table *options;
53   noit_hash_table target_sessions;
54   noit_boolean support_notifications;
55   noit_boolean asynch_metrics;
56   int ipv4_fd;
57   int ipv6_fd;
58 } collectd_mod_config_t;
59
60 typedef struct collectd_closure_s {
61   char *username;
62   char *secret;
63   int security_level;
64   EVP_CIPHER_CTX ctx;
65   stats_t current;
66   int stats_count;
67   int ntfy_count;
68 } collectd_closure_t;
69
/*
 * cmp_plugin(vl, val): non-zero when vl->plugin equals `val`.
 *
 * `val` must be a string literal or char array: sizeof(val) bounds the
 * comparison and includes the terminating NUL, which makes this an exact
 * match rather than a prefix match.
 */
#define cmp_plugin(vl, val) \
  (strncmp((vl)->plugin, (val), sizeof(val)) == 0)

/*
 * cmp_type(vl, plugin_name, type_name): non-zero when vl->plugin equals
 * `plugin_name` AND vl->type equals `type_name`.
 *
 * The previous definition compared vl->plugin against both arguments and
 * never inspected vl->type.  The parameters are intentionally not named
 * `plugin` / `type`: a macro parameter sharing a member's name would also be
 * substituted into `(vl)->type`.
 */
#define cmp_type(vl, plugin_name, type_name) \
  (cmp_plugin(vl, plugin_name) && \
   (strncmp((vl)->type, (type_name), sizeof(type_name)) == 0))
76
77
78 /**
79  *
80  * Collectd Structs
81  *
82  *
83  **/
84
/*
 * Storage for a single metadata value.
 * NOTE(review): presumably meta_entry_s.type says which member is live --
 * confirm against the code that fills these in.
 */
typedef union meta_value_u
{
  char    *mv_string;
  int64_t  mv_signed_int;
  uint64_t mv_unsigned_int;
  double   mv_double;
  _Bool    mv_boolean;
} meta_value_t;
94
95 struct meta_entry_s;
96 typedef struct meta_entry_s meta_entry_t;
97 struct meta_entry_s
98 {
99   char         *key;
100   meta_value_t  value;
101   int           type;
102   meta_entry_t *next;
103 };
104
105 struct meta_data_s
106 {
107   meta_entry_t   *head;
108   pthread_mutex_t lock;
109 };
110
111 typedef struct meta_data_s meta_data_t;
112
113
/* Default network addresses (IPv4 / IPv6) and port. */
#define NET_DEFAULT_V4_ADDR  "239.192.74.66"
#define NET_DEFAULT_V6_ADDR  "ff18::efc0:4a42"
#define NET_DEFAULT_PORT     25826

/*
 * Part type identifiers: the first 16 bits of every part header, as read
 * by parse_packet().
 */
#define TYPE_HOST             0x0000
#define TYPE_TIME             0x0001
#define TYPE_PLUGIN           0x0002
#define TYPE_PLUGIN_INSTANCE  0x0003
#define TYPE_TYPE             0x0004
#define TYPE_TYPE_INSTANCE    0x0005
#define TYPE_VALUES           0x0006
#define TYPE_INTERVAL         0x0007

/* Part types used to transmit notifications. */
#define TYPE_MESSAGE          0x0100
#define TYPE_SEVERITY         0x0101

/* Part types wrapping signed (HMAC-SHA-256) and encrypted (AES-256) data. */
#define TYPE_SIGN_SHA256      0x0200
#define TYPE_ENCR_AES256      0x0210

/* Size of the fixed name buffers in value_list_t and notification_t. */
#define DATA_MAX_NAME_LEN  64

/* Size of notification_t.message, followed by the severity values. */
#define NOTIF_MAX_MSG_LEN  256
#define NOTIF_FAILURE      1
#define NOTIF_WARNING      2
#define NOTIF_OKAY         4
140
/*
 * sfree(ptr): release `ptr` and reset it to NULL so it cannot be freed or
 * used again by accident.  free(NULL) is a no-op, so no guard is needed.
 * `ptr` must be an lvalue and is evaluated twice.
 */
#define sfree(ptr) \
  do { \
    free(ptr); \
    (ptr) = NULL; \
  } while (0)

/*
 * sstrncpy(a, b, c): copy at most `c` bytes of string `b` into `a` and
 * always NUL-terminate `a` (plain strncpy leaves the destination
 * unterminated when the source has `c` or more characters).
 *
 * `c` is the size of the destination buffer and must be greater than zero.
 * Yields `a`, like strncpy.  The arguments are evaluated more than once, so
 * they must not have side effects.
 */
#define sstrncpy(a, b, c) \
  (strncpy((a), (b), (c)), (a)[(c) - 1] = '\0', (a))
150
/* Values of collectd_closure_t.security_level. */
#define SECURITY_LEVEL_NONE     0
#define SECURITY_LEVEL_SIGN    1
#define SECURITY_LEVEL_ENCRYPT 2

/* Data source types, as found in the type bytes of a values part. */
#define DS_TYPE_COUNTER  0
#define DS_TYPE_GAUGE    1
#define DS_TYPE_DERIVE   2
#define DS_TYPE_ABSOLUTE 3

/*
 * Name of a DS_TYPE_* value, or "unknown".
 * The argument and the whole expansion are parenthesised so the macro can
 * be used with compound arguments and inside larger expressions.  `t` is
 * evaluated up to four times.
 */
#define DS_TYPE_TO_STRING(t) \
       (((t) == DS_TYPE_COUNTER)  ? "counter"  : \
        ((t) == DS_TYPE_GAUGE)    ? "gauge"    : \
        ((t) == DS_TYPE_DERIVE)   ? "derive"   : \
        ((t) == DS_TYPE_ABSOLUTE) ? "absolute" : \
        "unknown")

/* Size of receive_list_entry_t.data. */
#define BUFF_SIZE 1024
167
/* C representation of the four data source types (see DS_TYPE_*). */
typedef unsigned long long counter_t;
typedef double             gauge_t;
typedef int64_t            derive_t;
typedef uint64_t           absolute_t;

/* Used by parse_packet() as the initial `interval` of a value list. */
int interval_g;

/*
 * A single sample.  parse_part_values() picks the member to convert from
 * the DS_TYPE_* byte that accompanies the value.
 */
typedef union value_u
{
  counter_t  counter;
  gauge_t    gauge;
  derive_t   derive;
  absolute_t absolute;
} value_t;
182
183 struct value_list_s
184 {
185   value_t *values;
186   int      values_len;
187   time_t   time;
188   int      interval;
189   char     host[DATA_MAX_NAME_LEN];
190   char     plugin[DATA_MAX_NAME_LEN];
191   char     plugin_instance[DATA_MAX_NAME_LEN];
192   char     type[DATA_MAX_NAME_LEN];
193   char     type_instance[DATA_MAX_NAME_LEN];
194   meta_data_t *meta;
195   uint8_t  *types;
196 };
197 typedef struct value_list_s value_list_t;
198
199
/* Selects the live member of notification_meta_t.nm_value. */
enum notification_meta_type_e
{
  NM_TYPE_STRING       = 0,
  NM_TYPE_SIGNED_INT   = 1,
  NM_TYPE_UNSIGNED_INT = 2,
  NM_TYPE_DOUBLE       = 3,
  NM_TYPE_BOOLEAN      = 4
};
208
209 typedef struct notification_meta_s
210 {
211   char name[DATA_MAX_NAME_LEN];
212   enum notification_meta_type_e type;
213   union
214   {
215     const char *nm_string;
216     int64_t nm_signed_int;
217     uint64_t nm_unsigned_int;
218     double nm_double;
219     bool nm_boolean;
220   } nm_value;
221   struct notification_meta_s *next;
222 } notification_meta_t;
223
224 typedef struct notification_s
225 {
226   int    severity;
227   time_t time;
228   char   message[NOTIF_MAX_MSG_LEN];
229   char   host[DATA_MAX_NAME_LEN];
230   char   plugin[DATA_MAX_NAME_LEN];
231   char   plugin_instance[DATA_MAX_NAME_LEN];
232   char   type[DATA_MAX_NAME_LEN];
233   char   type_instance[DATA_MAX_NAME_LEN];
234   notification_meta_t *meta;
235 } notification_t;
236
237
238
/*
 * Header found at the start of every part:
 *
 *   bits  0-15   Type    (one of the TYPE_* constants)
 *   bits 16-31   Length  (length of the whole part, header included)
 *
 * Both fields are in network byte order on the wire.
 */
typedef struct part_header_s
{
  uint16_t type;
  uint16_t length;
} part_header_t;
251
252 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
253  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
254  * +-------------------------------+-------------------------------+
255  * ! Type                          ! Length                        !
256  * +-------------------------------+-------------------------------+
257  * : (Length - 4) Bytes                                            :
258  * +---------------------------------------------------------------+
259  */
260 struct part_string_s
261 {
262   part_header_t *head;
263   char *value;
264 };
265 typedef struct part_string_s part_string_t;
266
267 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
268  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
269  * +-------------------------------+-------------------------------+
270  * ! Type                          ! Length                        !
271  * +-------------------------------+-------------------------------+
272  * : (Length - 4 == 2 || 4 || 8) Bytes                             :
273  * +---------------------------------------------------------------+
274  */
275 struct part_number_s
276 {
277   part_header_t *head;
278   uint64_t *value;
279 };
280 typedef struct part_number_s part_number_t;
281
282 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
283  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
284  * +-------------------------------+-------------------------------+
285  * ! Type                          ! Length                        !
286  * +-------------------------------+---------------+---------------+
287  * ! Num of values                 ! Type0         ! Type1         !
288  * +-------------------------------+---------------+---------------+
289  * ! Value0                                                        !
290  * !                                                               !
291  * +---------------------------------------------------------------+
292  * ! Value1                                                        !
293  * !                                                               !
294  * +---------------------------------------------------------------+
295  */
296 struct part_values_s
297 {
298   part_header_t *head;
299   uint16_t *num_values;
300   uint8_t  *values_types;
301   value_t  *values;
302 };
303 typedef struct part_values_s part_values_t;
304
305 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
306  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
307  * +-------------------------------+-------------------------------+
308  * ! Type                          ! Length                        !
309  * +-------------------------------+-------------------------------+
310  * ! Hash (Bits   0 -  31)                                         !
311  * : :                                                             :
312  * ! Hash (Bits 224 - 255)                                         !
313  * +---------------------------------------------------------------+
314  */
315 /* Minimum size */
316 #define PART_SIGNATURE_SHA256_SIZE 36
317 struct part_signature_sha256_s
318 {
319   part_header_t head;
320   unsigned char hash[32];
321   char *username;
322 };
323 typedef struct part_signature_sha256_s part_signature_sha256_t;
324
325 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
326  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
327  * +-------------------------------+-------------------------------+
328  * ! Type                          ! Length                        !
329  * +-------------------------------+-------------------------------+
330  * ! Original length               ! Padding (0 - 15 bytes)        !
331  * +-------------------------------+-------------------------------+
332  * ! Hash (Bits   0 -  31)                                         !
333  * : :                                                             :
334  * ! Hash (Bits 128 - 159)                                         !
335  * +---------------------------------------------------------------+
336  */
337 /* Minimum size */
338 #define PART_ENCRYPTION_AES256_SIZE 42
339 struct part_encryption_aes256_s
340 {
341   part_header_t head;
342   uint16_t username_length;
343   char *username;
344   unsigned char iv[16];
345   /* <encrypted> */
346   unsigned char hash[20];
347   /*   <payload /> */
348   /* </encrypted> */
349 };
350 typedef struct part_encryption_aes256_s part_encryption_aes256_t;
351
352 struct receive_list_entry_s
353 {
354   char data[BUFF_SIZE];
355   int  data_len;
356   int  fd;
357   struct receive_list_entry_s *next;
358 };
359 typedef struct receive_list_entry_s receive_list_entry_t;
360
361
362 /**
363  *
364  *  END Collectd Structs
365  *
366  */
367
/*
 * Convert a 64-bit integer from network (big-endian) to host byte order.
 *
 * The result is assembled from the bytes of `n`, most significant first,
 * which is correct on hosts of any endianness.  The previous
 * `#if BYTE_ORDER == BIG_ENDIAN` test evaluates to true when neither macro
 * is defined (both then expand to 0), silently turning the function into a
 * no-op on little-endian machines.
 */
static u_int64_t collectd_ntohll (u_int64_t n)
{
  const unsigned char *wire = (const unsigned char *) &n;
  u_int64_t host = 0;
  unsigned int i;

  for (i = 0; i < sizeof (n); i++)
    host = (host << 8) | wire[i];

  return host;
} /* u_int64_t collectd_ntohll */

/*
 * Doubles are passed through unchanged.
 * NOTE(review): no byte swapping is done, so this is only correct when the
 * sender's in-memory representation of a double matches the host's --
 * confirm against the collectd wire format.
 */
#define ntohd(d) (d)
381
382 static noit_boolean
383 noit_collects_check_aynsch(noit_module_t *self,
384                            noit_check_t *check) {
385   const char *config_val;
386   collectd_mod_config_t *conf = noit_module_get_userdata(self);
387   noit_boolean is_asynch = conf->asynch_metrics;
388   if(noit_hash_retr_str(check->config,
389                         "asynch_metrics", strlen("asynch_metrics"),
390                         (const char **)&config_val)) {
391     if(!strcasecmp(config_val, "true") || !strcasecmp(config_val, "on"))
392       is_asynch = noit_true;
393   }
394
395   if(is_asynch) check->flags |= NP_SUPPRESS_METRICS;
396   else check->flags &= ~NP_SUPPRESS_METRICS;
397   return is_asynch;
398 }
399
400 /* Forward declare this so the parse function can use it */
401 static int queue_values(collectd_closure_t *ccl,
402   noit_module_t *self, noit_check_t *check, value_list_t *vl);
403 static int queue_notifications(collectd_closure_t *ccl,
404   noit_module_t *self, noit_check_t *check, notification_t *n);
405
406
407 static EVP_CIPHER_CTX* network_get_aes256_cypher (collectd_closure_t *ccl, /* {{{ */
408     const void *iv, size_t iv_size, const char *username)
409 {
410
411   if (ccl->secret == NULL)
412     return (NULL);
413   else
414   {
415     EVP_CIPHER_CTX *ctx_ptr;
416     EVP_MD_CTX ctx_md;
417     unsigned char password_hash[32];
418     unsigned int length = 0;
419     int success;
420
421     ctx_ptr = &ccl->ctx;
422
423     EVP_DigestInit(&ctx_md, EVP_sha256());
424     EVP_DigestUpdate(&ctx_md, ccl->secret, strlen(ccl->secret));
425     EVP_DigestFinal(&ctx_md, password_hash, &length);
426     EVP_MD_CTX_cleanup(&ctx_md);
427
428     assert(length <= 32);
429
430     success = EVP_DecryptInit(ctx_ptr, EVP_aes_256_ofb(), password_hash, iv);
431     if (success != 1)
432     {
433       noitL(noit_error, "collectd: EVP_DecryptInit returned: %d\n",
434           success);
435       return (NULL);
436     }
437     return (ctx_ptr);
438   }
439 } /* }}} int network_get_aes256_cypher */
440
441 static int parse_part_values (void **ret_buffer, size_t *ret_buffer_len,
442     value_t **ret_values, uint8_t **ret_types, int *ret_num_values)
443 {
444   char *buffer = *ret_buffer;
445   size_t buffer_len = *ret_buffer_len;
446
447   uint16_t tmp16;
448   size_t exp_size;
449   int   i;
450
451   uint16_t pkg_length;
452   uint16_t pkg_type;
453   uint16_t pkg_numval;
454
455   uint8_t *pkg_types;
456   value_t *pkg_values;
457
458   if (buffer_len < 15)
459   {
460     noitL(noit_error,"collectd: packet is too short: "
461         "buffer_len = %zu\n", buffer_len);
462     return (-1);
463   }
464
465   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
466   buffer += sizeof (tmp16);
467   pkg_type = ntohs (tmp16);
468
469   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
470   buffer += sizeof (tmp16);
471   pkg_length = ntohs (tmp16);
472
473   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
474   buffer += sizeof (tmp16);
475   pkg_numval = ntohs (tmp16);
476
477   assert (pkg_type == TYPE_VALUES);
478
479   exp_size = 3 * sizeof (uint16_t)
480     + pkg_numval * (sizeof (uint8_t) + sizeof (value_t));
481   if (buffer_len < exp_size)
482   {
483     noitL(noit_error, "collectd: parse_part_values: "
484         "Packet too short: "
485         "Chunk of size %zu expected, "
486         "but buffer has only %zu bytes left.\n",
487         exp_size, buffer_len);
488     return (-1);
489   }
490
491   if (pkg_length != exp_size)
492   {
493     noitL(noit_debug, "collectd: parse_part_values: "
494         "Length and number of values "
495         "in the packet don't match.\n");
496     return (-1);
497   }
498
499   pkg_types = (uint8_t *) malloc (pkg_numval * sizeof (uint8_t));
500   pkg_values = (value_t *) malloc (pkg_numval * sizeof (value_t));
501   if ((pkg_types == NULL) || (pkg_values == NULL))
502   {
503     sfree (pkg_types);
504     sfree (pkg_values);
505     noitL(noit_error, "collectd: parse_part_values: malloc failed.\n");
506     return (-1);
507   }
508
509   memcpy ((void *) pkg_types, (void *) buffer, pkg_numval * sizeof (uint8_t));
510   buffer += pkg_numval * sizeof (uint8_t);
511   memcpy ((void *) pkg_values, (void *) buffer, pkg_numval * sizeof (value_t));
512   buffer += pkg_numval * sizeof (value_t);
513
514   for (i = 0; i < pkg_numval; i++)
515   {
516     switch (pkg_types[i])
517     {
518       case DS_TYPE_COUNTER:
519         pkg_values[i].counter = (counter_t) collectd_ntohll (pkg_values[i].counter);
520         break;
521
522       case DS_TYPE_GAUGE:
523         pkg_values[i].gauge = (gauge_t) ntohd (pkg_values[i].gauge);
524         break;
525
526       case DS_TYPE_DERIVE:
527         pkg_values[i].derive = (derive_t) collectd_ntohll (pkg_values[i].derive);
528         break;
529
530       case DS_TYPE_ABSOLUTE:
531         pkg_values[i].absolute = (absolute_t) collectd_ntohll (pkg_values[i].absolute);
532         break;
533
534       default:
535         noitL(noit_debug, "collectd: parse_part_values: "
536       "Don't know how to handle data source type %"PRIu8 "\n",
537       pkg_types[i]);
538         sfree (pkg_types);
539         sfree (pkg_values);
540         return (-1);
541     } /* switch (pkg_types[i]) */
542   }
543
544   *ret_buffer     = buffer;
545   *ret_buffer_len = buffer_len - pkg_length;
546   *ret_num_values = pkg_numval;
547   *ret_types      = pkg_types;
548   *ret_values     = pkg_values;
549
550
551   return (0);
552 } /* int parse_part_values */
553
554 static int parse_part_number (void **ret_buffer, size_t *ret_buffer_len,
555     uint64_t *value)
556 {
557   char *buffer = *ret_buffer;
558   size_t buffer_len = *ret_buffer_len;
559
560   uint16_t tmp16;
561   uint64_t tmp64;
562   size_t exp_size = 2 * sizeof (uint16_t) + sizeof (uint64_t);
563
564   uint16_t pkg_length;
565
566   if ((size_t) buffer_len < exp_size)
567   {
568     noitL(noit_error, "collectd: parse_part_number: "
569         "Packet too short: "
570         "Chunk of size %zu expected, "
571         "but buffer has only %zu bytes left.\n",
572         exp_size, buffer_len);
573     return (-1);
574   }
575
576   /* skip pkg_type */
577   buffer += sizeof (tmp16);
578
579   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
580   buffer += sizeof (tmp16);
581   pkg_length = ntohs (tmp16);
582
583   memcpy ((void *) &tmp64, buffer, sizeof (tmp64));
584   buffer += sizeof (tmp64);
585   *value = collectd_ntohll (tmp64);
586
587   *ret_buffer = buffer;
588   *ret_buffer_len = buffer_len - pkg_length;
589
590   return (0);
591 } /* int parse_part_number */
592
593 static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len,
594     char *output, int output_len)
595 {
596   char *buffer = *ret_buffer;
597   size_t buffer_len = *ret_buffer_len;
598
599   uint16_t tmp16;
600   size_t header_size = 2 * sizeof (uint16_t);
601
602   uint16_t pkg_length;
603
604   if (buffer_len < header_size)
605   {
606     noitL(noit_error, "collectd: parse_part_string: "
607         "Packet too short: "
608         "Chunk of at least size %zu expected, "
609         "but buffer has only %zu bytes left.\n",
610         header_size, buffer_len);
611     return (-1);
612   }
613
614   /* skip pkg_type */
615   buffer += sizeof (tmp16);
616
617   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
618   buffer += sizeof (tmp16);
619   pkg_length = ntohs (tmp16);
620
621   /* Check that packet fits in the input buffer */
622   if (pkg_length > buffer_len)
623   {
624     noitL(noit_error, "collectd: parse_part_string: "
625         "Packet too big: "
626         "Chunk of size %"PRIu16" received, "
627         "but buffer has only %zu bytes left.\n",
628         pkg_length, buffer_len);
629     return (-1);
630   }
631
632   /* Check that pkg_length is in the valid range */
633   if (pkg_length <= header_size)
634   {
635     noitL(noit_error, "collectd: parse_part_string: "
636         "Packet too short: "
637         "Header claims this packet is only %hu "
638         "bytes long.\n", pkg_length);
639     return (-1);
640   }
641
642   /* Check that the package data fits into the output buffer.
643    * The previous if-statement ensures that:
644    * `pkg_length > header_size' */
645   if ((output_len < 0)
646       || ((size_t) output_len < ((size_t) pkg_length - header_size)))
647   {
648     noitL(noit_error, "collectd: parse_part_string: "
649         "Output buffer too small.\n");
650     return (-1);
651   }
652
653   /* All sanity checks successfull, let's copy the data over */
654   output_len = pkg_length - header_size;
655   memcpy ((void *) output, (void *) buffer, output_len);
656   buffer += output_len;
657
658   /* For some very weird reason '\0' doesn't do the trick on SPARC in
659    * this statement. */
660   if (output[output_len - 1] != 0)
661   {
662     noitL(noit_error, "collectd: parse_part_string: "
663         "Received string does not end "
664         "with a NULL-byte.\n");
665     return (-1);
666   }
667
668   *ret_buffer = buffer;
669   *ret_buffer_len = buffer_len - pkg_length;
670
671   return (0);
672 } /* int parse_part_string */
673
674
/* Flags for parse_packet(): the data came from inside a signed or an
 * encrypted part. */
#define PP_SIGNED    0x01
#define PP_ENCRYPTED 0x02

/*
 * BUFFER_READ(p, s): copy `s` bytes from the current read position to `p`
 * and advance the position by `s`.
 *
 * Relies on two variables of the enclosing function: `buffer` (start of the
 * data) and `buffer_offset` (current position).  No bounds checking is
 * done; callers validate lengths beforehand.  `s` is evaluated exactly once
 * so the copied length and the advance can never differ.
 */
#define BUFFER_READ(p,s) do { \
  const size_t buffer_read_len_ = (s); \
  memcpy ((p), buffer + buffer_offset, buffer_read_len_); \
  buffer_offset += buffer_read_len_; \
} while (0)
682
683
684
685 // Forward declare
686 static int parse_packet (
687     collectd_closure_t *ccl, noit_module_t *self, noit_check_t *check,
688     void *buffer, size_t buffer_size, int flags);
689
690
691 static int parse_part_sign_sha256 (collectd_closure_t *ccl, noit_module_t *self,
692     noit_check_t *check, void **ret_buffer, size_t *ret_buffer_len, int flags)
693 {
694   unsigned char *buffer;
695   size_t buffer_len;
696   size_t buffer_offset;
697
698   size_t username_len;
699   part_signature_sha256_t pss;
700   uint16_t pss_head_length;
701   unsigned char hash[sizeof (pss.hash)];
702
703   unsigned char *hash_ptr;
704   unsigned int length;
705
706   buffer = *ret_buffer;
707   buffer_len = *ret_buffer_len;
708   buffer_offset = 0;
709
710   if (ccl->username == NULL)
711   {
712     noitL(noit_debug, "collectd: Received signed network packet but can't verify "
713         "it because no user has been configured. Will accept it.\n");
714     return (0);
715   }
716
717   if (ccl->secret == NULL)
718   {
719     noitL(noit_debug, "collectd: Received signed network packet but can't verify "
720         "it because no secret has been configured. Will accept it.\n");
721     return (0);
722   }
723
724   /* Check if the buffer has enough data for this structure. */
725   if (buffer_len <= PART_SIGNATURE_SHA256_SIZE)
726     return (-ENOMEM);
727
728   /* Read type and length header */
729   BUFFER_READ (&pss.head.type, sizeof (pss.head.type));
730   BUFFER_READ (&pss.head.length, sizeof (pss.head.length));
731   pss_head_length = ntohs (pss.head.length);
732
733   /* Check if the `pss_head_length' is within bounds. */
734   if ((pss_head_length <= PART_SIGNATURE_SHA256_SIZE)
735       || (pss_head_length > buffer_len))
736   {
737     noitL(noit_error, "collectd: HMAC-SHA-256 with invalid length received.\n");
738     return (-1);
739   }
740
741   /* Copy the hash. */
742   BUFFER_READ (pss.hash, sizeof (pss.hash));
743
744   /* Calculate username length (without null byte) and allocate memory */
745   username_len = pss_head_length - PART_SIGNATURE_SHA256_SIZE;
746   pss.username = malloc (username_len + 1);
747   if (pss.username == NULL)
748     return (-ENOMEM);
749
750   /* Read the username */
751   BUFFER_READ (pss.username, username_len);
752   pss.username[username_len] = 0;
753
754   assert (buffer_offset == pss_head_length);
755
756   /* Match up the username with the expected username */
757   if (strcmp(ccl->username, pss.username) != 0)
758   {
759     noitL(noit_error, "collectd: User: %s and Given User: %s don't match\n", ccl->username, pss.username);
760     sfree (pss.username);
761     return (-ENOENT);
762   }
763
764   /* Create a hash device and check the HMAC */
765   hash_ptr = HMAC(EVP_sha256(), ccl->secret, strlen(ccl->secret),
766       buffer     + PART_SIGNATURE_SHA256_SIZE,
767       buffer_len - PART_SIGNATURE_SHA256_SIZE,
768       hash,         &length);
769   if (hash_ptr == NULL)
770   {
771     noitL(noit_error, "collectd: Creating HMAC-SHA-256 object failed.\n");
772     sfree (pss.username);
773     return (-1);
774   }
775
776   /* Clean up */
777   sfree (pss.username);
778
779   if (memcmp (pss.hash, hash, sizeof (pss.hash)) != 0)
780   {
781     noitL(noit_error, "collectd: Verifying HMAC-SHA-256 signature failed: "
782         "Hash mismatch.\n");
783   }
784   else
785   {
786     parse_packet (ccl, self, check, buffer + buffer_offset, buffer_len - buffer_offset,
787         flags | PP_SIGNED);
788   }
789
790   *ret_buffer = buffer + buffer_len;
791   *ret_buffer_len = 0;
792
793   return (0);
794 } /* }}} int parse_part_sign_sha256 */
795 /* #endif HAVE_LIBGCRYPT */
796
797 static int parse_part_encr_aes256 (collectd_closure_t *ccl, noit_module_t *self,
798     noit_check_t *check, void **ret_buffer, size_t *ret_buffer_len,
799     int flags)
800 {
801   unsigned char  *buffer = *ret_buffer;
802   size_t buffer_len = *ret_buffer_len;
803   size_t payload_len;
804   size_t part_size;
805   size_t buffer_offset;
806   int    tmpbufsize;
807   unsigned char   *tmpbuf;
808   uint16_t username_len;
809   part_encryption_aes256_t pea;
810   unsigned char hash[sizeof (pea.hash)];
811   unsigned int hash_length;
812
813   EVP_CIPHER_CTX *ctx;
814   EVP_MD_CTX ctx_md;
815   int err;
816
817   /* Make sure at least the header if available. */
818   if (buffer_len <= PART_ENCRYPTION_AES256_SIZE)
819   {
820     noitL(noit_debug, "collectd: parse_part_encr_aes256: "
821         "Discarding short packet.\n");
822     return (-1);
823   }
824
825   buffer_offset = 0;
826
827   /* Copy the unencrypted information into `pea'. */
828   BUFFER_READ (&pea.head.type, sizeof (pea.head.type));
829   BUFFER_READ (&pea.head.length, sizeof (pea.head.length));
830
831   /* Check the `part size'. */
832   part_size = ntohs (pea.head.length);
833   if ((part_size <= PART_ENCRYPTION_AES256_SIZE)
834       || (part_size > buffer_len))
835   {
836     noitL(noit_debug, "collectd: parse_part_encr_aes256: "
837         "Discarding part with invalid size.\n");
838     return (-1);
839   }
840
841   /* Read the username */
842   BUFFER_READ (&username_len, sizeof (username_len));
843   username_len = ntohs (username_len);
844
845   if ((username_len <= 0)
846       || (username_len > (part_size - (PART_ENCRYPTION_AES256_SIZE + 1))))
847   {
848     noitL(noit_debug, "collectd: parse_part_encr_aes256: "
849         "Discarding part with invalid username length.\n");
850     return (-1);
851   }
852
853   assert (username_len > 0);
854   pea.username = malloc (username_len + 1);
855   if (pea.username == NULL)
856     return (-ENOMEM);
857   BUFFER_READ (pea.username, username_len);
858   pea.username[username_len] = 0;
859
860   /* Last but not least, the initialization vector */
861   BUFFER_READ (pea.iv, sizeof (pea.iv));
862
863   /* Make sure we are at the right position */
864   assert (buffer_offset == (username_len +
865         PART_ENCRYPTION_AES256_SIZE - sizeof (pea.hash)));
866
867   /* Match up the username with the expected username */
868   if (strcmp(ccl->username, pea.username) != 0)
869   {
870     noitL(noit_error, "collectd: Username received and server side username don't match\n");
871     sfree (pea.username);
872     return (-ENOENT);
873   }
874
875   ctx = network_get_aes256_cypher (ccl, pea.iv, sizeof (pea.iv),
876       pea.username);
877   if (ctx == NULL)
878     return (-1);
879
880   payload_len = part_size - (PART_ENCRYPTION_AES256_SIZE + username_len);
881   assert (payload_len > 0);
882   tmpbuf = malloc(part_size - buffer_offset);
883
884   /* Decrypt the packet */
885   err = EVP_DecryptUpdate(ctx,
886       tmpbuf, &tmpbufsize,
887       buffer    + buffer_offset,
888       part_size - buffer_offset);
889   if (err != 1)
890   {
891     noitL(noit_error, "collectd: openssl returned: %d\n", err);
892     return (-1);
893   }
894
895   assert(part_size - buffer_offset == tmpbufsize);
896   /* Make it appear to be in place */
897   memcpy(buffer + buffer_offset, tmpbuf, part_size - buffer_offset);
898   sfree(tmpbuf);
899
900   /* Read the hash */
901   BUFFER_READ (pea.hash, sizeof (pea.hash));
902
903   /* Make sure we're at the right position - again */
904   assert (buffer_offset == (username_len + PART_ENCRYPTION_AES256_SIZE));
905   assert (buffer_offset == (part_size - payload_len));
906
907   /* Check hash sum */
908   memset (hash, 0, sizeof (hash));
909   EVP_DigestInit(&ctx_md, EVP_sha1());
910   EVP_DigestUpdate(&ctx_md, buffer + buffer_offset, payload_len);
911   EVP_DigestFinal(&ctx_md, hash, &hash_length);
912   if (memcmp (hash, pea.hash, sizeof (hash)) != 0)
913   {
914     noitL(noit_error, "collectd: Decryption failed: Checksum mismatch.\n");
915     return (-1);
916   }
917
918   parse_packet (ccl, self, check, buffer + buffer_offset, payload_len,
919       flags | PP_ENCRYPTED);
920
921   /* Update return values */
922   *ret_buffer =     buffer     + part_size;
923   *ret_buffer_len = buffer_len - part_size;
924   sfree(pea.username);
925
926   return (0);
927 } /* }}} int parse_part_encr_aes256 */
928
929
930 #undef BUFFER_READ
931
932
933 static int parse_packet (/* {{{ */
934     collectd_closure_t *ccl, noit_module_t *self, noit_check_t *check,
935     void *buffer, size_t buffer_size, int flags)
936 {
937   int status;
938
939   int packet_was_signed = (flags & PP_SIGNED);
940   int packet_was_encrypted = (flags & PP_ENCRYPTED);
941   int printed_ignore_warning = 0;
942
943 #define VALUE_LIST_INIT { NULL, 0, 0, interval_g, "localhost", "", "", "", "", NULL }
944   value_list_t vl = VALUE_LIST_INIT;
945   notification_t n;
946
947   memset (&vl, '\0', sizeof (vl));
948   memset (&n, '\0', sizeof (n));
949   status = 0;
950
951   while ((status == 0) && (0 < buffer_size)
952       && ((unsigned int) buffer_size > sizeof (part_header_t)))
953   {
954     uint16_t pkg_length;
955     uint16_t pkg_type;
956
957     memcpy ((void *) &pkg_type,
958         (void *) buffer,
959         sizeof (pkg_type));
960     memcpy ((void *) &pkg_length,
961         (void *) ((char *)buffer + sizeof (pkg_type)),
962         sizeof (pkg_length));
963
964     pkg_length = ntohs (pkg_length);
965     pkg_type = ntohs (pkg_type);
966
967     if (pkg_length > buffer_size)
968       break;
969     /* Ensure that this loop terminates eventually */
970     if (pkg_length < (2 * sizeof (uint16_t)))
971       break;
972
973
974     if (pkg_type == TYPE_ENCR_AES256)
975     {
976       status = parse_part_encr_aes256 (ccl, self, check,
977           &buffer, &buffer_size, flags);
978       if (status != 0)
979       {
980         noitL(noit_error, "collectd: Decrypting AES256 "
981             "part failed "
982             "with status %i.\n", status);
983         break;
984       }
985     }
986     else if ((ccl->security_level == SECURITY_LEVEL_ENCRYPT)
987         && (packet_was_encrypted == 0))
988     {
989       if (printed_ignore_warning == 0)
990       {
991         noitL(noit_debug, "collectd: Unencrypted packet or "
992             "part has been ignored.\n");
993         printed_ignore_warning = 1;
994       }
995       buffer = ((char *) buffer) + pkg_length;
996       continue;
997     }
998     else if (pkg_type == TYPE_SIGN_SHA256)
999     {
1000       status = parse_part_sign_sha256 (ccl, self, check,
1001                                         &buffer, &buffer_size, flags);
1002       if (status != 0)
1003       {
1004         noitL(noit_error, "collectd: Verifying HMAC-SHA-256 "
1005             "signature failed "
1006             "with status %i.\n", status);
1007         break;
1008       }
1009     }
1010     else if ((ccl->security_level == SECURITY_LEVEL_SIGN)
1011         && (packet_was_encrypted == 0)
1012         && (packet_was_signed == 0))
1013     {
1014       if (printed_ignore_warning == 0)
1015       {
1016         noitL(noit_debug, "collectd: Unsigned packet or "
1017             "part has been ignored.\n");
1018         printed_ignore_warning = 1;
1019       }
1020       buffer = ((char *) buffer) + pkg_length;
1021       continue;
1022     }
1023     else if (pkg_type == TYPE_VALUES)
1024     {
1025       status = parse_part_values (&buffer, &buffer_size,
1026           &vl.values, &vl.types, &vl.values_len);
1027
1028       if (status != 0)
1029         break;
1030
1031       if ((strlen (vl.host) > 0) &&
1032           (strlen (vl.plugin) > 0) &&
1033           (strlen (vl.type) > 0))
1034       {
1035         queue_values(ccl, self, check, &vl);
1036       }
1037       else
1038       {
1039         noitL(noit_error,
1040               "collectd: NOT dispatching values [%lld,%s:%s:%s]\n",
1041               (long long int)vl.time, vl.host, vl.plugin, vl.type);
1042       }
1043
1044       sfree (vl.values);
1045       sfree (vl.types);
1046     }
1047     else if (pkg_type == TYPE_TIME)
1048     {
1049       uint64_t tmp = 0;
1050       status = parse_part_number (&buffer, &buffer_size, &tmp);
1051       if (status == 0)
1052       {
1053         vl.time = (time_t) tmp;
1054         n.time = (time_t) tmp;
1055       }
1056     }
1057     else if (pkg_type == TYPE_INTERVAL)
1058     {
1059       uint64_t tmp = 0;
1060       status = parse_part_number (&buffer, &buffer_size,
1061           &tmp);
1062       if (status == 0)
1063         vl.interval = (int) tmp;
1064     }
1065     else if (pkg_type == TYPE_HOST)
1066     {
1067       status = parse_part_string (&buffer, &buffer_size,
1068           vl.host, sizeof (vl.host));
1069       if (status == 0)
1070         sstrncpy (n.host, vl.host, sizeof (n.host));
1071     }
1072     else if (pkg_type == TYPE_PLUGIN)
1073     {
1074       status = parse_part_string (&buffer, &buffer_size,
1075           vl.plugin, sizeof (vl.plugin));
1076       if (status == 0)
1077         sstrncpy (n.plugin, vl.plugin,
1078             sizeof (n.plugin));
1079     }
1080     else if (pkg_type == TYPE_PLUGIN_INSTANCE)
1081     {
1082       status = parse_part_string (&buffer, &buffer_size,
1083           vl.plugin_instance,
1084           sizeof (vl.plugin_instance));
1085       if (status == 0)
1086         sstrncpy (n.plugin_instance,
1087             vl.plugin_instance,
1088             sizeof (n.plugin_instance));
1089     }
1090     else if (pkg_type == TYPE_TYPE)
1091     {
1092       status = parse_part_string (&buffer, &buffer_size,
1093           vl.type, sizeof (vl.type));
1094       if (status == 0)
1095         sstrncpy (n.type, vl.type, sizeof (n.type));
1096     }
1097     else if (pkg_type == TYPE_TYPE_INSTANCE)
1098     {
1099       status = parse_part_string (&buffer, &buffer_size,
1100           vl.type_instance,
1101           sizeof (vl.type_instance));
1102       if (status == 0)
1103         sstrncpy (n.type_instance, vl.type_instance,
1104             sizeof (n.type_instance));
1105     }
1106     else if (pkg_type == TYPE_MESSAGE)
1107     {
1108       status = parse_part_string (&buffer, &buffer_size,
1109           n.message, sizeof (n.message));
1110
1111       if (status != 0)
1112       {
1113         /* do nothing */
1114       }
1115       else if ((n.severity != NOTIF_FAILURE)
1116           && (n.severity != NOTIF_WARNING)
1117           && (n.severity != NOTIF_OKAY))
1118       {
1119         noitL(noit_error, "collectd: "
1120             "Ignoring notification with "
1121             "unknown severity %i.\n",
1122             n.severity);
1123       }
1124 /* Time proves untrustworthy... and we don't use it.
1125       else if (n.time <= 0)
1126       {
1127         noitL(noit_error, "collectd: "
1128             "Ignoring notification with "
1129             "time == 0.\n");
1130       }
1131 */
1132       else if (strlen (n.message) <= 0)
1133       {
1134         noitL(noit_error, "collectd: "
1135             "Ignoring notification with "
1136             "an empty message.\n");
1137       }
1138       else
1139       {
1140         queue_notifications(ccl, self, check, &n);
1141         noitL(noit_error, "collectd: "
1142             "DISPATCH NOTIFICATION\n");
1143       }
1144     }
1145     else if (pkg_type == TYPE_SEVERITY)
1146     {
1147       uint64_t tmp = 0;
1148       status = parse_part_number (&buffer, &buffer_size,
1149           &tmp);
1150       if (status == 0)
1151         n.severity = (int) tmp;
1152     }
1153     else
1154     {
1155       noitL(noit_error, "collectd: parse_packet: Unknown part"
1156           " type: 0x%04hx\n", pkg_type);
1157       buffer = ((char *) buffer) + pkg_length;
1158     }
1159   } /* while (buffer_size > sizeof (part_header_t)) */
1160
1161   return (status);
1162 } /* }}} int parse_packet */
1163
1164
1165 // Not proud of this at all however; I am not sure best how to address this.
1166 static int infer_type(char *buffer, int buffer_len, value_list_t *vl, int index) {
1167   int len = strlen(buffer);
1168   strcat(buffer, "`");
1169   if (cmp_type(vl, "load", "load")) {
1170     assert(vl->values_len == 3);
1171     switch (index) {
1172       case 0:
1173         strcat(buffer, "1min"); break;
1174       case 1:
1175         strcat(buffer, "5min"); break;
1176       case 2:
1177         strcat(buffer, "15min"); break;
1178     }
1179   } else if (cmp_plugin(vl, "interface")) {
1180     assert(vl->values_len == 2);
1181     switch (index) {
1182       case 0:
1183         strcat(buffer, "rx"); break;
1184       case 1:
1185         strcat(buffer, "tx"); break;
1186     }
1187   } else {
1188     char buf[20];
1189     snprintf(buf, sizeof(buf), "%d", index);
1190     strcat(buffer, buf);
1191     noitL(noit_debug, "collectd: parsing multiple values"
1192         " and guessing on the type for plugin[%s] and type[%s]"
1193         , vl->plugin, vl->type);
1194   }
1195   return len;
1196 }
1197
/*
 * concat_metrics - build a flat metric name of the form
 * plugin[`plugin_inst][`type][`type_inst] in buffer.
 *
 * The plugin name is always copied; each of the remaining three components
 * is appended, preceded by a backtick, only when it is non-empty.  The caller
 * must supply a buffer large enough for all four components plus separators;
 * no bounds checking is done here.
 */
static void concat_metrics(char *buffer, char* plugin, char* plugin_inst, char* type, char* type_inst) {
  const char *optional[3];
  char *tail;
  size_t part_len;
  int k;

  optional[0] = plugin_inst;
  optional[1] = type;
  optional[2] = type_inst;

  /* Copy the mandatory component including its terminator. */
  part_len = strlen(plugin);
  memcpy(buffer, plugin, part_len + 1);
  tail = buffer + part_len;

  for (k = 0; k < 3; k++) {
    part_len = strlen(optional[k]);
    if (part_len == 0)
      continue;
    *tail++ = '`';
    memcpy(tail, optional[k], part_len + 1);
    tail += part_len;
  }
}
1214
1215 static int queue_notifications(collectd_closure_t *ccl,
1216       noit_module_t *self, noit_check_t *check, notification_t *n) {
1217   stats_t current;
1218   noit_boolean immediate;
1219   char buffer[DATA_MAX_NAME_LEN*4 + 128];
1220   collectd_mod_config_t *conf;
1221   conf = noit_module_get_userdata(self);
1222
1223   if(!conf->support_notifications) return 0;
1224   /* We are passive, so we don't do anything for transient checks */
1225   if(check->flags & NP_TRANSIENT) return 0;
1226
1227   noit_check_stats_clear(check, &current);
1228   gettimeofday(&current.whence, NULL);
1229
1230   // Concat all the names together so they fit into the flat noitd model
1231   concat_metrics(buffer, n->plugin, n->plugin_instance, n->type, n->type_instance);
1232   noit_stats_set_metric(check, &ccl->current, buffer, METRIC_STRING, n->message);
1233   immediate = noit_collects_check_aynsch(self,check);
1234   if(immediate)
1235     noit_stats_log_immediate_metric(check, buffer, METRIC_STRING, n->message);
1236   noit_check_passive_set_stats(check, &current);
1237   noitL(nldeb, "collectd: dispatch_notifications(%s, %s, %s)\n",check->target, buffer, n->message);
1238   return 0;
1239 }
1240
1241
1242 static int queue_values(collectd_closure_t *ccl,
1243       noit_module_t *self, noit_check_t *check, value_list_t *vl) {
1244   noit_boolean immediate;
1245   char buffer[DATA_MAX_NAME_LEN*4 + 4 + 1 + 20];
1246   int i, len = 0;
1247
1248   // Concat all the names together so they fit into the flat noitd model
1249   immediate = noit_collects_check_aynsch(self,check);
1250   concat_metrics(buffer, vl->plugin, vl->plugin_instance, vl->type, vl->type_instance);
1251   for (i=0; i < vl->values_len; i++) {
1252  
1253     // Only infer the type if the amount of values is greater than one
1254     if (vl->values_len > 1) {
1255       // Trunc the string 
1256       if (len > 0)
1257         buffer[len] = 0;
1258       len = infer_type(buffer, sizeof(buffer), vl, i);
1259     }
1260
1261     switch (vl->types[i])
1262     {
1263       case DS_TYPE_COUNTER:
1264         noit_stats_set_metric(check, &ccl->current, buffer, METRIC_UINT64, &vl->values[i].counter);
1265         if(immediate) noit_stats_log_immediate_metric(check, buffer, METRIC_UINT64, &vl->values[i].counter);
1266         break;
1267
1268       case DS_TYPE_GAUGE:
1269         noit_stats_set_metric(check, &ccl->current, buffer, METRIC_DOUBLE, &vl->values[i].gauge);
1270         if(immediate) noit_stats_log_immediate_metric(check, buffer, METRIC_DOUBLE, &vl->values[i].gauge);
1271         break;
1272
1273       case DS_TYPE_DERIVE:
1274         noit_stats_set_metric(check, &ccl->current, buffer, METRIC_INT64, &vl->values[i].derive);
1275         if(immediate) noit_stats_log_immediate_metric(check, buffer, METRIC_INT64, &vl->values[i].derive);
1276         break;
1277
1278       case DS_TYPE_ABSOLUTE:
1279         noit_stats_set_metric(check, &ccl->current, buffer, METRIC_INT64, &vl->values[i].absolute);
1280         if(immediate) noit_stats_log_immediate_metric(check, buffer, METRIC_INT64, &vl->values[i].absolute);
1281         break;
1282
1283       default:
1284         noitL(noit_debug, "collectd: parse_part_values: "
1285               "Don't know how to handle data source type %"PRIu8 "\n",
1286               vl->types[i]);
1287         return (-1);
1288     } /* switch (value_types[i]) */
1289     ccl->stats_count++;
1290     noitL(nldeb, "collectd: queue_values(%s, %s)\n", buffer, check->target);
1291   }
1292   return 0;
1293 }
1294
1295 static void clear_closure(noit_check_t *check, collectd_closure_t *ccl) {
1296   ccl->stats_count = 0;
1297   ccl->ntfy_count = 0;
1298   noit_check_stats_clear(check, &ccl->current);
1299
1300 }
1301
1302 static int collectd_submit(noit_module_t *self, noit_check_t *check,
1303                            noit_check_t *cause) {
1304   collectd_closure_t *ccl;
1305   struct timeval duration;
1306   /* We are passive, so we don't do anything for transient checks */
1307   if(check->flags & NP_TRANSIENT) return 0;
1308
1309   noit_collects_check_aynsch(self, check);
1310   if(!check->closure) {
1311     ccl = check->closure = (void *)calloc(1, sizeof(collectd_closure_t));
1312     memset(ccl, 0, sizeof(collectd_closure_t));
1313   } else {
1314     // Don't count the first run
1315     char human_buffer[256];
1316     ccl = (collectd_closure_t*)check->closure;
1317     gettimeofday(&ccl->current.whence, NULL);
1318     sub_timeval(ccl->current.whence, check->last_fire_time, &duration);
1319     ccl->current.duration = duration.tv_sec; // + duration.tv_usec / (1000 * 1000);
1320
1321     snprintf(human_buffer, sizeof(human_buffer),
1322              "dur=%d,run=%d,stats=%d,ntfy=%d", ccl->current.duration,
1323              check->generation, ccl->stats_count, ccl->ntfy_count);
1324     noitL(nldeb, "collectd(%s) [%s]\n", check->target, human_buffer);
1325
1326     // Not sure what to do here
1327     ccl->current.available = (ccl->ntfy_count > 0 || ccl->stats_count > 0) ?
1328         NP_AVAILABLE : NP_UNAVAILABLE;
1329     ccl->current.state = (ccl->ntfy_count > 0 || ccl->stats_count > 0) ?
1330         NP_GOOD : NP_BAD;
1331     ccl->current.status = human_buffer;
1332     noit_check_passive_set_stats(check, &ccl->current);
1333
1334     memcpy(&check->last_fire_time, &ccl->current.whence, sizeof(duration));
1335   }
1336   clear_closure(check, ccl);
1337   return 0;
1338 }
1339
/*
 * One received datagram, bundled so it can be passed as the opaque closure
 * argument to push_packet_at_check().
 */
struct collectd_pkt {
  noit_module_t *self;  /* which collect load context */
  char *payload;        /* start of the received packet data (not owned) */
  int len;              /* number of payload bytes received */
};
1345
1346 static int
1347 push_packet_at_check(noit_check_t *check, void *closure) {
1348   struct collectd_pkt *pkt = closure;
1349   collectd_closure_t *ccl;
1350   char *security_buffer;
1351   collectd_mod_config_t *conf;
1352   conf = noit_module_get_userdata(pkt->self);
1353
1354   /* We need a check, and a collectd one at that */
1355   if (!check || strcmp(check->module, "collectd")) return 0;
1356
1357   // If its a new check retrieve some values
1358   if (check->closure == NULL) {
1359     // TODO: Verify if it could somehow retrieve data before the check closure exists
1360     ccl = check->closure = (void *)calloc(1, sizeof(collectd_closure_t));
1361     memset(ccl, 0, sizeof(collectd_closure_t));
1362   } else {
1363     ccl = (collectd_closure_t*)check->closure;
1364   }
1365   // Default to NONE
1366   ccl->security_level = SECURITY_LEVEL_NONE;
1367   if (noit_hash_retr_str(check->config, "security_level", strlen("security_level"),
1368                          (const char**)&security_buffer))
1369   {
1370     ccl->security_level = atoi(security_buffer);
1371   }
1372
1373   // Is this outside to keep updates happening?
1374   if (!noit_hash_retr_str(check->config, "username", strlen("username"),
1375                          (const char**)&ccl->username) &&
1376       !noit_hash_retr_str(conf->options, "username", strlen("username"),
1377                          (const char**)&ccl->username))
1378   {
1379     if (ccl->security_level == SECURITY_LEVEL_ENCRYPT) {
1380       noitL(nlerr, "collectd: no username defined for check.\n");
1381       return 0;
1382     } else if (ccl->security_level == SECURITY_LEVEL_SIGN) {
1383       noitL(nlerr, "collectd: no username defined for check, "
1384           "will accept any signed packet.\n");
1385     }
1386   }
1387
1388   if(!ccl->secret)
1389     noit_hash_retr_str(check->config, "secret", strlen("secret"),
1390                        (const char**)&ccl->secret);
1391   if(!ccl->secret)
1392     noit_hash_retr_str(conf->options, "secret", strlen("secret"),
1393                        (const char**)&ccl->secret);
1394   if(!ccl->secret) {
1395     if (ccl->security_level == SECURITY_LEVEL_ENCRYPT) {
1396       noitL(nlerr, "collectd: no secret defined for check.\n");
1397       return 0;
1398     }
1399     else if (ccl->security_level == SECURITY_LEVEL_SIGN) {
1400       noitL(nlerr, "collectd: no secret defined for check, "
1401           "will accept any signed packet.\n");
1402     }
1403   }
1404
1405   parse_packet(ccl, pkt->self, check, pkt->payload, pkt->len, 0);
1406   return 1;
1407 }
1408
1409 static int noit_collectd_handler(eventer_t e, int mask, void *closure,
1410                              struct timeval *now) {
1411   union {
1412     struct sockaddr_in  skaddr;
1413     struct sockaddr_in6 skaddr6;
1414   } remote;
1415   char packet[1500];
1416   int packet_len = sizeof(packet);
1417   unsigned int from_len;
1418   char ip_p[INET6_ADDRSTRLEN];
1419   noit_module_t *self = (noit_module_t *)closure;
1420
1421   // Get the username and password of the string
1422
1423   while(1) {
1424     struct collectd_pkt pkt;
1425     int inlen, check_cnt;
1426
1427     from_len = sizeof(remote);
1428
1429     inlen = recvfrom(e->fd, packet, packet_len, 0,
1430                      (struct sockaddr *)&remote, &from_len);
1431     gettimeofday(now, NULL); /* set it, as we care about accuracy */
1432
1433     if(inlen < 0) {
1434       if(errno == EAGAIN || errno == EINTR) break;
1435       noitLT(nlerr, now, "collectd: recvfrom: %s\n", strerror(errno));
1436       break;
1437     }
1438     if (from_len == sizeof(remote.skaddr)) {
1439       if (!inet_ntop(AF_INET, &(remote.skaddr.sin_addr), ip_p, INET_ADDRSTRLEN)) {
1440         noitLT(nlerr, now, "collectd: inet_ntop failed: %s\n", strerror(errno));
1441         break;
1442       }
1443     }
1444     else if(from_len == sizeof(remote.skaddr6)) {
1445       if (!inet_ntop(AF_INET6, &(remote.skaddr6.sin6_addr), ip_p, INET6_ADDRSTRLEN)) {
1446         noitLT(nlerr, now, "collectd: inet_ntop failed: %s\n", strerror(errno));
1447         break;
1448       }
1449     }
1450     else {
1451       noitLT(nlerr, now, "collectd: could not determine address family of remote\n");
1452       break;
1453     }
1454
1455     pkt.self = self;
1456     pkt.payload = packet;
1457     pkt.len = inlen;
1458     check_cnt = noit_poller_target_do(ip_p, push_packet_at_check ,&pkt);
1459     if(check_cnt == 0)
1460       noitL(nlerr, "collectd: No defined check from ip [%s].\n", ip_p);
1461   }
1462   return EVENTER_READ | EVENTER_EXCEPTION;
1463 }
1464
/*
 * noit_collectd_initiate_check - module hook that starts a collectd check.
 *
 * Flags the check as passively collected and hands collectd_submit to the
 * INITIATE_CHECK macro.  Always returns 0.
 *
 * NOTE(review): `once` is not referenced directly in this body; presumably
 * the INITIATE_CHECK macro picks it up by name -- confirm before renaming.
 */
static int noit_collectd_initiate_check(noit_module_t *self,
                                        noit_check_t *check,
                                        int once, noit_check_t *cause) {
  /* The idea is to write the collectd stats out once every period.
   * Then we can warn people if no stats were written in a period of time.
   */
  check->flags |= NP_PASSIVE_COLLECTION;
  INITIATE_CHECK(collectd_submit, self, check, cause);
  return 0;
}
1475
1476 static int noit_collectd_config(noit_module_t *self, noit_hash_table *options) {
1477   collectd_mod_config_t *conf;
1478   conf = noit_module_get_userdata(self);
1479   if(conf) {
1480     if(conf->options) {
1481       noit_hash_destroy(conf->options, free, free);
1482       free(conf->options);
1483     }
1484   }
1485   else
1486     conf = calloc(1, sizeof(*conf));
1487   conf->options = options;
1488   noit_module_set_userdata(self, conf);
1489   return 1;
1490 }
1491
1492 static int noit_collectd_onload(noit_image_t *self) {
1493   if(!nlerr) nlerr = noit_log_stream_find("error/collectd");
1494   if(!nldeb) nldeb = noit_log_stream_find("debug/collectd");
1495   if(!nlerr) nlerr = noit_error;
1496   if(!nldeb) nldeb = noit_debug;
1497   eventer_name_callback("noit_collectd/handler", noit_collectd_handler);
1498   return 0;
1499 }
1500
1501 static int noit_collectd_init(noit_module_t *self) {
1502   const char *config_val;
1503   int sockaddr_len;
1504   collectd_mod_config_t *conf;
1505   conf = noit_module_get_userdata(self);
1506   int portint = 0;
1507   struct sockaddr_in skaddr;
1508   struct sockaddr_in6 skaddr6;
1509   struct in6_addr in6addr_any;
1510   const char *host;
1511   unsigned short port;
1512
1513   memset(&in6addr_any, 0, sizeof(in6addr_any));
1514   conf->support_notifications = noit_true;
1515   if(noit_hash_retr_str(conf->options,
1516                         "notifications", strlen("notifications"),
1517                         (const char **)&config_val)) {
1518     if(!strcasecmp(config_val, "false") || !strcasecmp(config_val, "off"))
1519       conf->support_notifications = noit_false;
1520   }
1521   conf->asynch_metrics = noit_false;
1522   if(noit_hash_retr_str(conf->options,
1523                         "asynch_metrics", strlen("asynch_metrics"),
1524                         (const char **)&config_val)) {
1525     if(!strcasecmp(config_val, "true") || !strcasecmp(config_val, "on"))
1526       conf->asynch_metrics = noit_true;
1527   }
1528
1529   /* Default Collectd port */
1530   portint = NET_DEFAULT_PORT;
1531   if(noit_hash_retr_str(conf->options,
1532                          "collectd_port", strlen("collectd_port"),
1533                          (const char**)&config_val))
1534     portint = atoi(config_val);
1535
1536
1537   if(!noit_hash_retr_str(conf->options,
1538                          "collectd_host", strlen("collectd_host"),
1539                          (const char**)&host))
1540     host = "*";
1541
1542   port = (unsigned short) portint;
1543
1544   conf->ipv4_fd = conf->ipv6_fd = -1;
1545
1546   conf->ipv4_fd = socket(PF_INET, SOCK_DGRAM, 0);
1547   if(conf->ipv4_fd < 0) {
1548     noitL(noit_error, "collectd: socket failed: %s\n",
1549           strerror(errno));
1550   }
1551   else {
1552     if(eventer_set_fd_nonblocking(conf->ipv4_fd)) {
1553       close(conf->ipv4_fd);
1554       conf->ipv4_fd = -1;
1555       noitL(noit_error,
1556             "collectd: could not set socket non-blocking: %s\n",
1557             strerror(errno));
1558     }
1559   }
1560   skaddr.sin_family = AF_INET;
1561   skaddr.sin_addr.s_addr = htonl(INADDR_ANY);
1562   skaddr.sin_port = htons(port);
1563
1564   sockaddr_len = sizeof(skaddr);
1565   if(bind(conf->ipv4_fd, (struct sockaddr *)&skaddr, sockaddr_len) < 0) {
1566     noitL(noit_error, "bind failed[%s]: %s\n", host, strerror(errno));
1567     close(conf->ipv4_fd);
1568     return -1;
1569   }
1570
1571   if(conf->ipv4_fd >= 0) {
1572     eventer_t newe;
1573     newe = eventer_alloc();
1574     newe->fd = conf->ipv4_fd;
1575     newe->mask = EVENTER_READ | EVENTER_EXCEPTION;
1576     newe->callback = noit_collectd_handler;
1577     newe->closure = self;
1578     eventer_add(newe);
1579   }
1580
1581   conf->ipv6_fd = socket(AF_INET6, SOCK_DGRAM, 0);
1582   if(conf->ipv6_fd < 0) {
1583     noitL(noit_error, "collectd: IPv6 socket failed: %s\n",
1584           strerror(errno));
1585   }
1586   else {
1587     if(eventer_set_fd_nonblocking(conf->ipv6_fd)) {
1588       close(conf->ipv6_fd);
1589       conf->ipv6_fd = -1;
1590       noitL(noit_error,
1591             "collectd: could not set socket non-blocking: %s\n",
1592                strerror(errno));
1593     }
1594   }
1595   sockaddr_len = sizeof(skaddr6);
1596   memset(&skaddr6, 0, sizeof(skaddr6));
1597   skaddr6.sin6_family = AF_INET6;
1598   skaddr6.sin6_addr = in6addr_any;
1599   skaddr6.sin6_port = htons(port);
1600
1601   if(bind(conf->ipv6_fd, (struct sockaddr *)&skaddr6, sockaddr_len) < 0) {
1602     noitL(noit_error, "bind(IPv6) failed[%s]: %s\n", host, strerror(errno));
1603     close(conf->ipv6_fd);
1604     conf->ipv6_fd = -1;
1605   }
1606
1607   if(conf->ipv6_fd >= 0) {
1608     eventer_t newe;
1609     newe = eventer_alloc();
1610     newe->fd = conf->ipv6_fd;
1611     newe->mask = EVENTER_READ;
1612     newe->callback = noit_collectd_handler;
1613     newe->closure = self;
1614     eventer_add(newe);
1615   }
1616
1617   noit_module_set_userdata(self, conf);
1618   return 0;
1619 }
1620
/* Pulls in collectd_xml_description, referenced in the descriptor below. */
#include "collectd.xmlh"
/*
 * Module descriptor for the collectd module.  The initializer is positional;
 * the per-entry comments below are inferred from the values supplied here
 * (NOTE(review): field names are declared elsewhere -- confirm against the
 * noit_module_t definition).
 */
noit_module_t collectd = {
  {
    NOIT_MODULE_MAGIC,
    NOIT_MODULE_ABI_VERSION,
    "collectd",                   /* module name */
    "collectd collection",        /* short description */
    collectd_xml_description,
    noit_collectd_onload          /* image load hook */
  },
  noit_collectd_config,           /* stores the module options */
  noit_collectd_init,             /* opens the UDP listeners */
  noit_collectd_initiate_check,   /* starts a check */
  NULL /* noit_collectd_cleanup */
};
Note: See TracBrowser for help on using the browser.