root/src/modules/collectd.c

Revision ce6a0600761ddbd0ee44fdc8d488a24e5a020fa1, 44.6 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

fix two warnings

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2005-2009  Florian Forster
3  * Copyright (c) 2009, OmniTI Computer Consulting, Inc.
4  * Copyright (c) 2009, Dan Di Spaltro
5  * All rights reserved.
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library, in file named COPYING.LGPL; if not,
19  * write to the Free Software Foundation, Inc., 59 Temple Place,
20  * Suite 330, Boston, MA  02111-1307  USA
21  *
22  */
23
24 #include "noit_defines.h"
25
26 #include <stdio.h>
27 #include <unistd.h>
28 #include <errno.h>
29 #include <assert.h>
30 #include <math.h>
31 #include <ctype.h>
32 #ifdef HAVE_SYS_FILIO_H
33 #include <sys/filio.h>
34 #endif
35 #include <openssl/evp.h>
36 #include <openssl/hmac.h>
37 #include <inttypes.h>
38 #include <arpa/inet.h>
39
40 #include "noit_module.h"
41 #include "noit_check.h"
42 #include "noit_check_tools.h"
43 #include "utils/noit_log.h"
44 #include "utils/noit_hash.h"
45
46
/* Module-private log streams for error and debug output.  Both start out
 * NULL here.
 * NOTE(review): the code that assigns them is not visible in this part of
 * the file; the parsing code below logs via noit_error/noit_debug instead. */
static noit_log_stream_t nlerr = NULL;
static noit_log_stream_t nldeb = NULL;
49
50 typedef struct _mod_config {
51   noit_hash_table *options;
52   noit_hash_table target_sessions;
53   noit_boolean support_notifications;
54   int ipv4_fd;
55   int ipv6_fd;
56 } collectd_mod_config_t;
57
58 typedef struct collectd_closure_s {
59   char *username;
60   char *secret;
61   int security_level;
62   EVP_CIPHER_CTX ctx;
63   stats_t current;
64   int stats_count;
65   int ntfy_count;
66 } collectd_closure_t;
67
/* True when vl->plugin is exactly the string `val`.  sizeof(val) includes the
 * terminating NUL, so this is an exact match rather than a prefix match;
 * `val` must therefore be a string literal or array, not a char pointer. */
#define cmp_plugin(vl, val) \
  (strncmp((vl)->plugin, (val), sizeof(val)) == 0)

/* True when vl->plugin equals `plugin_name` AND vl->type equals `type_name`.
 * The parameters are deliberately not called `plugin`/`type`: a parameter
 * named `type` would be substituted into `(vl)->type` as well. */
#define cmp_type(vl, plugin_name, type_name) \
  (cmp_plugin(vl, plugin_name) && \
   (strncmp((vl)->type, (type_name), sizeof(type_name)) == 0))
74
75
76 /**
77  *
78  * Collectd Structs
79  *
80  *
81  **/
82
/* Storage for one meta-data value; which member is valid is recorded
 * outside the union (see the `type` field of meta_entry_s). */
typedef union meta_value_u
{
  char    *mv_string;
  int64_t  mv_signed_int;
  uint64_t mv_unsigned_int;
  double   mv_double;
  _Bool    mv_boolean;
} meta_value_t;
92
93 struct meta_entry_s;
94 typedef struct meta_entry_s meta_entry_t;
95 struct meta_entry_s
96 {
97   char         *key;
98   meta_value_t  value;
99   int           type;
100   meta_entry_t *next;
101 };
102
103 struct meta_data_s
104 {
105   meta_entry_t   *head;
106   pthread_mutex_t lock;
107 };
108
109 typedef struct meta_data_s meta_data_t;
110
111
/* Default network endpoints.
 * NOTE(review): presumably the default IPv4/IPv6 multicast groups and UDP
 * port of the collectd network protocol; their use is outside this section. */
#define NET_DEFAULT_V4_ADDR "239.192.74.66"
#define NET_DEFAULT_V6_ADDR "ff18::efc0:4a42"
#define NET_DEFAULT_PORT    25826
115
/* Part type identifiers: the value of the 16-bit `type` field found at the
 * start of every part.  parse_packet() dispatches on these. */
#define TYPE_HOST            0x0000
#define TYPE_TIME            0x0001
#define TYPE_PLUGIN          0x0002
#define TYPE_PLUGIN_INSTANCE 0x0003
#define TYPE_TYPE            0x0004
#define TYPE_TYPE_INSTANCE   0x0005
#define TYPE_VALUES          0x0006
#define TYPE_INTERVAL        0x0007

/* Types to transmit notifications */
#define TYPE_MESSAGE         0x0100
#define TYPE_SEVERITY        0x0101

/* Security wrappers: HMAC-SHA-256 signed and AES-256 encrypted parts */
#define TYPE_SIGN_SHA256     0x0200
#define TYPE_ENCR_AES256     0x0210
131
/* Size, in bytes, of the fixed name buffers (host, plugin, type, ...) */
#define DATA_MAX_NAME_LEN 64

/* Size of a notification message buffer, and the notification severities */
#define NOTIF_MAX_MSG_LEN 256
#define NOTIF_FAILURE 1
#define NOTIF_WARNING 2
#define NOTIF_OKAY    4
138
/* Free `ptr` and reset it to NULL so a later sfree() or NULL check is safe.
 * free(NULL) is a no-op, so no guard is needed.  `ptr` must be an lvalue and
 * is evaluated twice - do not pass expressions with side effects. */
#define sfree(ptr) \
  do { \
    free(ptr); \
    (ptr) = NULL; \
  } while (0)
146
/* Bounded string copy that, unlike plain strncpy(), always leaves `dest`
 * NUL-terminated (truncating `src` if necessary).
 *
 * dest: destination buffer of at least `n` bytes
 * src:  NUL-terminated source string
 * n:    size of `dest`; when 0 nothing is written
 * Returns `dest`, like strncpy(). */
static inline char *sstrncpy (char *dest, const char *src, size_t n)
{
  if (n == 0)
    return (dest);

  strncpy (dest, src, n);
  dest[n - 1] = '\0';
  return (dest);
}
148
/* Values for collectd_closure_t.security_level.  parse_packet() ignores
 * unencrypted parts at ENCRYPT and unsigned, unencrypted parts at SIGN. */
#define SECURITY_LEVEL_NONE     0
#define SECURITY_LEVEL_SIGN    1
#define SECURITY_LEVEL_ENCRYPT 2
152
/* Data source types: the per-value type byte carried in a values part. */
#define DS_TYPE_COUNTER  0
#define DS_TYPE_GAUGE    1
#define DS_TYPE_DERIVE   2
#define DS_TYPE_ABSOLUTE 3

/* Human-readable name of a DS_TYPE_* value, "unknown" for anything else.
 * Fully parenthesised so it can be used inside larger expressions; `t` is
 * evaluated more than once, so it must not have side effects. */
#define DS_TYPE_TO_STRING(t) \
  (((t) == DS_TYPE_COUNTER)  ? "counter"  : \
   ((t) == DS_TYPE_GAUGE)    ? "gauge"    : \
   ((t) == DS_TYPE_DERIVE)   ? "derive"   : \
   ((t) == DS_TYPE_ABSOLUTE) ? "absolute" : \
   "unknown")
163
/* Size of the data buffer in receive_list_entry_s */
#define BUFF_SIZE 1024

/* C representations of the four data source types (see DS_TYPE_*) */
typedef unsigned long long counter_t;
typedef double gauge_t;
typedef int64_t derive_t;
typedef uint64_t absolute_t;
/* Interval placed in the VALUE_LIST_INIT initializer used by parse_packet().
 * Zero-initialized file-scope object with external linkage.
 * NOTE(review): no assignment is visible in this section. */
