root/src/modules/collectd.c

Revision 9c84154649c4102c8cc9443c29c2d39276c71b3f, 43.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

refs #284

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2005-2009  Florian Forster
3  * Copyright (c) 2009, OmniTI Computer Consulting, Inc.
4  * Copyright (c) 2009, Dan Di Spaltro
5  * All rights reserved.
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library, in file named COPYING.LGPL; if not,
19  * write to the Free Software Foundation, Inc., 59 Temple Place,
20  * Suite 330, Boston, MA  02111-1307  USA
21  *
22  */
23
24 #include "noit_defines.h"
25
26 #include <stdio.h>
27 #include <unistd.h>
28 #include <errno.h>
29 #include <assert.h>
30 #include <math.h>
31 #include <ctype.h>
32 #ifdef HAVE_SYS_FILIO_H
33 #include <sys/filio.h>
34 #endif
35 #include <openssl/evp.h>
36 #include <openssl/hmac.h>
37 #include <inttypes.h>
38 #include <arpa/inet.h>
39
40 #include "noit_module.h"
41 #include "noit_check.h"
42 #include "noit_check_tools.h"
43 #include "utils/noit_log.h"
44 #include "utils/noit_hash.h"
45
46
47 static noit_log_stream_t nlerr = NULL;
48 static noit_log_stream_t nldeb = NULL;
49 static int __collectd_initialize_once = 0;
50
51
52 typedef struct _mod_config {
53   noit_hash_table *options;
54   noit_hash_table target_sessions;
55   int ipv4_fd;
56   int ipv6_fd;
57 } collectd_mod_config_t;
58
59 typedef struct collectd_closure_s {
60   char *username;
61   char *secret;
62   int security_level;
63   EVP_CIPHER_CTX ctx;
64   stats_t current;
65   int stats_count;
66   int ntfy_count;
67 } collectd_closure_t;
68
69 #define cmp_plugin(vl, val) \
70   (strncmp(vl->plugin, val, sizeof(val)) == 0)
71
72 #define cmp_type(vl, plugin, type) \
73   (cmp_plugin(vl, plugin) && \
74    cmp_plugin(vl, type))
75
76
77 /**
78  *
79  * Collectd Structs
80  *
81  *
82  **/
83
84 union meta_value_u
85 {
86   char    *mv_string;
87   int64_t  mv_signed_int;
88   uint64_t mv_unsigned_int;
89   double   mv_double;
90   _Bool    mv_boolean;
91 };
92 typedef union meta_value_u meta_value_t;
93
94 struct meta_entry_s;
95 typedef struct meta_entry_s meta_entry_t;
96 struct meta_entry_s
97 {
98   char         *key;
99   meta_value_t  value;
100   int           type;
101   meta_entry_t *next;
102 };
103
104 struct meta_data_s
105 {
106   meta_entry_t   *head;
107   pthread_mutex_t lock;
108 };
109
110 typedef struct meta_data_s meta_data_t;
111
112
113 #define NET_DEFAULT_V4_ADDR "239.192.74.66"
114 #define NET_DEFAULT_V6_ADDR "ff18::efc0:4a42"
115 #define NET_DEFAULT_PORT    "25826"
116
117 #define TYPE_HOST            0x0000
118 #define TYPE_TIME            0x0001
119 #define TYPE_PLUGIN          0x0002
120 #define TYPE_PLUGIN_INSTANCE 0x0003
121 #define TYPE_TYPE            0x0004
122 #define TYPE_TYPE_INSTANCE   0x0005
123 #define TYPE_VALUES          0x0006
124 #define TYPE_INTERVAL        0x0007
125
126 /* Types to transmit notifications */
127 #define TYPE_MESSAGE         0x0100
128 #define TYPE_SEVERITY        0x0101
129
130 #define TYPE_SIGN_SHA256     0x0200
131 #define TYPE_ENCR_AES256     0x0210
132
133 #define DATA_MAX_NAME_LEN 64
134
135 #define NOTIF_MAX_MSG_LEN 256
136 #define NOTIF_FAILURE 1
137 #define NOTIF_WARNING 2
138 #define NOTIF_OKAY    4
139
140 #define sfree(ptr) \
141   do { \
142     if((ptr) != NULL) { \
143       free(ptr); \
144     } \
145     (ptr) = NULL; \
146   } while (0)
147
148 #define sstrncpy(a, b, c) strncpy(a, b, c)
149
150 #define SECURITY_LEVEL_NONE     0
151 #define SECURITY_LEVEL_SIGN    1
152 #define SECURITY_LEVEL_ENCRYPT 2
153
154 #define DS_TYPE_COUNTER  0
155 #define DS_TYPE_GAUGE    1
156 #define DS_TYPE_DERIVE   2
157 #define DS_TYPE_ABSOLUTE 3
158
159 #define DS_TYPE_TO_STRING(t) (t == DS_TYPE_COUNTER)     ? "counter"  : \
160         (t == DS_TYPE_GAUGE)    ? "gauge"    : \
161         (t == DS_TYPE_DERIVE)   ? "derive"   : \
162         (t == DS_TYPE_ABSOLUTE) ? "absolute" : \
163         "unknown"
164
165 #define BUFF_SIZE 1024
166
167 typedef unsigned long long counter_t;
168 typedef double gauge_t;
169 typedef int64_t derive_t;
170 typedef uint64_t absolute_t;
171 int  interval_g;
172
173 union value_u
174 {
175   counter_t  counter;
176   gauge_t    gauge;
177   derive_t   derive;
178   absolute_t absolute;
179 };
180 typedef union value_u value_t;
181
182 struct value_list_s
183 {
184   value_t *values;
185   int      values_len;
186   time_t   time;
187   int      interval;
188   char     host[DATA_MAX_NAME_LEN];
189   char     plugin[DATA_MAX_NAME_LEN];
190   char     plugin_instance[DATA_MAX_NAME_LEN];
191   char     type[DATA_MAX_NAME_LEN];
192   char     type_instance[DATA_MAX_NAME_LEN];
193   meta_data_t *meta;
194   uint8_t  *types;
195 };
196 typedef struct value_list_s value_list_t;
197
198
199 enum notification_meta_type_e
200 {
201   NM_TYPE_STRING,
202   NM_TYPE_SIGNED_INT,
203   NM_TYPE_UNSIGNED_INT,
204   NM_TYPE_DOUBLE,
205   NM_TYPE_BOOLEAN
206 };
207
208 typedef struct notification_meta_s
209 {
210   char name[DATA_MAX_NAME_LEN];
211   enum notification_meta_type_e type;
212   union
213   {
214     const char *nm_string;
215     int64_t nm_signed_int;
216     uint64_t nm_unsigned_int;
217     double nm_double;
218     bool nm_boolean;
219   } nm_value;
220   struct notification_meta_s *next;
221 } notification_meta_t;
222
223 typedef struct notification_s
224 {
225   int    severity;
226   time_t time;
227   char   message[NOTIF_MAX_MSG_LEN];
228   char   host[DATA_MAX_NAME_LEN];
229   char   plugin[DATA_MAX_NAME_LEN];
230   char   plugin_instance[DATA_MAX_NAME_LEN];
231   char   type[DATA_MAX_NAME_LEN];
232   char   type_instance[DATA_MAX_NAME_LEN];
233   notification_meta_t *meta;
234 } notification_t;
235
236
237
238 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
239  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
240  * +-------+-----------------------+-------------------------------+
241  * ! Ver.  !                       ! Length                        !
242  * +-------+-----------------------+-------------------------------+
243  */
244 struct part_header_s
245 {
246   uint16_t type;
247   uint16_t length;
248 };
249 typedef struct part_header_s part_header_t;
250
251 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
252  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
253  * +-------------------------------+-------------------------------+
254  * ! Type                          ! Length                        !
255  * +-------------------------------+-------------------------------+
256  * : (Length - 4) Bytes                                            :
257  * +---------------------------------------------------------------+
258  */
259 struct part_string_s
260 {
261   part_header_t *head;
262   char *value;
263 };
264 typedef struct part_string_s part_string_t;
265
266 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
267  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
268  * +-------------------------------+-------------------------------+
269  * ! Type                          ! Length                        !
270  * +-------------------------------+-------------------------------+
271  * : (Length - 4 == 2 || 4 || 8) Bytes                             :
272  * +---------------------------------------------------------------+
273  */
274 struct part_number_s
275 {
276   part_header_t *head;
277   uint64_t *value;
278 };
279 typedef struct part_number_s part_number_t;
280
281 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
282  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
283  * +-------------------------------+-------------------------------+
284  * ! Type                          ! Length                        !
285  * +-------------------------------+---------------+---------------+
286  * ! Num of values                 ! Type0         ! Type1         !
287  * +-------------------------------+---------------+---------------+
288  * ! Value0                                                        !
289  * !                                                               !
290  * +---------------------------------------------------------------+
291  * ! Value1                                                        !
292  * !                                                               !
293  * +---------------------------------------------------------------+
294  */
295 struct part_values_s
296 {
297   part_header_t *head;
298   uint16_t *num_values;
299   uint8_t  *values_types;
300   value_t  *values;
301 };
302 typedef struct part_values_s part_values_t;
303
304 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
305  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
306  * +-------------------------------+-------------------------------+
307  * ! Type                          ! Length                        !
308  * +-------------------------------+-------------------------------+
309  * ! Hash (Bits   0 -  31)                                         !
310  * : :                                                             :
311  * ! Hash (Bits 224 - 255)                                         !
312  * +---------------------------------------------------------------+
313  */
314 /* Minimum size */
315 #define PART_SIGNATURE_SHA256_SIZE 36
316 struct part_signature_sha256_s
317 {
318   part_header_t head;
319   unsigned char hash[32];
320   char *username;
321 };
322 typedef struct part_signature_sha256_s part_signature_sha256_t;
323
324 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
325  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
326  * +-------------------------------+-------------------------------+
327  * ! Type                          ! Length                        !
328  * +-------------------------------+-------------------------------+
329  * ! Original length               ! Padding (0 - 15 bytes)        !
330  * +-------------------------------+-------------------------------+
331  * ! Hash (Bits   0 -  31)                                         !
332  * : :                                                             :
333  * ! Hash (Bits 128 - 159)                                         !
334  * +---------------------------------------------------------------+
335  */
336 /* Minimum size */
337 #define PART_ENCRYPTION_AES256_SIZE 42
338 struct part_encryption_aes256_s
339 {
340   part_header_t head;
341   uint16_t username_length;
342   char *username;
343   unsigned char iv[16];
344   /* <encrypted> */
345   unsigned char hash[20];
346   /*   <payload /> */
347   /* </encrypted> */
348 };
349 typedef struct part_encryption_aes256_s part_encryption_aes256_t;
350
351 struct receive_list_entry_s
352 {
353   char data[BUFF_SIZE];
354   int  data_len;
355   int  fd;
356   struct receive_list_entry_s *next;
357 };
358 typedef struct receive_list_entry_s receive_list_entry_t;
359
360
361 /**
362  *
363  *  END Collectd Structs
364  *
365  */
366
367 static unsigned long long collectd_ntohll (unsigned long long n)
368 {
369 #if BYTE_ORDER == BIG_ENDIAN
370   return (n);
371 #else
372   return (((unsigned long long) ntohl (n)) << 32) + ntohl (n >> 32);
373 #endif
374 } /* unsigned long long collectd_ntohll */
375
376 #define ntohd(d) (d)
377
378
379 /* Forward declare this so the parse function can use it */
380 static int queue_values(collectd_closure_t *ccl,
381   noit_module_t *self, noit_check_t *check, value_list_t *vl);
382 static int queue_notifications(collectd_closure_t *ccl,
383   noit_module_t *self, noit_check_t *check, notification_t *n);
384
385
386 static EVP_CIPHER_CTX* network_get_aes256_cypher (collectd_closure_t *ccl, /* {{{ */
387     const void *iv, size_t iv_size, const char *username)
388 {
389
390   if (ccl->username == NULL && ccl->secret == NULL)
391     return (NULL);
392   else
393   {
394     EVP_CIPHER_CTX *ctx_ptr;
395     EVP_MD_CTX ctx_md;
396     unsigned char password_hash[32];
397     unsigned int length = 0;
398     int success;
399
400     ctx_ptr = &ccl->ctx;
401
402     EVP_DigestInit(&ctx_md, EVP_sha256());
403     EVP_DigestUpdate(&ctx_md, ccl->secret, strlen(ccl->secret));
404     EVP_DigestFinal(&ctx_md, password_hash, &length);
405     EVP_MD_CTX_cleanup(&ctx_md);
406
407     assert(length <= 32);
408
409     success = EVP_DecryptInit(ctx_ptr, EVP_aes_256_ofb(), password_hash, iv);
410     if (success != 1)
411     {
412       noitL(noit_error, "collectd: EVP_DecryptInit returned: %d\n",
413           success);
414       return (NULL);
415     }
416     return (ctx_ptr);
417   }
418 } /* }}} int network_get_aes256_cypher */
419
420 static int parse_part_values (void **ret_buffer, size_t *ret_buffer_len,
421     value_t **ret_values, uint8_t **ret_types, int *ret_num_values)
422 {
423   char *buffer = *ret_buffer;
424   size_t buffer_len = *ret_buffer_len;
425
426   uint16_t tmp16;
427   size_t exp_size;
428   int   i;
429
430   uint16_t pkg_length;
431   uint16_t pkg_type;
432   uint16_t pkg_numval;
433
434   uint8_t *pkg_types;
435   value_t *pkg_values;
436
437   if (buffer_len < 15)
438   {
439     noitL(noit_error,"collectd: packet is too short: "
440         "buffer_len = %zu\n", buffer_len);
441     return (-1);
442   }
443
444   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
445   buffer += sizeof (tmp16);
446   pkg_type = ntohs (tmp16);
447
448   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
449   buffer += sizeof (tmp16);
450   pkg_length = ntohs (tmp16);
451
452   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
453   buffer += sizeof (tmp16);
454   pkg_numval = ntohs (tmp16);
455
456   assert (pkg_type == TYPE_VALUES);
457
458   exp_size = 3 * sizeof (uint16_t)
459     + pkg_numval * (sizeof (uint8_t) + sizeof (value_t));
460   if ((buffer_len < 0) || (buffer_len < exp_size))
461   {
462     noitL(noit_error, "collectd: parse_part_values: "
463         "Packet too short: "
464         "Chunk of size %zu expected, "
465         "but buffer has only %zu bytes left.\n",
466         exp_size, buffer_len);
467     return (-1);
468   }
469
470   if (pkg_length != exp_size)
471   {
472     noitL(noit_debug, "collectd: parse_part_values: "
473         "Length and number of values "
474         "in the packet don't match.\n");
475     return (-1);
476   }
477
478   pkg_types = (uint8_t *) malloc (pkg_numval * sizeof (uint8_t));
479   pkg_values = (value_t *) malloc (pkg_numval * sizeof (value_t));
480   if ((pkg_types == NULL) || (pkg_values == NULL))
481   {
482     sfree (pkg_types);
483     sfree (pkg_values);
484     noitL(noit_error, "collectd: parse_part_values: malloc failed.\n");
485     return (-1);
486   }
487
488   memcpy ((void *) pkg_types, (void *) buffer, pkg_numval * sizeof (uint8_t));
489   buffer += pkg_numval * sizeof (uint8_t);
490   memcpy ((void *) pkg_values, (void *) buffer, pkg_numval * sizeof (value_t));
491   buffer += pkg_numval * sizeof (value_t);
492
493   for (i = 0; i < pkg_numval; i++)
494   {
495     switch (pkg_types[i])
496     {
497       case DS_TYPE_COUNTER:
498         pkg_values[i].counter = (counter_t) collectd_ntohll (pkg_values[i].counter);
499         break;
500
501       case DS_TYPE_GAUGE:
502         pkg_values[i].gauge = (gauge_t) ntohd (pkg_values[i].gauge);
503         break;
504
505       case DS_TYPE_DERIVE:
506         pkg_values[i].derive = (derive_t) collectd_ntohll (pkg_values[i].derive);
507         break;
508
509       case DS_TYPE_ABSOLUTE:
510         pkg_values[i].absolute = (absolute_t) collectd_ntohll (pkg_values[i].absolute);
511         break;
512
513       default:
514         sfree (pkg_types);
515         sfree (pkg_values);
516         noitL(noit_debug, "collectd: parse_part_values: "
517       "Don't know how to handle data source type %"PRIu8 "\n",
518       pkg_types[i]);
519         return (-1);
520     } /* switch (pkg_types[i]) */
521   }
522
523   *ret_buffer     = buffer;
524   *ret_buffer_len = buffer_len - pkg_length;
525   *ret_num_values = pkg_numval;
526   *ret_types      = pkg_types;
527   *ret_values     = pkg_values;
528
529
530   return (0);
531 } /* int parse_part_values */
532
533 static int parse_part_number (void **ret_buffer, size_t *ret_buffer_len,
534     uint64_t *value)
535 {
536   char *buffer = *ret_buffer;
537   size_t buffer_len = *ret_buffer_len;
538
539   uint16_t tmp16;
540   uint64_t tmp64;
541   size_t exp_size = 2 * sizeof (uint16_t) + sizeof (uint64_t);
542
543   uint16_t pkg_length;
544   uint16_t pkg_type;
545
546   if ((buffer_len < 0) || ((size_t) buffer_len < exp_size))
547   {
548     noitL(noit_error, "collectd: parse_part_number: "
549         "Packet too short: "
550         "Chunk of size %zu expected, "
551         "but buffer has only %zu bytes left.\n",
552         exp_size, buffer_len);
553     return (-1);
554   }
555
556   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
557   buffer += sizeof (tmp16);
558   pkg_type = ntohs (tmp16);
559
560   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
561   buffer += sizeof (tmp16);
562   pkg_length = ntohs (tmp16);
563
564   memcpy ((void *) &tmp64, buffer, sizeof (tmp64));
565   buffer += sizeof (tmp64);
566   *value = collectd_ntohll (tmp64);
567
568   *ret_buffer = buffer;
569   *ret_buffer_len = buffer_len - pkg_length;
570
571   return (0);
572 } /* int parse_part_number */
573
574 static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len,
575     char *output, int output_len)
576 {
577   char *buffer = *ret_buffer;
578   size_t buffer_len = *ret_buffer_len;
579
580   uint16_t tmp16;
581   size_t header_size = 2 * sizeof (uint16_t);
582
583   uint16_t pkg_length;
584   uint16_t pkg_type;
585
586   if ((buffer_len < 0) || (buffer_len < header_size))
587   {
588     noitL(noit_error, "collectd: parse_part_string: "
589         "Packet too short: "
590         "Chunk of at least size %zu expected, "
591         "but buffer has only %zu bytes left.\n",
592         header_size, buffer_len);
593     return (-1);
594   }
595
596   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
597   buffer += sizeof (tmp16);
598   pkg_type = ntohs (tmp16);
599
600   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
601   buffer += sizeof (tmp16);
602   pkg_length = ntohs (tmp16);
603
604   /* Check that packet fits in the input buffer */
605   if (pkg_length > buffer_len)
606   {
607     noitL(noit_error, "collectd: parse_part_string: "
608         "Packet too big: "
609         "Chunk of size %"PRIu16" received, "
610         "but buffer has only %zu bytes left.\n",
611         pkg_length, buffer_len);
612     return (-1);
613   }
614
615   /* Check that pkg_length is in the valid range */
616   if (pkg_length <= header_size)
617   {
618     noitL(noit_error, "collectd: parse_part_string: "
619         "Packet too short: "
620         "Header claims this packet is only %hu "
621         "bytes long.\n", pkg_length);
622     return (-1);
623   }
624
625   /* Check that the package data fits into the output buffer.
626    * The previous if-statement ensures that:
627    * `pkg_length > header_size' */
628   if ((output_len < 0)
629       || ((size_t) output_len < ((size_t) pkg_length - header_size)))
630   {
631     noitL(noit_error, "collectd: parse_part_string: "
632         "Output buffer too small.\n");
633     return (-1);
634   }
635
636   /* All sanity checks successfull, let's copy the data over */
637   output_len = pkg_length - header_size;
638   memcpy ((void *) output, (void *) buffer, output_len);
639   buffer += output_len;
640
641   /* For some very weird reason '\0' doesn't do the trick on SPARC in
642    * this statement. */
643   if (output[output_len - 1] != 0)
644   {
645     noitL(noit_error, "collectd: parse_part_string: "
646         "Received string does not end "
647         "with a NULL-byte.\n");
648     return (-1);
649   }
650
651   *ret_buffer = buffer;
652   *ret_buffer_len = buffer_len - pkg_length;
653
654   return (0);
655 } /* int parse_part_string */
656
657
658 #define PP_SIGNED    0x01
659 #define PP_ENCRYPTED 0x02
660
661 #define BUFFER_READ(p,s) do { \
662   memcpy ((p), buffer + buffer_offset, (s)); \
663   buffer_offset += (s); \
664 } while (0)
665
666
667
668 // Forward declare
669 static int parse_packet (
670     collectd_closure_t *ccl, noit_module_t *self, noit_check_t *check,
671     void *buffer, size_t buffer_size, int flags);
672
673
674 static int parse_part_sign_sha256 (collectd_closure_t *ccl, noit_module_t *self,
675     noit_check_t *check, void **ret_buffer, size_t *ret_buffer_len, int flags)
676 {
677   unsigned char *buffer;
678   size_t buffer_len;
679   size_t buffer_offset;
680
681   size_t username_len;
682   part_signature_sha256_t pss;
683   uint16_t pss_head_length;
684   unsigned char hash[sizeof (pss.hash)];
685
686   unsigned char *hash_ptr;
687   unsigned int length;
688
689   buffer = *ret_buffer;
690   buffer_len = *ret_buffer_len;
691   buffer_offset = 0;
692
693   if (ccl->username == NULL)
694   {
695     noitL(noit_debug, "collectd: Received signed network packet but can't verify "
696         "it because no user has been configured. Will accept it.\n");
697     return (0);
698   }
699
700   /* Check if the buffer has enough data for this structure. */
701   if (buffer_len <= PART_SIGNATURE_SHA256_SIZE)
702     return (-ENOMEM);
703
704   /* Read type and length header */
705   BUFFER_READ (&pss.head.type, sizeof (pss.head.type));
706   BUFFER_READ (&pss.head.length, sizeof (pss.head.length));
707   pss_head_length = ntohs (pss.head.length);
708
709   /* Check if the `pss_head_length' is within bounds. */
710   if ((pss_head_length <= PART_SIGNATURE_SHA256_SIZE)
711       || (pss_head_length > buffer_len))
712   {
713     noitL(noit_error, "collectd: HMAC-SHA-256 with invalid length received.\n");
714     return (-1);
715   }
716
717   /* Copy the hash. */
718   BUFFER_READ (pss.hash, sizeof (pss.hash));
719
720   /* Calculate username length (without null byte) and allocate memory */
721   username_len = pss_head_length - PART_SIGNATURE_SHA256_SIZE;
722   pss.username = malloc (username_len + 1);
723   if (pss.username == NULL)
724     return (-ENOMEM);
725
726   /* Read the username */
727   BUFFER_READ (pss.username, username_len);
728   pss.username[username_len] = 0;
729
730   assert (buffer_offset == pss_head_length);
731
732   /* Match up the username with the expected username */
733   if (strcmp(ccl->username, pss.username) != 0)
734   {
735     noitL(noit_error, "collectd: User: %s and Given User: %s don't match\n", ccl->username, pss.username);
736     sfree (pss.username);
737     return (-ENOENT);
738   }
739
740   /* Create a hash device and check the HMAC */
741   hash_ptr = HMAC(EVP_sha256(), ccl->secret, strlen(ccl->secret),
742       buffer     + PART_SIGNATURE_SHA256_SIZE,
743       buffer_len - PART_SIGNATURE_SHA256_SIZE,
744       hash,         &length);
745   if (hash_ptr == NULL)
746   {
747     noitL(noit_error, "collectd: Creating HMAC-SHA-256 object failed.\n");
748     sfree (pss.username);
749     return (-1);
750   }
751
752   /* Clean up */
753   sfree (pss.username);
754
755   if (memcmp (pss.hash, hash, sizeof (pss.hash)) != 0)
756   {
757     noitL(noit_error, "collectd: Verifying HMAC-SHA-256 signature failed: "
758         "Hash mismatch.\n");
759   }
760   else
761   {
762     parse_packet (ccl, self, check, buffer + buffer_offset, buffer_len - buffer_offset,
763         flags | PP_SIGNED);
764   }
765
766   *ret_buffer = buffer + buffer_len;
767   *ret_buffer_len = 0;
768
769   return (0);
770 } /* }}} int parse_part_sign_sha256 */
771 /* #endif HAVE_LIBGCRYPT */
772
773 static int parse_part_encr_aes256 (collectd_closure_t *ccl, noit_module_t *self,
774     noit_check_t *check, void **ret_buffer, size_t *ret_buffer_len,
775     int flags)
776 {
777   unsigned char  *buffer = *ret_buffer;
778   size_t buffer_len = *ret_buffer_len;
779   size_t payload_len;
780   size_t part_size;
781   size_t buffer_offset;
782   int    tmpbufsize;
783   unsigned char   *tmpbuf;
784   uint16_t username_len;
785   part_encryption_aes256_t pea;
786   unsigned char hash[sizeof (pea.hash)];
787   unsigned int hash_length;
788
789   EVP_CIPHER_CTX *ctx;
790   EVP_MD_CTX ctx_md;
791   int err;
792
793   /* Make sure at least the header if available. */
794   if (buffer_len <= PART_ENCRYPTION_AES256_SIZE)
795   {
796     noitL(noit_debug, "collectd: parse_part_encr_aes256: "
797         "Discarding short packet.\n");
798     return (-1);
799   }
800
801   buffer_offset = 0;
802
803   /* Copy the unencrypted information into `pea'. */
804   BUFFER_READ (&pea.head.type, sizeof (pea.head.type));
805   BUFFER_READ (&pea.head.length, sizeof (pea.head.length));
806
807   /* Check the `part size'. */
808   part_size = ntohs (pea.head.length);
809   if ((part_size <= PART_ENCRYPTION_AES256_SIZE)
810       || (part_size > buffer_len))
811   {
812     noitL(noit_debug, "collectd: parse_part_encr_aes256: "
813         "Discarding part with invalid size.\n");
814     return (-1);
815   }
816
817   /* Read the username */
818   BUFFER_READ (&username_len, sizeof (username_len));
819   username_len = ntohs (username_len);
820
821   if ((username_len <= 0)
822       || (username_len > (part_size - (PART_ENCRYPTION_AES256_SIZE + 1))))
823   {
824     noitL(noit_debug, "collectd: parse_part_encr_aes256: "
825         "Discarding part with invalid username length.\n");
826     return (-1);
827   }
828
829   assert (username_len > 0);
830   pea.username = malloc (username_len + 1);
831   if (pea.username == NULL)
832     return (-ENOMEM);
833   BUFFER_READ (pea.username, username_len);
834   pea.username[username_len] = 0;
835
836   /* Last but not least, the initialization vector */
837   BUFFER_READ (pea.iv, sizeof (pea.iv));
838
839   /* Make sure we are at the right position */
840   assert (buffer_offset == (username_len +
841         PART_ENCRYPTION_AES256_SIZE - sizeof (pea.hash)));
842
843   /* Match up the username with the expected username */
844   if (strcmp(ccl->username, pea.username) != 0)
845   {
846     noitL(noit_error, "collectd: Username received and server side username don't match\n");
847     sfree (pea.username);
848     return (-ENOENT);
849   }
850
851   ctx = network_get_aes256_cypher (ccl, pea.iv, sizeof (pea.iv),
852       pea.username);
853   if (ctx == NULL)
854     return (-1);
855
856   payload_len = part_size - (PART_ENCRYPTION_AES256_SIZE + username_len);
857   assert (payload_len > 0);
858   tmpbuf = malloc(part_size - buffer_offset);
859
860   /* Decrypt the packet */
861   err = EVP_DecryptUpdate(ctx,
862       tmpbuf, &tmpbufsize,
863       buffer    + buffer_offset,
864       part_size - buffer_offset);
865   if (err != 1)
866   {
867     noitL(noit_error, "collectd: openssl returned: %d\n", err);
868     return (-1);
869   }
870
871   assert(part_size - buffer_offset == tmpbufsize);
872   /* Make it appear to be in place */
873   memcpy(buffer + buffer_offset, tmpbuf, part_size - buffer_offset);
874   sfree(tmpbuf);
875
876   /* Read the hash */
877   BUFFER_READ (pea.hash, sizeof (pea.hash));
878
879   /* Make sure we're at the right position - again */
880   assert (buffer_offset == (username_len + PART_ENCRYPTION_AES256_SIZE));
881   assert (buffer_offset == (part_size - payload_len));
882
883   /* Check hash sum */
884   memset (hash, 0, sizeof (hash));
885   EVP_DigestInit(&ctx_md, EVP_sha1());
886   EVP_DigestUpdate(&ctx_md, buffer + buffer_offset, payload_len);
887   err = EVP_DigestFinal(&ctx_md, hash, &hash_length);
888   if (memcmp (hash, pea.hash, sizeof (hash)) != 0)
889   {
890     noitL(noit_error, "collectd: Decryption failed: Checksum mismatch.\n");
891     return (-1);
892   }
893
894   parse_packet (ccl, self, check, buffer + buffer_offset, payload_len,
895       flags | PP_ENCRYPTED);
896
897   /* Update return values */
898   *ret_buffer =     buffer     + part_size;
899   *ret_buffer_len = buffer_len - part_size;
900   sfree(pea.username);
901
902   return (0);
903 } /* }}} int parse_part_encr_aes256 */
904
905
906 #undef BUFFER_READ
907
908
909 static int parse_packet (/* {{{ */
910     collectd_closure_t *ccl, noit_module_t *self, noit_check_t *check,
911     void *buffer, size_t buffer_size, int flags)
912 {
913   int status;
914
915   int packet_was_signed = (flags & PP_SIGNED);
916   int packet_was_encrypted = (flags & PP_ENCRYPTED);
917   int printed_ignore_warning = 0;
918
919 #define VALUE_LIST_INIT { NULL, 0, 0, interval_g, "localhost", "", "", "", "", NULL }
920   value_list_t vl = VALUE_LIST_INIT;
921   notification_t n;
922
923   memset (&vl, '\0', sizeof (vl));
924   memset (&n, '\0', sizeof (n));
925   status = 0;
926
927   while ((status == 0) && (0 < buffer_size)
928       && ((unsigned int) buffer_size > sizeof (part_header_t)))
929   {
930     uint16_t pkg_length;
931     uint16_t pkg_type;
932
933     memcpy ((void *) &pkg_type,
934         (void *) buffer,
935         sizeof (pkg_type));
936     memcpy ((void *) &pkg_length,
937         (void *) (buffer + sizeof (pkg_type)),
938         sizeof (pkg_length));
939
940     pkg_length = ntohs (pkg_length);
941     pkg_type = ntohs (pkg_type);
942
943     if (pkg_length > buffer_size)
944       break;
945     /* Ensure that this loop terminates eventually */
946     if (pkg_length < (2 * sizeof (uint16_t)))
947       break;
948
949
950     if (pkg_type == TYPE_ENCR_AES256)
951     {
952       status = parse_part_encr_aes256 (ccl, self, check,
953           &buffer, &buffer_size, flags);
954       if (status != 0)
955       {
956         noitL(noit_error, "collectd: Decrypting AES256 "
957             "part failed "
958             "with status %i.\n", status);
959         break;
960       }
961     }
962     else if ((ccl->security_level == SECURITY_LEVEL_ENCRYPT)
963         && (packet_was_encrypted == 0))
964     {
965       if (printed_ignore_warning == 0)
966       {
967         noitL(noit_debug, "collectd: Unencrypted packet or "
968             "part has been ignored.\n");
969         printed_ignore_warning = 1;
970       }
971       buffer = ((char *) buffer) + pkg_length;
972       continue;
973     }
974     else if (pkg_type == TYPE_SIGN_SHA256)
975     {
976       status = parse_part_sign_sha256 (ccl, self, check,
977                                         &buffer, &buffer_size, flags);
978       if (status != 0)
979       {
980         noitL(noit_error, "collectd: Verifying HMAC-SHA-256 "
981             "signature failed "
982             "with status %i.\n", status);
983         break;
984       }
985     }
986     else if ((ccl->security_level == SECURITY_LEVEL_SIGN)
987         && (packet_was_encrypted == 0)
988         && (packet_was_signed == 0))
989     {
990       if (printed_ignore_warning == 0)
991       {
992         noitL(noit_debug, "collectd: Unsigned packet or "
993             "part has been ignored.\n");
994         printed_ignore_warning = 1;
995       }
996       buffer = ((char *) buffer) + pkg_length;
997       continue;
998     }
999     else if (pkg_type == TYPE_VALUES)
1000     {
1001       status = parse_part_values (&buffer, &buffer_size,
1002           &vl.values, &vl.types, &vl.values_len);
1003
1004       if (status != 0)
1005         break;
1006
1007       if ((vl.time > 0)
1008           && (strlen (vl.host) > 0)
1009           && (strlen (vl.plugin) > 0)
1010           && (strlen (vl.type) > 0))
1011       {
1012         queue_values(ccl, self, check, &vl);
1013       }
1014       else
1015       {
1016         noitL(noit_error,
1017             "collectd:  NOT dispatching values\n");
1018       }
1019
1020       sfree (vl.values);
1021       sfree (vl.types);
1022     }
1023     else if (pkg_type == TYPE_TIME)
1024     {
1025       uint64_t tmp = 0;
1026       status = parse_part_number (&buffer, &buffer_size,
1027           &tmp);
1028       if (status == 0)
1029       {
1030         vl.time = (time_t) tmp;
1031         n.time = (time_t) tmp;
1032       }
1033     }
1034     else if (pkg_type == TYPE_INTERVAL)
1035     {
1036       uint64_t tmp = 0;
1037       status = parse_part_number (&buffer, &buffer_size,
1038           &tmp);
1039       if (status == 0)
1040         vl.interval = (int) tmp;
1041     }
1042     else if (pkg_type == TYPE_HOST)
1043     {
1044       status = parse_part_string (&buffer, &buffer_size,
1045           vl.host, sizeof (vl.host));
1046       if (status == 0)
1047         sstrncpy (n.host, vl.host, sizeof (n.host));
1048     }
1049     else if (pkg_type == TYPE_PLUGIN)
1050     {
1051       status = parse_part_string (&buffer, &buffer_size,
1052           vl.plugin, sizeof (vl.plugin));
1053       if (status == 0)
1054         sstrncpy (n.plugin, vl.plugin,
1055             sizeof (n.plugin));
1056     }
1057     else if (pkg_type == TYPE_PLUGIN_INSTANCE)
1058     {
1059       status = parse_part_string (&buffer, &buffer_size,
1060           vl.plugin_instance,
1061           sizeof (vl.plugin_instance));
1062       if (status == 0)
1063         sstrncpy (n.plugin_instance,
1064             vl.plugin_instance,
1065             sizeof (n.plugin_instance));
1066     }
1067     else if (pkg_type == TYPE_TYPE)
1068     {
1069       status = parse_part_string (&buffer, &buffer_size,
1070           vl.type, sizeof (vl.type));
1071       if (status == 0)
1072         sstrncpy (n.type, vl.type, sizeof (n.type));
1073     }
1074     else if (pkg_type == TYPE_TYPE_INSTANCE)
1075     {
1076       status = parse_part_string (&buffer, &buffer_size,
1077           vl.type_instance,
1078           sizeof (vl.type_instance));
1079       if (status == 0)
1080         sstrncpy (n.type_instance, vl.type_instance,
1081             sizeof (n.type_instance));
1082     }
1083     else if (pkg_type == TYPE_MESSAGE)
1084     {
1085       status = parse_part_string (&buffer, &buffer_size,
1086           n.message, sizeof (n.message));
1087
1088       if (status != 0)
1089       {
1090         /* do nothing */
1091       }
1092       else if ((n.severity != NOTIF_FAILURE)
1093           && (n.severity != NOTIF_WARNING)
1094           && (n.severity != NOTIF_OKAY))
1095       {
1096         noitL(noit_error, "collectd: "
1097             "Ignoring notification with "
1098             "unknown severity %i.\n",
1099             n.severity);
1100       }
1101       else if (n.time <= 0)
1102       {
1103         noitL(noit_error, "collectd: "
1104             "Ignoring notification with "
1105             "time == 0.\n");
1106       }
1107       else if (strlen (n.message) <= 0)
1108       {
1109         noitL(noit_error, "collectd: "
1110             "Ignoring notification with "
1111             "an empty message.\n");
1112       }
1113       else
1114       {
1115         queue_notifications(ccl, self, check, &n);
1116         noitL(noit_error, "collectd: "
1117             "DISPATCH NOTIFICATION\n");
1118       }
1119     }
1120     else if (pkg_type == TYPE_SEVERITY)
1121     {
1122       uint64_t tmp = 0;
1123       status = parse_part_number (&buffer, &buffer_size,
1124           &tmp);
1125       if (status == 0)
1126         n.severity = (int) tmp;
1127     }
1128     else
1129     {
1130       noitL(noit_error, "collectd: parse_packet: Unknown part"
1131           " type: 0x%04hx\n", pkg_type);
1132       buffer = ((char *) buffer) + pkg_length;
1133     }
1134   } /* while (buffer_size > sizeof (part_header_t)) */
1135
1136   return (status);
1137 } /* }}} int parse_packet */
1138
1139
1140 // Not proud of this at all however; I am not sure best how to address this.
1141 static int infer_type(char *buffer, int buffer_len, value_list_t *vl, int index) {
1142   int len = strlen(buffer);
1143   strcat(buffer, "`");
1144   if (cmp_type(vl, "load", "load")) {
1145     assert(vl->values_len == 3);
1146     switch (index) {
1147       case 0:
1148         strcat(buffer, "1min"); break;
1149       case 1:
1150         strcat(buffer, "5min"); break;
1151       case 2:
1152         strcat(buffer, "15min"); break;
1153     }
1154   } else if (cmp_plugin(vl, "interface")) {
1155     assert(vl->values_len == 2);
1156     switch (index) {
1157       case 0:
1158         strcat(buffer, "rx"); break;
1159       case 1:
1160         strcat(buffer, "tx"); break;
1161     }
1162   } else {
1163     char buf[20];
1164     snprintf(buf, sizeof(buf), "%d", index);
1165     strcat(buffer, buf);
1166     noitL(noit_debug, "collectd: parsing multiple values"
1167         " and guessing on the type for plugin[%s] and type[%s]"
1168         , vl->plugin, vl->type);
1169   }
1170   return len;
1171 }
1172
1173 static void concat_metrics(char *buffer, char* plugin, char* plugin_inst, char* type, char* type_inst) {
1174   strcpy(buffer, plugin);
1175
1176   if (strlen(plugin_inst)) {
1177     strcat(buffer, "`");
1178     strcat(buffer, plugin_inst);
1179   }
1180   if (strlen(type)) {
1181     strcat(buffer, "`");
1182     strcat(buffer, type);
1183   }
1184   if (strlen(type_inst)) {
1185     strcat(buffer, "`");
1186     strcat(buffer, type_inst);
1187   }
1188 }
1189
1190 static int queue_notifications(collectd_closure_t *ccl,
1191       noit_module_t *self, noit_check_t *check, notification_t *n) {
1192   stats_t current;
1193   char buffer[DATA_MAX_NAME_LEN*4 + 128];
1194
1195   noit_check_stats_clear(&current);
1196   gettimeofday(&current.whence, NULL);
1197   // Concat all the names together so they fit into the flat noitd model
1198   concat_metrics(buffer, n->plugin, n->plugin_instance, n->type, n->type_instance);
1199   noit_stats_set_metric(&ccl->current, buffer, METRIC_STRING, n->message);
1200   noit_check_set_stats(self, check, &current);
1201   noitL(nldeb, "collectd: dispatch_notifications(%s, %s, %s)\n",check->target, buffer, n->message);
1202   return 0;
1203 }
1204
1205
1206 static int queue_values(collectd_closure_t *ccl,
1207       noit_module_t *self, noit_check_t *check, value_list_t *vl) {
1208   char buffer[DATA_MAX_NAME_LEN*4 + 4 + 1 + 20];
1209   int i, len = 0;
1210
1211   // Concat all the names together so they fit into the flat noitd model
1212   concat_metrics(buffer, vl->plugin, vl->plugin_instance, vl->type, vl->type_instance);
1213   for (i=0; i < vl->values_len; i++) {
1214  
1215     // Only infer the type if the amount of values is greater than one
1216     if (vl->values_len > 1) {
1217       // Trunc the string 
1218       if (len > 0)
1219         buffer[len] = 0;
1220       len = infer_type(buffer, sizeof(buffer), vl, i);
1221     }
1222
1223     switch (vl->types[i])
1224     {
1225       case DS_TYPE_COUNTER:
1226         noit_stats_set_metric(&ccl->current, buffer, METRIC_UINT64, &vl->values[i].counter);
1227         break;
1228
1229       case DS_TYPE_GAUGE:
1230         noit_stats_set_metric(&ccl->current, buffer, METRIC_DOUBLE, &vl->values[i].gauge);
1231         break;
1232
1233       case DS_TYPE_DERIVE:
1234         noit_stats_set_metric(&ccl->current, buffer, METRIC_INT64, &vl->values[i].derive);
1235         break;
1236
1237       case DS_TYPE_ABSOLUTE:
1238         noit_stats_set_metric(&ccl->current, buffer, METRIC_INT64, &vl->values[i].absolute);
1239         break;
1240
1241         noitL(noit_debug, "collectd: parse_part_values: "
1242       "Don't know how to handle data source type %"PRIu8 "\n",
1243       vl->types[i]);
1244         return (-1);
1245     } /* switch (value_types[i]) */
1246     ccl->stats_count++;
1247     noitL(nldeb, "collectd: queue_values(%s, %s)\n", buffer, check->target);
1248   }
1249   //noit_check_set_stats(self, check, &current);
1250   return 0;
1251 }
1252
1253 static void clear_closure(collectd_closure_t *ccl) {
1254   ccl->stats_count = 0;
1255   ccl->ntfy_count = 0;
1256   noit_check_stats_clear(&ccl->current);
1257
1258 }
1259
1260 static int collectd_submit(noit_module_t *self, noit_check_t *check) {
1261   collectd_closure_t *ccl;
1262   struct timeval duration;
1263   if(!check->closure) {
1264     ccl = check->closure = (void *)calloc(1, sizeof(collectd_closure_t));
1265     memset(ccl, 0, sizeof(collectd_closure_t));
1266   } else {
1267     // Don't count the first run
1268     char human_buffer[256];
1269     ccl = (collectd_closure_t*)check->closure;
1270     gettimeofday(&ccl->current.whence, NULL);
1271     sub_timeval(ccl->current.whence, check->last_fire_time, &duration);
1272     ccl->current.duration = duration.tv_sec; // + duration.tv_usec / (1000 * 1000);
1273
1274     snprintf(human_buffer, sizeof(human_buffer),
1275              "dur=%d,run=%d,stats=%d,ntfy=%d", ccl->current.duration,
1276              check->generation, ccl->stats_count, ccl->ntfy_count);
1277     noitL(nldeb, "collectd(%s) [%s]\n", check->target, human_buffer);
1278
1279     // Not sure what to do here
1280     ccl->current.available = (ccl->ntfy_count > 0 || ccl->stats_count > 0) ?
1281         NP_AVAILABLE : NP_UNAVAILABLE;
1282     ccl->current.state = (ccl->ntfy_count > 0 || ccl->stats_count > 0) ?
1283         NP_GOOD : NP_BAD;
1284     ccl->current.status = human_buffer;
1285     noit_check_set_stats(self, check, &ccl->current);
1286
1287     memcpy(&check->last_fire_time, &ccl->current.whence, sizeof(duration));
1288   }
1289   clear_closure(ccl);
1290   return 0;
1291 }
1292
1293 static int noit_collectd_handler(eventer_t e, int mask, void *closure,
1294                              struct timeval *now) {
1295   struct sockaddr_in  skaddr;
1296   char packet[1500];
1297   int packet_len = sizeof(packet);
1298   unsigned int from_len;
1299   char ip_p[INET_ADDRSTRLEN];
1300   noit_module_t *self = (noit_module_t *)closure;
1301   collectd_mod_config_t *conf;
1302   conf = noit_module_get_userdata(self);
1303
1304
1305   // Get the username and password of the string
1306
1307   while(1) {
1308     int inlen;
1309     noit_check_t *check;
1310     collectd_closure_t *ccl;
1311     char *security_buffer;
1312
1313     from_len = sizeof(skaddr);
1314
1315     inlen = recvfrom(e->fd, packet, packet_len, 0,
1316                      (struct sockaddr *)&skaddr, &from_len);
1317     gettimeofday(now, NULL); /* set it, as we care about accuracy */
1318
1319     if(inlen < 0) {
1320       if(errno == EAGAIN || errno == EINTR) break;
1321       noitLT(nlerr, now, "collectd: recvfrom: %s\n", strerror(errno));
1322       break;
1323     }
1324     if (!inet_ntop(AF_INET, &(skaddr.sin_addr), ip_p, INET_ADDRSTRLEN)) {
1325       noitLT(nlerr, now, "collectd: inet_ntop failed: %s\n", strerror(errno));
1326       break;
1327     }
1328     check = noit_poller_lookup_by_name(ip_p, "collectd");
1329     if (!check) {
1330       noitL(nlerr, "collectd: No defined check from ip [%s].\n", ip_p);
1331       break;
1332     }
1333
1334     // If its a new check retrieve some values
1335     if (check->closure == NULL) {
1336       // TODO: Verify if it could somehow retrieve data before the check closure exists
1337       ccl = check->closure = (void *)calloc(1, sizeof(collectd_closure_t));
1338       memset(ccl, 0, sizeof(collectd_closure_t));
1339     } else {
1340       ccl = (collectd_closure_t*)check->closure;
1341     }
1342     // Default to NONE
1343     ccl->security_level = SECURITY_LEVEL_NONE;     
1344     if (noit_hash_retr_str(check->config, "security_level", strlen("security_level"),
1345                            (const char**)&security_buffer))
1346     {
1347       ccl->security_level = atoi(security_buffer);
1348     }
1349
1350     // Is this outside to keep updates happening?
1351     if (!noit_hash_retr_str(check->config, "username", strlen("username"),
1352                            (const char**)&ccl->username) &&
1353         !noit_hash_retr_str(conf->options, "username", strlen("username"),
1354                            (const char**)&ccl->username))
1355     {
1356       if (ccl->security_level == SECURITY_LEVEL_ENCRYPT) {
1357         noitL(nlerr, "collectd: no username defined for check.\n");
1358         goto cleanup;
1359       } else if (ccl->security_level == SECURITY_LEVEL_SIGN) {
1360         noitL(nlerr, "collectd: no username defined for check, "
1361             "will accept any signed packet.\n");
1362       }
1363     }
1364
1365     if (!noit_hash_retr_str(check->config, "secret", strlen("secret"),
1366                            (const char**)&ccl->secret) &&
1367         !noit_hash_retr_str(conf->options, "secret", strlen("secret"),
1368                            (const char**)&ccl->secret))
1369     {
1370       if (ccl->security_level == SECURITY_LEVEL_ENCRYPT) {
1371         noitL(nlerr, "collectd: no secret defined for check.\n");
1372         goto cleanup;
1373       } else if (ccl->security_level == SECURITY_LEVEL_SIGN) {
1374         noitL(nlerr, "collectd: no secret defined for check, "
1375             "will accept any signed packet.\n");
1376       }
1377     }
1378
1379     parse_packet(ccl, self, check, packet, inlen, 0);
1380   }
1381   return EVENTER_READ | EVENTER_EXCEPTION;
1382
1383 cleanup:
1384   return 0;
1385 }
1386
1387
1388 static int noit_collectd_initiate_check(noit_module_t *self,
1389                                         noit_check_t *check,
1390                                         int once, noit_check_t *cause) {
1391   /* The idea is to write the collectd stuff to the stats one every period
1392    * Then we can warn people if no stats where written in a period of time
1393    */
1394   INITIATE_CHECK(collectd_submit, self, check);
1395   return 0;
1396 }
1397
1398 static int noit_collectd_config(noit_module_t *self, noit_hash_table *options) {
1399   collectd_mod_config_t *conf;
1400   conf = noit_module_get_userdata(self);
1401   if(conf) {
1402     if(conf->options) {
1403       noit_hash_destroy(conf->options, free, free);
1404       free(conf->options);
1405     }
1406   }
1407   else
1408     conf = calloc(1, sizeof(*conf));
1409   conf->options = options;
1410   noit_module_set_userdata(self, conf);
1411   return 1;
1412 }
1413
1414 static int noit_collectd_onload(noit_image_t *self) {
1415   if(!nlerr) nlerr = noit_log_stream_find("error/collectd");
1416   if(!nldeb) nldeb = noit_log_stream_find("debug/collectd");
1417   if(!nlerr) nlerr = noit_stderr;
1418   if(!nldeb) nldeb = noit_debug;
1419   eventer_name_callback("noit_collectd/handler", noit_collectd_handler);
1420   return 0;
1421 }
1422
1423 static int noit_collectd_init(noit_module_t *self) {
1424   const char *config_val;
1425   int sockaddr_len;
1426   collectd_mod_config_t *conf;
1427   conf = noit_module_get_userdata(self);
1428   int portint = 0;
1429   struct sockaddr_in skaddr;
1430   const char *host;
1431   unsigned short port;
1432
1433   if(!__collectd_initialize_once) {
1434     __collectd_initialize_once = 1;
1435   }
1436
1437
1438   /* Default Collectd port */
1439   portint = 25826;
1440   if(noit_hash_retr_str(conf->options,
1441                          "collectd_port", strlen("collectd_port"),
1442                          (const char**)&config_val))
1443     portint = atoi(config_val);
1444
1445
1446   if(!noit_hash_retr_str(conf->options,
1447                          "collectd_host", strlen("collectd_host"),
1448                          (const char**)&host))
1449     host = "*";
1450
1451   port = (unsigned short) portint;
1452
1453   conf->ipv4_fd = conf->ipv6_fd = -1;
1454
1455   conf->ipv4_fd = socket(PF_INET, SOCK_DGRAM, 0);
1456   if(conf->ipv4_fd < 0) {
1457     noitL(noit_error, "collectd: socket failed: %s\n",
1458           strerror(errno));
1459   }
1460   else {
1461    /*
1462     socklen_t slen = sizeof(on);
1463     if(getsockopt(conf->ipv4_fd, SOL_SOCKET, SO_SNDBUF, &on, &slen) == 0) {
1464       while(on < (1 << 20)) {
1465         on <<= 1;
1466         if(setsockopt(conf->ipv4_fd, SOL_SOCKET, SO_SNDBUF,
1467                       &on, sizeof(on)) != 0) {
1468           on >>= 1;
1469           break;
1470         }
1471       }
1472       noitL(noit_debug, ": send buffer set to %d\n", on);
1473     }
1474     else
1475       noitL(noit_error, "Cannot get sndbuf size: %s\n", strerror(errno));
1476     */
1477     if(eventer_set_fd_nonblocking(conf->ipv4_fd)) {
1478       close(conf->ipv4_fd);
1479       conf->ipv4_fd = -1;
1480       noitL(noit_error,
1481             "collectd: could not set socket non-blocking: %s\n",
1482             strerror(errno));
1483     }
1484   }
1485   skaddr.sin_family = AF_INET;
1486   skaddr.sin_addr.s_addr = htonl(INADDR_ANY);
1487   skaddr.sin_port = htons(port);
1488
1489   sockaddr_len = sizeof(skaddr);
1490   if(bind(conf->ipv4_fd, (struct sockaddr *)&skaddr, sockaddr_len) < 0) {
1491     noitL(noit_stderr, "bind failed[%s]: %s\n", host, strerror(errno));
1492     close(conf->ipv4_fd);
1493     return -1;
1494   }
1495
1496   if(conf->ipv4_fd >= 0) {
1497     eventer_t newe;
1498     newe = eventer_alloc();
1499     newe->fd = conf->ipv4_fd;
1500     newe->mask = EVENTER_READ | EVENTER_EXCEPTION;
1501     newe->callback = noit_collectd_handler;
1502     newe->closure = self;
1503     eventer_add(newe);
1504   }
1505 /*
1506   conf->ipv6_fd = socket(AF_INET6, SOCK_DGRAM, 0);
1507   if(conf->ipv6_fd < 0) {
1508     noitL(noit_error, "collectd: socket failed: %s\n",
1509           strerror(errno));
1510   }
1511   else {
1512     if(eventer_set_fd_nonblocking(conf->ipv6_fd)) {
1513       close(conf->ipv6_fd);
1514       conf->ipv6_fd = -1;
1515       noitL(noit_error,
1516             "collectd: could not set socket non-blocking: %s\n",
1517                strerror(errno));
1518     }
1519   }
1520   if(conf->ipv6_fd >= 0) {
1521     eventer_t newe;
1522     newe = eventer_alloc();
1523     newe->fd = conf->ipv6_fd;
1524     newe->mask = EVENTER_READ;
1525     newe->callback = ping_icmp_handler;
1526     newe->closure = self;
1527     eventer_add(newe);
1528   }
1529   */
1530   noit_module_set_userdata(self, conf);
1531   return 0;
1532 }
1533
1534 #include "collectd.xmlh"
1535 noit_module_t collectd = {
1536   {
1537     NOIT_MODULE_MAGIC,
1538     NOIT_MODULE_ABI_VERSION,
1539     "collectd",
1540     "collectd collection",
1541     collectd_xml_description,
1542     noit_collectd_onload
1543   },
1544   noit_collectd_config,
1545   noit_collectd_init,
1546   noit_collectd_initiate_check,
1547   NULL /* noit_collectd_cleanup */
1548 };
Note: See TracBrowser for help on using the browser.