int  interval_g;
171
172 union value_u
173 {
174   counter_t  counter;
175   gauge_t    gauge;
176   derive_t   derive;
177   absolute_t absolute;
178 };
179 typedef union value_u value_t;
180
181 struct value_list_s
182 {
183   value_t *values;
184   int      values_len;
185   time_t   time;
186   int      interval;
187   char     host[DATA_MAX_NAME_LEN];
188   char     plugin[DATA_MAX_NAME_LEN];
189   char     plugin_instance[DATA_MAX_NAME_LEN];
190   char     type[DATA_MAX_NAME_LEN];
191   char     type_instance[DATA_MAX_NAME_LEN];
192   meta_data_t *meta;
193   uint8_t  *types;
194 };
195 typedef struct value_list_s value_list_t;
196
197
/* Tag telling which member of notification_meta_s.nm_value is in use. */
enum notification_meta_type_e
{
  NM_TYPE_STRING       = 0,
  NM_TYPE_SIGNED_INT   = 1,
  NM_TYPE_UNSIGNED_INT = 2,
  NM_TYPE_DOUBLE       = 3,
  NM_TYPE_BOOLEAN      = 4
};
206
207 typedef struct notification_meta_s
208 {
209   char name[DATA_MAX_NAME_LEN];
210   enum notification_meta_type_e type;
211   union
212   {
213     const char *nm_string;
214     int64_t nm_signed_int;
215     uint64_t nm_unsigned_int;
216     double nm_double;
217     bool nm_boolean;
218   } nm_value;
219   struct notification_meta_s *next;
220 } notification_meta_t;
221
222 typedef struct notification_s
223 {
224   int    severity;
225   time_t time;
226   char   message[NOTIF_MAX_MSG_LEN];
227   char   host[DATA_MAX_NAME_LEN];
228   char   plugin[DATA_MAX_NAME_LEN];
229   char   plugin_instance[DATA_MAX_NAME_LEN];
230   char   type[DATA_MAX_NAME_LEN];
231   char   type_instance[DATA_MAX_NAME_LEN];
232   notification_meta_t *meta;
233 } notification_t;
234
235
236
/* Header found at the start of every part: two 16-bit fields.
 *
 *                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
 *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 * +-------------------------------+-------------------------------+
 * ! Type                          ! Length                        !
 * +-------------------------------+-------------------------------+
 */
typedef struct part_header_s
{
  uint16_t type;    /* one of the TYPE_* constants */
  uint16_t length;  /* length of the whole part, header included */
} part_header_t;
249
250 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
251  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
252  * +-------------------------------+-------------------------------+
253  * ! Type                          ! Length                        !
254  * +-------------------------------+-------------------------------+
255  * : (Length - 4) Bytes                                            :
256  * +---------------------------------------------------------------+
257  */
258 struct part_string_s
259 {
260   part_header_t *head;
261   char *value;
262 };
263 typedef struct part_string_s part_string_t;
264
265 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
266  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
267  * +-------------------------------+-------------------------------+
268  * ! Type                          ! Length                        !
269  * +-------------------------------+-------------------------------+
270  * : (Length - 4 == 2 || 4 || 8) Bytes                             :
271  * +---------------------------------------------------------------+
272  */
273 struct part_number_s
274 {
275   part_header_t *head;
276   uint64_t *value;
277 };
278 typedef struct part_number_s part_number_t;
279
280 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
281  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
282  * +-------------------------------+-------------------------------+
283  * ! Type                          ! Length                        !
284  * +-------------------------------+---------------+---------------+
285  * ! Num of values                 ! Type0         ! Type1         !
286  * +-------------------------------+---------------+---------------+
287  * ! Value0                                                        !
288  * !                                                               !
289  * +---------------------------------------------------------------+
290  * ! Value1                                                        !
291  * !                                                               !
292  * +---------------------------------------------------------------+
293  */
294 struct part_values_s
295 {
296   part_header_t *head;
297   uint16_t *num_values;
298   uint8_t  *values_types;
299   value_t  *values;
300 };
301 typedef struct part_values_s part_values_t;
302
303 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
304  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
305  * +-------------------------------+-------------------------------+
306  * ! Type                          ! Length                        !
307  * +-------------------------------+-------------------------------+
308  * ! Hash (Bits   0 -  31)                                         !
309  * : :                                                             :
310  * ! Hash (Bits 224 - 255)                                         !
311  * +---------------------------------------------------------------+
312  */
313 /* Minimum size */
314 #define PART_SIGNATURE_SHA256_SIZE 36
315 struct part_signature_sha256_s
316 {
317   part_header_t head;
318   unsigned char hash[32];
319   char *username;
320 };
321 typedef struct part_signature_sha256_s part_signature_sha256_t;
322
323 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
324  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
325  * +-------------------------------+-------------------------------+
326  * ! Type                          ! Length                        !
327  * +-------------------------------+-------------------------------+
328  * ! Original length               ! Padding (0 - 15 bytes)        !
329  * +-------------------------------+-------------------------------+
330  * ! Hash (Bits   0 -  31)                                         !
331  * : :                                                             :
332  * ! Hash (Bits 128 - 159)                                         !
333  * +---------------------------------------------------------------+
334  */
335 /* Minimum size */
336 #define PART_ENCRYPTION_AES256_SIZE 42
337 struct part_encryption_aes256_s
338 {
339   part_header_t head;
340   uint16_t username_length;
341   char *username;
342   unsigned char iv[16];
343   /* <encrypted> */
344   unsigned char hash[20];
345   /*   <payload /> */
346   /* </encrypted> */
347 };
348 typedef struct part_encryption_aes256_s part_encryption_aes256_t;
349
350 struct receive_list_entry_s
351 {
352   char data[BUFF_SIZE];
353   int  data_len;
354   int  fd;
355   struct receive_list_entry_s *next;
356 };
357 typedef struct receive_list_entry_s receive_list_entry_t;
358
359
360 /**
361  *
362  *  END Collectd Structs
363  *
364  */
365
/* Convert a 64-bit integer from network (big-endian) to host byte order.
 *
 * The in-memory bytes of `n` are read most-significant first, which gives
 * the right answer on any host without consulting BYTE_ORDER/BIG_ENDIAN.
 * (An `#if BYTE_ORDER == BIG_ENDIAN` test is silently true when neither
 * macro is defined, which would skip the swap on little-endian machines.) */
static unsigned long long collectd_ntohll (unsigned long long n)
{
  unsigned char wire[sizeof (n)];
  unsigned long long host = 0;
  size_t i;

  memcpy (wire, &n, sizeof (wire));
  for (i = 0; i < sizeof (wire); i++)
    host = (host << 8) | wire[i];

  return (host);
} /* unsigned long long collectd_ntohll */
374
/* "Network to host" for doubles: an identity, i.e. gauge values are used
 * exactly as their bytes appear in the packet.
 * NOTE(review): only correct when the sender's on-wire double layout matches
 * the host's - confirm before running on a big-endian machine. */
#define ntohd(d) (d)
376
377
/* Forward declarations: the packet parser below hands completed value lists
 * and notifications to these functions, which are defined later in the
 * file. */
static int queue_values(collectd_closure_t *ccl,
  noit_module_t *self, noit_check_t *check, value_list_t *vl);
static int queue_notifications(collectd_closure_t *ccl,
  noit_module_t *self, noit_check_t *check, notification_t *n);
383
384
385 static EVP_CIPHER_CTX* network_get_aes256_cypher (collectd_closure_t *ccl, /* {{{ */
386     const void *iv, size_t iv_size, const char *username)
387 {
388
389   if (ccl->secret == NULL)
390     return (NULL);
391   else
392   {
393     EVP_CIPHER_CTX *ctx_ptr;
394     EVP_MD_CTX ctx_md;
395     unsigned char password_hash[32];
396     unsigned int length = 0;
397     int success;
398
399     ctx_ptr = &ccl->ctx;
400
401     EVP_DigestInit(&ctx_md, EVP_sha256());
402     EVP_DigestUpdate(&ctx_md, ccl->secret, strlen(ccl->secret));
403     EVP_DigestFinal(&ctx_md, password_hash, &length);
404     EVP_MD_CTX_cleanup(&ctx_md);
405
406     assert(length <= 32);
407
408     success = EVP_DecryptInit(ctx_ptr, EVP_aes_256_ofb(), password_hash, iv);
409     if (success != 1)
410     {
411       noitL(noit_error, "collectd: EVP_DecryptInit returned: %d\n",
412           success);
413       return (NULL);
414     }
415     return (ctx_ptr);
416   }
417 } /* }}} int network_get_aes256_cypher */
418
419 static int parse_part_values (void **ret_buffer, size_t *ret_buffer_len,
420     value_t **ret_values, uint8_t **ret_types, int *ret_num_values)
421 {
422   char *buffer = *ret_buffer;
423   size_t buffer_len = *ret_buffer_len;
424
425   uint16_t tmp16;
426   size_t exp_size;
427   int   i;
428
429   uint16_t pkg_length;
430   uint16_t pkg_type;
431   uint16_t pkg_numval;
432
433   uint8_t *pkg_types;
434   value_t *pkg_values;
435
436   if (buffer_len < 15)
437   {
438     noitL(noit_error,"collectd: packet is too short: "
439         "buffer_len = %zu\n", buffer_len);
440     return (-1);
441   }
442
443   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
444   buffer += sizeof (tmp16);
445   pkg_type = ntohs (tmp16);
446
447   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
448   buffer += sizeof (tmp16);
449   pkg_length = ntohs (tmp16);
450
451   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
452   buffer += sizeof (tmp16);
453   pkg_numval = ntohs (tmp16);
454
455   assert (pkg_type == TYPE_VALUES);
456
457   exp_size = 3 * sizeof (uint16_t)
458     + pkg_numval * (sizeof (uint8_t) + sizeof (value_t));
459   if ((buffer_len < 0) || (buffer_len < exp_size))
460   {
461     noitL(noit_error, "collectd: parse_part_values: "
462         "Packet too short: "
463         "Chunk of size %zu expected, "
464         "but buffer has only %zu bytes left.\n",
465         exp_size, buffer_len);
466     return (-1);
467   }
468
469   if (pkg_length != exp_size)
470   {
471     noitL(noit_debug, "collectd: parse_part_values: "
472         "Length and number of values "
473         "in the packet don't match.\n");
474     return (-1);
475   }
476
477   pkg_types = (uint8_t *) malloc (pkg_numval * sizeof (uint8_t));
478   pkg_values = (value_t *) malloc (pkg_numval * sizeof (value_t));
479   if ((pkg_types == NULL) || (pkg_values == NULL))
480   {
481     sfree (pkg_types);
482     sfree (pkg_values);
483     noitL(noit_error, "collectd: parse_part_values: malloc failed.\n");
484     return (-1);
485   }
486
487   memcpy ((void *) pkg_types, (void *) buffer, pkg_numval * sizeof (uint8_t));
488   buffer += pkg_numval * sizeof (uint8_t);
489   memcpy ((void *) pkg_values, (void *) buffer, pkg_numval * sizeof (value_t));
490   buffer += pkg_numval * sizeof (value_t);
491
492   for (i = 0; i < pkg_numval; i++)
493   {
494     switch (pkg_types[i])
495     {
496       case DS_TYPE_COUNTER:
497         pkg_values[i].counter = (counter_t) collectd_ntohll (pkg_values[i].counter);
498         break;
499
500       case DS_TYPE_GAUGE:
501         pkg_values[i].gauge = (gauge_t) ntohd (pkg_values[i].gauge);
502         break;
503
504       case DS_TYPE_DERIVE:
505         pkg_values[i].derive = (derive_t) collectd_ntohll (pkg_values[i].derive);
506         break;
507
508       case DS_TYPE_ABSOLUTE:
509         pkg_values[i].absolute = (absolute_t) collectd_ntohll (pkg_values[i].absolute);
510         break;
511
512       default:
513         noitL(noit_debug, "collectd: parse_part_values: "
514       "Don't know how to handle data source type %"PRIu8 "\n",
515       pkg_types[i]);
516         sfree (pkg_types);
517         sfree (pkg_values);
518         return (-1);
519     } /* switch (pkg_types[i]) */
520   }
521
522   *ret_buffer     = buffer;
523   *ret_buffer_len = buffer_len - pkg_length;
524   *ret_num_values = pkg_numval;
525   *ret_types      = pkg_types;
526   *ret_values     = pkg_values;
527
528
529   return (0);
530 } /* int parse_part_values */
531
532 static int parse_part_number (void **ret_buffer, size_t *ret_buffer_len,
533     uint64_t *value)
534 {
535   char *buffer = *ret_buffer;
536   size_t buffer_len = *ret_buffer_len;
537
538   uint16_t tmp16;
539   uint64_t tmp64;
540   size_t exp_size = 2 * sizeof (uint16_t) + sizeof (uint64_t);
541
542   uint16_t pkg_length;
543
544   if ((buffer_len < 0) || ((size_t) buffer_len < exp_size))
545   {
546     noitL(noit_error, "collectd: parse_part_number: "
547         "Packet too short: "
548         "Chunk of size %zu expected, "
549         "but buffer has only %zu bytes left.\n",
550         exp_size, buffer_len);
551     return (-1);
552   }
553
554   /* skip pkg_type */
555   buffer += sizeof (tmp16);
556
557   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
558   buffer += sizeof (tmp16);
559   pkg_length = ntohs (tmp16);
560
561   memcpy ((void *) &tmp64, buffer, sizeof (tmp64));
562   buffer += sizeof (tmp64);
563   *value = collectd_ntohll (tmp64);
564
565   *ret_buffer = buffer;
566   *ret_buffer_len = buffer_len - pkg_length;
567
568   return (0);
569 } /* int parse_part_number */
570
571 static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len,
572     char *output, int output_len)
573 {
574   char *buffer = *ret_buffer;
575   size_t buffer_len = *ret_buffer_len;
576
577   uint16_t tmp16;
578   size_t header_size = 2 * sizeof (uint16_t);
579
580   uint16_t pkg_length;
581
582   if ((buffer_len < 0) || (buffer_len < header_size))
583   {
584     noitL(noit_error, "collectd: parse_part_string: "
585         "Packet too short: "
586         "Chunk of at least size %zu expected, "
587         "but buffer has only %zu bytes left.\n",
588         header_size, buffer_len);
589     return (-1);
590   }
591
592   /* skip pkg_type */
593   buffer += sizeof (tmp16);
594
595   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
596   buffer += sizeof (tmp16);
597   pkg_length = ntohs (tmp16);
598
599   /* Check that packet fits in the input buffer */
600   if (pkg_length > buffer_len)
601   {
602     noitL(noit_error, "collectd: parse_part_string: "
603         "Packet too big: "
604         "Chunk of size %"PRIu16" received, "
605         "but buffer has only %zu bytes left.\n",
606         pkg_length, buffer_len);
607     return (-1);
608   }
609
610   /* Check that pkg_length is in the valid range */
611   if (pkg_length <= header_size)
612   {
613     noitL(noit_error, "collectd: parse_part_string: "
614         "Packet too short: "
615         "Header claims this packet is only %hu "
616         "bytes long.\n", pkg_length);
617     return (-1);
618   }
619
620   /* Check that the package data fits into the output buffer.
621    * The previous if-statement ensures that:
622    * `pkg_length > header_size' */
623   if ((output_len < 0)
624       || ((size_t) output_len < ((size_t) pkg_length - header_size)))
625   {
626     noitL(noit_error, "collectd: parse_part_string: "
627         "Output buffer too small.\n");
628     return (-1);
629   }
630
631   /* All sanity checks successfull, let's copy the data over */
632   output_len = pkg_length - header_size;
633   memcpy ((void *) output, (void *) buffer, output_len);
634   buffer += output_len;
635
636   /* For some very weird reason '\0' doesn't do the trick on SPARC in
637    * this statement. */
638   if (output[output_len - 1] != 0)
639   {
640     noitL(noit_error, "collectd: parse_part_string: "
641         "Received string does not end "
642         "with a NULL-byte.\n");
643     return (-1);
644   }
645
646   *ret_buffer = buffer;
647   *ret_buffer_len = buffer_len - pkg_length;
648
649   return (0);
650 } /* int parse_part_string */
651
652
/* Bits for the `flags` argument of parse_packet(): set while parsing the
 * content of a verified signature part / a decrypted encrypted part. */
#define PP_SIGNED    0x01
#define PP_ENCRYPTED 0x02

/* Copy `s` bytes from the current read position into `p` and advance.
 * Relies on locals named `buffer` and `buffer_offset` in the calling
 * function and performs no bounds checking of its own - callers validate
 * the lengths first.  #undef'ed again after the functions that use it. */
#define BUFFER_READ(p,s) do { \
  memcpy ((p), buffer + buffer_offset, (s)); \
  buffer_offset += (s); \
} while (0)
660
661
662
// Forward declare: the signature and encryption handlers below call
// parse_packet() recursively on the data they wrap.
static int parse_packet (
    collectd_closure_t *ccl, noit_module_t *self, noit_check_t *check,
    void *buffer, size_t buffer_size, int flags);
667
668
669 static int parse_part_sign_sha256 (collectd_closure_t *ccl, noit_module_t *self,
670     noit_check_t *check, void **ret_buffer, size_t *ret_buffer_len, int flags)
671 {
672   unsigned char *buffer;
673   size_t buffer_len;
674   size_t buffer_offset;
675
676   size_t username_len;
677   part_signature_sha256_t pss;
678   uint16_t pss_head_length;
679   unsigned char hash[sizeof (pss.hash)];
680
681   unsigned char *hash_ptr;
682   unsigned int length;
683
684   buffer = *ret_buffer;
685   buffer_len = *ret_buffer_len;
686   buffer_offset = 0;
687
688   if (ccl->username == NULL)
689   {
690     noitL(noit_debug, "collectd: Received signed network packet but can't verify "
691         "it because no user has been configured. Will accept it.\n");
692     return (0);
693   }
694
695   if (ccl->secret == NULL)
696   {
697     noitL(noit_debug, "collectd: Received signed network packet but can't verify "
698         "it because no secret has been configured. Will accept it.\n");
699     return (0);
700   }
701
702   /* Check if the buffer has enough data for this structure. */
703   if (buffer_len <= PART_SIGNATURE_SHA256_SIZE)
704     return (-ENOMEM);
705
706   /* Read type and length header */
707   BUFFER_READ (&pss.head.type, sizeof (pss.head.type));
708   BUFFER_READ (&pss.head.length, sizeof (pss.head.length));
709   pss_head_length = ntohs (pss.head.length);
710
711   /* Check if the `pss_head_length' is within bounds. */
712   if ((pss_head_length <= PART_SIGNATURE_SHA256_SIZE)
713       || (pss_head_length > buffer_len))
714   {
715     noitL(noit_error, "collectd: HMAC-SHA-256 with invalid length received.\n");
716     return (-1);
717   }
718
719   /* Copy the hash. */
720   BUFFER_READ (pss.hash, sizeof (pss.hash));
721
722   /* Calculate username length (without null byte) and allocate memory */
723   username_len = pss_head_length - PART_SIGNATURE_SHA256_SIZE;
724   pss.username = malloc (username_len + 1);
725   if (pss.username == NULL)
726     return (-ENOMEM);
727
728   /* Read the username */
729   BUFFER_READ (pss.username, username_len);
730   pss.username[username_len] = 0;
731
732   assert (buffer_offset == pss_head_length);
733
734   /* Match up the username with the expected username */
735   if (strcmp(ccl->username, pss.username) != 0)
736   {
737     noitL(noit_error, "collectd: User: %s and Given User: %s don't match\n", ccl->username, pss.username);
738     sfree (pss.username);
739     return (-ENOENT);
740   }
741
742   /* Create a hash device and check the HMAC */
743   hash_ptr = HMAC(EVP_sha256(), ccl->secret, strlen(ccl->secret),
744       buffer     + PART_SIGNATURE_SHA256_SIZE,
745       buffer_len - PART_SIGNATURE_SHA256_SIZE,
746       hash,         &length);
747   if (hash_ptr == NULL)
748   {
749     noitL(noit_error, "collectd: Creating HMAC-SHA-256 object failed.\n");
750     sfree (pss.username);
751     return (-1);
752   }
753
754   /* Clean up */
755   sfree (pss.username);
756
757   if (memcmp (pss.hash, hash, sizeof (pss.hash)) != 0)
758   {
759     noitL(noit_error, "collectd: Verifying HMAC-SHA-256 signature failed: "
760         "Hash mismatch.\n");
761   }
762   else
763   {
764     parse_packet (ccl, self, check, buffer + buffer_offset, buffer_len - buffer_offset,
765         flags | PP_SIGNED);
766   }
767
768   *ret_buffer = buffer + buffer_len;
769   *ret_buffer_len = 0;
770
771   return (0);
772 } /* }}} int parse_part_sign_sha256 */
773 /* #endif HAVE_LIBGCRYPT */
774
775 static int parse_part_encr_aes256 (collectd_closure_t *ccl, noit_module_t *self,
776     noit_check_t *check, void **ret_buffer, size_t *ret_buffer_len,
777     int flags)
778 {
779   unsigned char  *buffer = *ret_buffer;
780   size_t buffer_len = *ret_buffer_len;
781   size_t payload_len;
782   size_t part_size;
783   size_t buffer_offset;
784   int    tmpbufsize;
785   unsigned char   *tmpbuf;
786   uint16_t username_len;
787   part_encryption_aes256_t pea;
788   unsigned char hash[sizeof (pea.hash)];
789   unsigned int hash_length;
790
791   EVP_CIPHER_CTX *ctx;
792   EVP_MD_CTX ctx_md;
793   int err;
794
795   /* Make sure at least the header if available. */
796   if (buffer_len <= PART_ENCRYPTION_AES256_SIZE)
797   {
798     noitL(noit_debug, "collectd: parse_part_encr_aes256: "
799         "Discarding short packet.\n");
800     return (-1);
801   }
802
803   buffer_offset = 0;
804
805   /* Copy the unencrypted information into `pea'. */
806   BUFFER_READ (&pea.head.type, sizeof (pea.head.type));
807   BUFFER_READ (&pea.head.length, sizeof (pea.head.length));
808
809   /* Check the `part size'. */
810   part_size = ntohs (pea.head.length);
811   if ((part_size <= PART_ENCRYPTION_AES256_SIZE)
812       || (part_size > buffer_len))
813   {
814     noitL(noit_debug, "collectd: parse_part_encr_aes256: "
815         "Discarding part with invalid size.\n");
816     return (-1);
817   }
818
819   /* Read the username */
820   BUFFER_READ (&username_len, sizeof (username_len));
821   username_len = ntohs (username_len);
822
823   if ((username_len <= 0)
824       || (username_len > (part_size - (PART_ENCRYPTION_AES256_SIZE + 1))))
825   {
826     noitL(noit_debug, "collectd: parse_part_encr_aes256: "
827         "Discarding part with invalid username length.\n");
828     return (-1);
829   }
830
831   assert (username_len > 0);
832   pea.username = malloc (username_len + 1);
833   if (pea.username == NULL)
834     return (-ENOMEM);
835   BUFFER_READ (pea.username, username_len);
836   pea.username[username_len] = 0;
837
838   /* Last but not least, the initialization vector */
839   BUFFER_READ (pea.iv, sizeof (pea.iv));
840
841   /* Make sure we are at the right position */
842   assert (buffer_offset == (username_len +
843         PART_ENCRYPTION_AES256_SIZE - sizeof (pea.hash)));
844
845   /* Match up the username with the expected username */
846   if (strcmp(ccl->username, pea.username) != 0)
847   {
848     noitL(noit_error, "collectd: Username received and server side username don't match\n");
849     sfree (pea.username);
850     return (-ENOENT);
851   }
852
853   ctx = network_get_aes256_cypher (ccl, pea.iv, sizeof (pea.iv),
854       pea.username);
855   if (ctx == NULL)
856     return (-1);
857
858   payload_len = part_size - (PART_ENCRYPTION_AES256_SIZE + username_len);
859   assert (payload_len > 0);
860   tmpbuf = malloc(part_size - buffer_offset);
861
862   /* Decrypt the packet */
863   err = EVP_DecryptUpdate(ctx,
864       tmpbuf, &tmpbufsize,
865       buffer    + buffer_offset,
866       part_size - buffer_offset);
867   if (err != 1)
868   {
869     noitL(noit_error, "collectd: openssl returned: %d\n", err);
870     return (-1);
871   }
872
873   assert(part_size - buffer_offset == tmpbufsize);
874   /* Make it appear to be in place */
875   memcpy(buffer + buffer_offset, tmpbuf, part_size - buffer_offset);
876   sfree(tmpbuf);
877
878   /* Read the hash */
879   BUFFER_READ (pea.hash, sizeof (pea.hash));
880
881   /* Make sure we're at the right position - again */
882   assert (buffer_offset == (username_len + PART_ENCRYPTION_AES256_SIZE));
883   assert (buffer_offset == (part_size - payload_len));
884
885   /* Check hash sum */
886   memset (hash, 0, sizeof (hash));
887   EVP_DigestInit(&ctx_md, EVP_sha1());
888   EVP_DigestUpdate(&ctx_md, buffer + buffer_offset, payload_len);
889   EVP_DigestFinal(&ctx_md, hash, &hash_length);
890   if (memcmp (hash, pea.hash, sizeof (hash)) != 0)
891   {
892     noitL(noit_error, "collectd: Decryption failed: Checksum mismatch.\n");
893     return (-1);
894   }
895
896   parse_packet (ccl, self, check, buffer + buffer_offset, payload_len,
897       flags | PP_ENCRYPTED);
898
899   /* Update return values */
900   *ret_buffer =     buffer     + part_size;
901   *ret_buffer_len = buffer_len - part_size;
902   sfree(pea.username);
903
904   return (0);
905 } /* }}} int parse_part_encr_aes256 */
906
907
908 #undef BUFFER_READ
909
910
911 static int parse_packet (/* {{{ */
912     collectd_closure_t *ccl, noit_module_t *self, noit_check_t *check,
913     void *buffer, size_t buffer_size, int flags)
914 {
915   int status;
916
917   int packet_was_signed = (flags & PP_SIGNED);
918   int packet_was_encrypted = (flags & PP_ENCRYPTED);
919   int printed_ignore_warning = 0;
920
921 #define VALUE_LIST_INIT { NULL, 0, 0, interval_g, "localhost", "", "", "", "", NULL }
922   value_list_t vl = VALUE_LIST_INIT;
923   notification_t n;
924
925   memset (&vl, '\0', sizeof (vl));
926   memset (&n, '\0', sizeof (n));
927   status = 0;
928
929   while ((status == 0) && (0 < buffer_size)
930       && ((unsigned int) buffer_size > sizeof (part_header_t)))
931   {
932     uint16_t pkg_length;
933     uint16_t pkg_type;
934
935     memcpy ((void *) &pkg_type,
936         (void *) buffer,
937         sizeof (pkg_type));
938     memcpy ((void *) &pkg_length,
939         (void *) ((char *)buffer + sizeof (pkg_type)),
940         sizeof (pkg_length));
941
942     pkg_length = ntohs (pkg_length);
943     pkg_type = ntohs (pkg_type);
944
945     if (pkg_length > buffer_size)
946       break;
947     /* Ensure that this loop terminates eventually */
948     if (pkg_length < (2 * sizeof (uint16_t)))
949       break;
950
951
952     if (pkg_type == TYPE_ENCR_AES256)
953     {
954       status = parse_part_encr_aes256 (ccl, self, check,
955           &buffer, &buffer_size, flags);
956       if (status != 0)
957       {
958         noitL(noit_error, "collectd: Decrypting AES256 "
959             "part failed "
960             "with status %i.\n", status);
961         break;
962       }
963     }
964     else if ((ccl->security_level == SECURITY_LEVEL_ENCRYPT)
965         && (packet_was_encrypted == 0))
966     {
967       if (printed_ignore_warning == 0)
968       {
969         noitL(noit_debug, "collectd: Unencrypted packet or "
970             "part has been ignored.\n");
971         printed_ignore_warning = 1;
972       }
973       buffer = ((char *) buffer) + pkg_length;
974       continue;
975     }
976     else if (pkg_type == TYPE_SIGN_SHA256)
977     {
978       status = parse_part_sign_sha256 (ccl, self, check,
979                                         &buffer, &buffer_size, flags);
980       if (status != 0)
981       {
982         noitL(noit_error, "collectd: Verifying HMAC-SHA-256 "
983             "signature failed "
984             "with status %i.\n", status);
985         break;
986       }
987     }
988     else if ((ccl->security_level == SECURITY_LEVEL_SIGN)
989         && (packet_was_encrypted == 0)
990         && (packet_was_signed == 0))
991     {
992       if (printed_ignore_warning == 0)
993       {
994         noitL(noit_debug, "collectd: Unsigned packet or "
995             "part has been ignored.\n");
996         printed_ignore_warning = 1;
997       }
998       buffer = ((char *) buffer) + pkg_length;
999       continue;
1000     }
1001     else if (pkg_type == TYPE_VALUES)
1002     {
1003       status = parse_part_values (&buffer, &buffer_size,
1004           &vl.values, &vl.types, &vl.values_len);
1005
1006       if (status != 0)
1007         break;
1008
1009       if ((vl.time > 0)
1010           && (strlen (vl.host) > 0)
1011           && (strlen (vl.plugin) > 0)
1012           && (strlen (vl.type) > 0))
1013       {
1014         queue_values(ccl, self, check, &vl);
1015       }
1016       else
1017       {
1018         noitL(noit_error,
1019             "collectd:  NOT dispatching values\n");
1020       }
1021
1022       sfree (vl.values);
1023       sfree (vl.types);
1024     }
1025     else if (pkg_type == TYPE_TIME)
1026     {
1027       uint64_t tmp = 0;
1028       status = parse_part_number (&buffer, &buffer_size,
1029           &tmp);
1030       if (status == 0)
1031       {
1032         vl.time = (time_t) tmp;
1033         n.time = (time_t) tmp;
1034       }
1035     }
1036     else if (pkg_type == TYPE_INTERVAL)
1037     {
1038       uint64_t tmp = 0;
1039       status = parse_part_number (&buffer, &buffer_size,
1040           &tmp);
1041       if (status == 0)
1042         vl.interval = (int) tmp;
1043     }
1044     else if (pkg_type == TYPE_HOST)
1045     {
1046       status = parse_part_string (&buffer, &buffer_size,
1047           vl.host, sizeof (vl.host));
1048       if (status == 0)
1049         sstrncpy (n.host, vl.host, sizeof (n.host));
1050     }
1051     else if (pkg_type == TYPE_PLUGIN)
1052     {
1053       status = parse_part_string (&buffer, &buffer_size,
1054           vl.plugin, sizeof (vl.plugin));
1055       if (status == 0)
1056         sstrncpy (n.plugin, vl.plugin,
1057             sizeof (n.plugin));
1058     }
1059     else if (pkg_type == TYPE_PLUGIN_INSTANCE)
1060     {
1061       status = parse_part_string (&buffer, &buffer_size,
1062           vl.plugin_instance,
1063           sizeof (vl.plugin_instance));
1064       if (status == 0)
1065         sstrncpy (n.plugin_instance,
1066             vl.plugin_instance,
1067             sizeof (n.plugin_instance));
1068     }
1069     else if (pkg_type == TYPE_TYPE)
1070     {
1071       status = parse_part_string (&buffer, &buffer_size,
1072           vl.type, sizeof (vl.type));
1073       if (status == 0)
1074         sstrncpy (n.type, vl.type, sizeof (n.type));
1075     }
1076     else if (pkg_type == TYPE_TYPE_INSTANCE)
1077     {
1078       status = parse_part_string (&buffer, &buffer_size,
1079           vl.type_instance,
1080           sizeof (vl.type_instance));
1081       if (status == 0)
1082         sstrncpy (n.type_instance, vl.type_instance,
1083             sizeof (n.type_instance));
1084     }
1085     else if (pkg_type == TYPE_MESSAGE)
1086     {
1087       status = parse_part_string (&buffer, &buffer_size,
1088           n.message, sizeof (n.message));
1089
1090       if (status != 0)
1091       {
1092         /* do nothing */
1093       }
1094       else if ((n.severity != NOTIF_FAILURE)
1095           && (n.severity != NOTIF_WARNING)
1096           && (n.severity != NOTIF_OKAY))
1097       {
1098         noitL(noit_error, "collectd: "
1099             "Ignoring notification with "
1100             "unknown severity %i.\n",
1101             n.severity);
1102       }
1103       else if (n.time <= 0)
1104       {
1105         noitL(noit_error, "collectd: "
1106             "Ignoring notification with "
1107             "time == 0.\n");
1108       }
1109       else if (strlen (n.message) <= 0)
1110       {
1111         noitL(noit_error, "collectd: "
1112             "Ignoring notification with "
1113             "an empty message.\n");
1114       }
1115       else
1116       {
1117         queue_notifications(ccl, self, check, &n);
1118         noitL(noit_error, "collectd: "
1119             "DISPATCH NOTIFICATION\n");
1120       }
1121     }
1122     else if (pkg_type == TYPE_SEVERITY)
1123     {
1124       uint64_t tmp = 0;
1125       status = parse_part_number (&buffer, &buffer_size,
1126           &tmp);
1127       if (status == 0)
1128         n.severity = (int) tmp;
1129     }
1130     else
1131     {
1132       noitL(noit_error, "collectd: parse_packet: Unknown part"
1133           " type: 0x%04hx\n", pkg_type);
1134       buffer = ((char *) buffer) + pkg_length;
1135     }
1136   } /* while (buffer_size > sizeof (part_header_t)) */
1137
1138   return (status);
1139 } /* }}} int parse_packet */
1140
1141
1142 // Not proud of this at all however; I am not sure best how to address this.
1143 static int infer_type(char *buffer, int buffer_len, value_list_t *vl, int index) {
1144   int len = strlen(buffer);
1145   strcat(buffer, "`");
1146   if (cmp_type(vl, "load", "load")) {
1147     assert(vl->values_len == 3);
1148     switch (index) {
1149       case 0:
1150         strcat(buffer, "1min"); break;
1151       case 1:
1152         strcat(buffer, "5min"); break;
1153       case 2:
1154         strcat(buffer, "15min"); break;
1155     }
1156   } else if (cmp_plugin(vl, "interface")) {
1157     assert(vl->values_len == 2);
1158     switch (index) {
1159       case 0:
1160         strcat(buffer, "rx"); break;
1161       case 1:
1162         strcat(buffer, "tx"); break;
1163     }
1164   } else {
1165     char buf[20];
1166     snprintf(buf, sizeof(buf), "%d", index);
1167     strcat(buffer, buf);
1168     noitL(noit_debug, "collectd: parsing multiple values"
1169         " and guessing on the type for plugin[%s] and type[%s]"
1170         , vl->plugin, vl->type);
1171   }
1172   return len;
1173 }
1174
/*
 * Build a flat metric name of the form
 *   plugin[`plugin_inst][`type][`type_inst]
 * in buffer.  The plugin name is always copied; each of the remaining
 * components is appended, preceded by a backtick, only when it is non-empty.
 * No bounds are checked: buffer must be large enough for all four strings,
 * three separators and the terminating NUL.
 */
static void concat_metrics(char *buffer, char* plugin, char* plugin_inst, char* type, char* type_inst) {
  char *optional[3];
  char *out;
  int i;

  optional[0] = plugin_inst;
  optional[1] = type;
  optional[2] = type_inst;

  strcpy(buffer, plugin);
  out = buffer + strlen(buffer);

  for (i = 0; i < 3; i++) {
    if (optional[i][0] == '\0')
      continue;
    *out++ = '`';
    strcpy(out, optional[i]);
    out += strlen(out);
  }
}
1191
1192 static int queue_notifications(collectd_closure_t *ccl,
1193       noit_module_t *self, noit_check_t *check, notification_t *n) {
1194   stats_t current;
1195   char buffer[DATA_MAX_NAME_LEN*4 + 128];
1196   collectd_mod_config_t *conf;
1197   conf = noit_module_get_userdata(self);
1198
1199   if(!conf->support_notifications) return 0;
1200   /* We are passive, so we don't do anything for transient checks */
1201   if(check->flags & NP_TRANSIENT) return 0;
1202
1203   noit_check_stats_clear(&current);
1204   gettimeofday(&current.whence, NULL);
1205
1206   // Concat all the names together so they fit into the flat noitd model
1207   concat_metrics(buffer, n->plugin, n->plugin_instance, n->type, n->type_instance);
1208   noit_stats_set_metric(&ccl->current, buffer, METRIC_STRING, n->message);
1209   noit_check_passive_set_stats(self, check, &current);
1210   noitL(nldeb, "collectd: dispatch_notifications(%s, %s, %s)\n",check->target, buffer, n->message);
1211   return 0;
1212 }
1213
1214
1215 static int queue_values(collectd_closure_t *ccl,
1216       noit_module_t *self, noit_check_t *check, value_list_t *vl) {
1217   char buffer[DATA_MAX_NAME_LEN*4 + 4 + 1 + 20];
1218   int i, len = 0;
1219
1220   // Concat all the names together so they fit into the flat noitd model
1221   concat_metrics(buffer, vl->plugin, vl->plugin_instance, vl->type, vl->type_instance);
1222   for (i=0; i < vl->values_len; i++) {
1223  
1224     // Only infer the type if the amount of values is greater than one
1225     if (vl->values_len > 1) {
1226       // Trunc the string 
1227       if (len > 0)
1228         buffer[len] = 0;
1229       len = infer_type(buffer, sizeof(buffer), vl, i);
1230     }
1231
1232     switch (vl->types[i])
1233     {
1234       case DS_TYPE_COUNTER:
1235         noit_stats_set_metric(&ccl->current, buffer, METRIC_UINT64, &vl->values[i].counter);
1236         break;
1237
1238       case DS_TYPE_GAUGE:
1239         noit_stats_set_metric(&ccl->current, buffer, METRIC_DOUBLE, &vl->values[i].gauge);
1240         break;
1241
1242       case DS_TYPE_DERIVE:
1243         noit_stats_set_metric(&ccl->current, buffer, METRIC_INT64, &vl->values[i].derive);
1244         break;
1245
1246       case DS_TYPE_ABSOLUTE:
1247         noit_stats_set_metric(&ccl->current, buffer, METRIC_INT64, &vl->values[i].absolute);
1248         break;
1249
1250       default:
1251         noitL(noit_debug, "collectd: parse_part_values: "
1252               "Don't know how to handle data source type %"PRIu8 "\n",
1253               vl->types[i]);
1254         return (-1);
1255     } /* switch (value_types[i]) */
1256     ccl->stats_count++;
1257     noitL(nldeb, "collectd: queue_values(%s, %s)\n", buffer, check->target);
1258   }
1259   return 0;
1260 }
1261
1262 static void clear_closure(collectd_closure_t *ccl) {
1263   ccl->stats_count = 0;
1264   ccl->ntfy_count = 0;
1265   noit_check_stats_clear(&ccl->current);
1266
1267 }
1268
1269 static int collectd_submit(noit_module_t *self, noit_check_t *check) {
1270   collectd_closure_t *ccl;
1271   struct timeval duration;
1272   /* We are passive, so we don't do anything for transient checks */
1273   if(check->flags & NP_TRANSIENT) return 0;
1274
1275   if(!check->closure) {
1276     ccl = check->closure = (void *)calloc(1, sizeof(collectd_closure_t));
1277     memset(ccl, 0, sizeof(collectd_closure_t));
1278   } else {
1279     // Don't count the first run
1280     char human_buffer[256];
1281     ccl = (collectd_closure_t*)check->closure;
1282     gettimeofday(&ccl->current.whence, NULL);
1283     sub_timeval(ccl->current.whence, check->last_fire_time, &duration);
1284     ccl->current.duration = duration.tv_sec; // + duration.tv_usec / (1000 * 1000);
1285
1286     snprintf(human_buffer, sizeof(human_buffer),
1287              "dur=%d,run=%d,stats=%d,ntfy=%d", ccl->current.duration,
1288              check->generation, ccl->stats_count, ccl->ntfy_count);
1289     noitL(nldeb, "collectd(%s) [%s]\n", check->target, human_buffer);
1290
1291     // Not sure what to do here
1292     ccl->current.available = (ccl->ntfy_count > 0 || ccl->stats_count > 0) ?
1293         NP_AVAILABLE : NP_UNAVAILABLE;
1294     ccl->current.state = (ccl->ntfy_count > 0 || ccl->stats_count > 0) ?
1295         NP_GOOD : NP_BAD;
1296     ccl->current.status = human_buffer;
1297     noit_check_passive_set_stats(self, check, &ccl->current);
1298
1299     memcpy(&check->last_fire_time, &ccl->current.whence, sizeof(duration));
1300   }
1301   clear_closure(ccl);
1302   return 0;
1303 }
1304
1305 static int noit_collectd_handler(eventer_t e, int mask, void *closure,
1306                              struct timeval *now) {
1307   union {
1308     struct sockaddr_in  skaddr;
1309     struct sockaddr_in6 skaddr6;
1310   } remote;
1311   char packet[1500];
1312   int packet_len = sizeof(packet);
1313   unsigned int from_len;
1314   char ip_p[INET6_ADDRSTRLEN];
1315   noit_module_t *self = (noit_module_t *)closure;
1316   collectd_mod_config_t *conf;
1317   conf = noit_module_get_userdata(self);
1318
1319
1320   // Get the username and password of the string
1321
1322   while(1) {
1323     int inlen;
1324     noit_check_t *check;
1325     collectd_closure_t *ccl;
1326     char *security_buffer;
1327
1328     from_len = sizeof(remote);
1329
1330     inlen = recvfrom(e->fd, packet, packet_len, 0,
1331                      (struct sockaddr *)&remote, &from_len);
1332     gettimeofday(now, NULL); /* set it, as we care about accuracy */
1333
1334     if(inlen < 0) {
1335       if(errno == EAGAIN || errno == EINTR) break;
1336       noitLT(nlerr, now, "collectd: recvfrom: %s\n", strerror(errno));
1337       break;
1338     }
1339     if (from_len == sizeof(remote.skaddr)) {
1340       if (!inet_ntop(AF_INET, &(remote.skaddr.sin_addr), ip_p, INET_ADDRSTRLEN)) {
1341         noitLT(nlerr, now, "collectd: inet_ntop failed: %s\n", strerror(errno));
1342         break;
1343       }
1344     }
1345     else if(from_len == sizeof(remote.skaddr6)) {
1346       if (!inet_ntop(AF_INET6, &(remote.skaddr6.sin6_addr), ip_p, INET6_ADDRSTRLEN)) {
1347         noitLT(nlerr, now, "collectd: inet_ntop failed: %s\n", strerror(errno));
1348         break;
1349       }
1350     }
1351     else {
1352       noitLT(nlerr, now, "collectd: could not determine address family of remote\n");
1353       break;
1354     }
1355     check = noit_poller_lookup_by_name(ip_p, "collectd");
1356     if (!check) {
1357       noitL(nlerr, "collectd: No defined check from ip [%s].\n", ip_p);
1358       break;
1359     }
1360
1361     // If its a new check retrieve some values
1362     if (check->closure == NULL) {
1363       // TODO: Verify if it could somehow retrieve data before the check closure exists
1364       ccl = check->closure = (void *)calloc(1, sizeof(collectd_closure_t));
1365       memset(ccl, 0, sizeof(collectd_closure_t));
1366     } else {
1367       ccl = (collectd_closure_t*)check->closure;
1368     }
1369     // Default to NONE
1370     ccl->security_level = SECURITY_LEVEL_NONE;
1371     if (noit_hash_retr_str(check->config, "security_level", strlen("security_level"),
1372                            (const char**)&security_buffer))
1373     {
1374       ccl->security_level = atoi(security_buffer);
1375     }
1376
1377     // Is this outside to keep updates happening?
1378     if (!noit_hash_retr_str(check->config, "username", strlen("username"),
1379                            (const char**)&ccl->username) &&
1380         !noit_hash_retr_str(conf->options, "username", strlen("username"),
1381                            (const char**)&ccl->username))
1382     {
1383       if (ccl->security_level == SECURITY_LEVEL_ENCRYPT) {
1384         noitL(nlerr, "collectd: no username defined for check.\n");
1385         goto cleanup;
1386       } else if (ccl->security_level == SECURITY_LEVEL_SIGN) {
1387         noitL(nlerr, "collectd: no username defined for check, "
1388             "will accept any signed packet.\n");
1389       }
1390     }
1391
1392     if(!ccl->secret)
1393       noit_hash_retr_str(check->config, "secret", strlen("secret"),
1394                          (const char**)&ccl->secret);
1395     if(!ccl->secret)
1396       noit_hash_retr_str(conf->options, "secret", strlen("secret"),
1397                          (const char**)&ccl->secret);
1398     if(!ccl->secret) {
1399       if (ccl->security_level == SECURITY_LEVEL_ENCRYPT) {
1400         noitL(nlerr, "collectd: no secret defined for check.\n");
1401         goto cleanup;
1402       }
1403       else if (ccl->security_level == SECURITY_LEVEL_SIGN) {
1404         noitL(nlerr, "collectd: no secret defined for check, "
1405             "will accept any signed packet.\n");
1406       }
1407     }
1408
1409     parse_packet(ccl, self, check, packet, inlen, 0);
1410   }
1411   return EVENTER_READ | EVENTER_EXCEPTION;
1412
1413 cleanup:
1414   return 0;
1415 }
1416
1417
1418 static int noit_collectd_initiate_check(noit_module_t *self,
1419                                         noit_check_t *check,
1420                                         int once, noit_check_t *cause) {
1421   /* The idea is to write the collectd stuff to the stats one every period
1422    * Then we can warn people if no stats where written in a period of time
1423    */
1424   INITIATE_CHECK(collectd_submit, self, check);
1425   return 0;
1426 }
1427
1428 static int noit_collectd_config(noit_module_t *self, noit_hash_table *options) {
1429   collectd_mod_config_t *conf;
1430   conf = noit_module_get_userdata(self);
1431   if(conf) {
1432     if(conf->options) {
1433       noit_hash_destroy(conf->options, free, free);
1434       free(conf->options);
1435     }
1436   }
1437   else
1438     conf = calloc(1, sizeof(*conf));
1439   conf->options = options;
1440   noit_module_set_userdata(self, conf);
1441   return 1;
1442 }
1443
1444 static int noit_collectd_onload(noit_image_t *self) {
1445   if(!nlerr) nlerr = noit_log_stream_find("error/collectd");
1446   if(!nldeb) nldeb = noit_log_stream_find("debug/collectd");
1447   if(!nlerr) nlerr = noit_error;
1448   if(!nldeb) nldeb = noit_debug;
1449   eventer_name_callback("noit_collectd/handler", noit_collectd_handler);
1450   return 0;
1451 }
1452
1453 static int noit_collectd_init(noit_module_t *self) {
1454   const char *config_val;
1455   int sockaddr_len;
1456   collectd_mod_config_t *conf;
1457   conf = noit_module_get_userdata(self);
1458   int portint = 0;
1459   struct sockaddr_in skaddr;
1460   struct sockaddr_in6 skaddr6;
1461   struct in6_addr in6addr_any = IN6ADDR_ANY_INIT;
1462   const char *host;
1463   unsigned short port;
1464
1465   conf->support_notifications = noit_true;
1466   if(noit_hash_retr_str(conf->options,
1467                         "notifications", strlen("notifications"),
1468                         (const char **)&config_val)) {
1469     if(!strcasecmp(config_val, "false") || !strcasecmp(config_val, "off"))
1470       conf->support_notifications = noit_false;
1471   }
1472   /* Default Collectd port */
1473   portint = NET_DEFAULT_PORT;
1474   if(noit_hash_retr_str(conf->options,
1475                          "collectd_port", strlen("collectd_port"),
1476                          (const char**)&config_val))
1477     portint = atoi(config_val);
1478
1479
1480   if(!noit_hash_retr_str(conf->options,
1481                          "collectd_host", strlen("collectd_host"),
1482                          (const char**)&host))
1483     host = "*";
1484
1485   port = (unsigned short) portint;
1486
1487   conf->ipv4_fd = conf->ipv6_fd = -1;
1488
1489   conf->ipv4_fd = socket(PF_INET, SOCK_DGRAM, 0);
1490   if(conf->ipv4_fd < 0) {
1491     noitL(noit_error, "collectd: socket failed: %s\n",
1492           strerror(errno));
1493   }
1494   else {
1495     if(eventer_set_fd_nonblocking(conf->ipv4_fd)) {
1496       close(conf->ipv4_fd);
1497       conf->ipv4_fd = -1;
1498       noitL(noit_error,
1499             "collectd: could not set socket non-blocking: %s\n",
1500             strerror(errno));
1501     }
1502   }
1503   skaddr.sin_family = AF_INET;
1504   skaddr.sin_addr.s_addr = htonl(INADDR_ANY);
1505   skaddr.sin_port = htons(port);
1506
1507   sockaddr_len = sizeof(skaddr);
1508   if(bind(conf->ipv4_fd, (struct sockaddr *)&skaddr, sockaddr_len) < 0) {
1509     noitL(noit_error, "bind failed[%s]: %s\n", host, strerror(errno));
1510     close(conf->ipv4_fd);
1511     return -1;
1512   }
1513
1514   if(conf->ipv4_fd >= 0) {
1515     eventer_t newe;
1516     newe = eventer_alloc();
1517     newe->fd = conf->ipv4_fd;
1518     newe->mask = EVENTER_READ | EVENTER_EXCEPTION;
1519     newe->callback = noit_collectd_handler;
1520     newe->closure = self;
1521     eventer_add(newe);
1522   }
1523
1524   conf->ipv6_fd = socket(AF_INET6, SOCK_DGRAM, 0);
1525   if(conf->ipv6_fd < 0) {
1526     noitL(noit_error, "collectd: IPv6 socket failed: %s\n",
1527           strerror(errno));
1528   }
1529   else {
1530     if(eventer_set_fd_nonblocking(conf->ipv6_fd)) {
1531       close(conf->ipv6_fd);
1532       conf->ipv6_fd = -1;
1533       noitL(noit_error,
1534             "collectd: could not set socket non-blocking: %s\n",
1535                strerror(errno));
1536     }
1537   }
1538   sockaddr_len = sizeof(skaddr6);
1539   memset(&skaddr6, 0, sizeof(skaddr6));
1540   skaddr6.sin6_family = AF_INET6;
1541   skaddr6.sin6_addr = in6addr_any;
1542   skaddr6.sin6_port = htons(port);
1543
1544   if(bind(conf->ipv6_fd, (struct sockaddr *)&skaddr6, sockaddr_len) < 0) {
1545     noitL(noit_error, "bind(IPv6) failed[%s]: %s\n", host, strerror(errno));
1546     close(conf->ipv6_fd);
1547     conf->ipv6_fd = -1;
1548   }
1549
1550   if(conf->ipv6_fd >= 0) {
1551     eventer_t newe;
1552     newe = eventer_alloc();
1553     newe->fd = conf->ipv6_fd;
1554     newe->mask = EVENTER_READ;
1555     newe->callback = noit_collectd_handler;
1556     newe->closure = self;
1557     eventer_add(newe);
1558   }
1559
1560   noit_module_set_userdata(self, conf);
1561   return 0;
1562 }
1563
1564 #include "collectd.xmlh"
1565 noit_module_t collectd = {
1566   {
1567     NOIT_MODULE_MAGIC,
1568     NOIT_MODULE_ABI_VERSION,
1569     "collectd",
1570     "collectd collection",
1571     collectd_xml_description,
1572     noit_collectd_onload
1573   },
1574   noit_collectd_config,
1575   noit_collectd_init,
1576   noit_collectd_initiate_check,
1577   NULL /* noit_collectd_cleanup */
1578 };
Note: See TracBrowser for help on using the browser.