root/src/modules/collectd.c

Revision 9337e1f9d0aae029e90f90d6978b9b03a8f10d28, 45.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

throw me a bone

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2005-2009  Florian Forster
3  * Copyright (c) 2009, OmniTI Computer Consulting, Inc.
4  * Copyright (c) 2009, Dan Di Spaltro
5  * All rights reserved.
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library, in file named COPYING.LGPL; if not,
19  * write to the Free Software Foundation, Inc., 59 Temple Place,
20  * Suite 330, Boston, MA  02111-1307  USA
21  *
22  */
23
24 #include "noit_defines.h"
25
26 #include <stdio.h>
27 #include <unistd.h>
28 #include <errno.h>
29 #include <assert.h>
30 #include <math.h>
31 #include <ctype.h>
32 #ifdef HAVE_SYS_FILIO_H
33 #include <sys/filio.h>
34 #endif
35 #include <openssl/evp.h>
36 #include <openssl/hmac.h>
37 #include <inttypes.h>
38 #include <arpa/inet.h>
39
40 #include "noit_module.h"
41 #include "noit_check.h"
42 #include "noit_check_tools.h"
43 #include "utils/noit_log.h"
44 #include "utils/noit_hash.h"
45
46
47 static noit_log_stream_t nlerr = NULL;
48 static noit_log_stream_t nldeb = NULL;
49
50 typedef struct _mod_config {
51   noit_hash_table *options;
52   noit_hash_table target_sessions;
53   noit_boolean support_notifications;
54   int ipv4_fd;
55   int ipv6_fd;
56 } collectd_mod_config_t;
57
58 typedef struct collectd_closure_s {
59   char *username;
60   char *secret;
61   int security_level;
62   EVP_CIPHER_CTX ctx;
63   stats_t current;
64   int stats_count;
65   int ntfy_count;
66 } collectd_closure_t;
67
68 #define cmp_plugin(vl, val) \
69   (strncmp(vl->plugin, val, sizeof(val)) == 0)
70
71 #define cmp_type(vl, plugin, type) \
72   (cmp_plugin(vl, plugin) && \
73    cmp_plugin(vl, type))
74
75
76 /**
77  *
78  * Collectd Structs
79  *
80  *
81  **/
82
83 union meta_value_u
84 {
85   char    *mv_string;
86   int64_t  mv_signed_int;
87   uint64_t mv_unsigned_int;
88   double   mv_double;
89   _Bool    mv_boolean;
90 };
91 typedef union meta_value_u meta_value_t;
92
93 struct meta_entry_s;
94 typedef struct meta_entry_s meta_entry_t;
95 struct meta_entry_s
96 {
97   char         *key;
98   meta_value_t  value;
99   int           type;
100   meta_entry_t *next;
101 };
102
103 struct meta_data_s
104 {
105   meta_entry_t   *head;
106   pthread_mutex_t lock;
107 };
108
109 typedef struct meta_data_s meta_data_t;
110
111
112 #define NET_DEFAULT_V4_ADDR "239.192.74.66"
113 #define NET_DEFAULT_V6_ADDR "ff18::efc0:4a42"
114 #define NET_DEFAULT_PORT    25826
115
116 #define TYPE_HOST            0x0000
117 #define TYPE_TIME            0x0001
118 #define TYPE_PLUGIN          0x0002
119 #define TYPE_PLUGIN_INSTANCE 0x0003
120 #define TYPE_TYPE            0x0004
121 #define TYPE_TYPE_INSTANCE   0x0005
122 #define TYPE_VALUES          0x0006
123 #define TYPE_INTERVAL        0x0007
124
125 /* Types to transmit notifications */
126 #define TYPE_MESSAGE         0x0100
127 #define TYPE_SEVERITY        0x0101
128
129 #define TYPE_SIGN_SHA256     0x0200
130 #define TYPE_ENCR_AES256     0x0210
131
132 #define DATA_MAX_NAME_LEN 64
133
134 #define NOTIF_MAX_MSG_LEN 256
135 #define NOTIF_FAILURE 1
136 #define NOTIF_WARNING 2
137 #define NOTIF_OKAY    4
138
139 #define sfree(ptr) \
140   do { \
141     if((ptr) != NULL) { \
142       free(ptr); \
143     } \
144     (ptr) = NULL; \
145   } while (0)
146
147 #define sstrncpy(a, b, c) strncpy(a, b, c)
148
149 #define SECURITY_LEVEL_NONE     0
150 #define SECURITY_LEVEL_SIGN    1
151 #define SECURITY_LEVEL_ENCRYPT 2
152
153 #define DS_TYPE_COUNTER  0
154 #define DS_TYPE_GAUGE    1
155 #define DS_TYPE_DERIVE   2
156 #define DS_TYPE_ABSOLUTE 3
157
158 #define DS_TYPE_TO_STRING(t) (t == DS_TYPE_COUNTER)     ? "counter"  : \
159         (t == DS_TYPE_GAUGE)    ? "gauge"    : \
160         (t == DS_TYPE_DERIVE)   ? "derive"   : \
161         (t == DS_TYPE_ABSOLUTE) ? "absolute" : \
162         "unknown"
163
164 #define BUFF_SIZE 1024
165
166 typedef unsigned long long counter_t;
167 typedef double gauge_t;
168 typedef int64_t derive_t;
169 typedef uint64_t absolute_t;
170 int  interval_g;
171
172 union value_u
173 {
174   counter_t  counter;
175   gauge_t    gauge;
176   derive_t   derive;
177   absolute_t absolute;
178 };
179 typedef union value_u value_t;
180
181 struct value_list_s
182 {
183   value_t *values;
184   int      values_len;
185   time_t   time;
186   int      interval;
187   char     host[DATA_MAX_NAME_LEN];
188   char     plugin[DATA_MAX_NAME_LEN];
189   char     plugin_instance[DATA_MAX_NAME_LEN];
190   char     type[DATA_MAX_NAME_LEN];
191   char     type_instance[DATA_MAX_NAME_LEN];
192   meta_data_t *meta;
193   uint8_t  *types;
194 };
195 typedef struct value_list_s value_list_t;
196
197
198 enum notification_meta_type_e
199 {
200   NM_TYPE_STRING,
201   NM_TYPE_SIGNED_INT,
202   NM_TYPE_UNSIGNED_INT,
203   NM_TYPE_DOUBLE,
204   NM_TYPE_BOOLEAN
205 };
206
207 typedef struct notification_meta_s
208 {
209   char name[DATA_MAX_NAME_LEN];
210   enum notification_meta_type_e type;
211   union
212   {
213     const char *nm_string;
214     int64_t nm_signed_int;
215     uint64_t nm_unsigned_int;
216     double nm_double;
217     bool nm_boolean;
218   } nm_value;
219   struct notification_meta_s *next;
220 } notification_meta_t;
221
222 typedef struct notification_s
223 {
224   int    severity;
225   time_t time;
226   char   message[NOTIF_MAX_MSG_LEN];
227   char   host[DATA_MAX_NAME_LEN];
228   char   plugin[DATA_MAX_NAME_LEN];
229   char   plugin_instance[DATA_MAX_NAME_LEN];
230   char   type[DATA_MAX_NAME_LEN];
231   char   type_instance[DATA_MAX_NAME_LEN];
232   notification_meta_t *meta;
233 } notification_t;
234
235
236
237 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
238  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
239  * +-------+-----------------------+-------------------------------+
240  * ! Ver.  !                       ! Length                        !
241  * +-------+-----------------------+-------------------------------+
242  */
243 struct part_header_s
244 {
245   uint16_t type;
246   uint16_t length;
247 };
248 typedef struct part_header_s part_header_t;
249
250 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
251  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
252  * +-------------------------------+-------------------------------+
253  * ! Type                          ! Length                        !
254  * +-------------------------------+-------------------------------+
255  * : (Length - 4) Bytes                                            :
256  * +---------------------------------------------------------------+
257  */
258 struct part_string_s
259 {
260   part_header_t *head;
261   char *value;
262 };
263 typedef struct part_string_s part_string_t;
264
265 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
266  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
267  * +-------------------------------+-------------------------------+
268  * ! Type                          ! Length                        !
269  * +-------------------------------+-------------------------------+
270  * : (Length - 4 == 2 || 4 || 8) Bytes                             :
271  * +---------------------------------------------------------------+
272  */
273 struct part_number_s
274 {
275   part_header_t *head;
276   uint64_t *value;
277 };
278 typedef struct part_number_s part_number_t;
279
280 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
281  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
282  * +-------------------------------+-------------------------------+
283  * ! Type                          ! Length                        !
284  * +-------------------------------+---------------+---------------+
285  * ! Num of values                 ! Type0         ! Type1         !
286  * +-------------------------------+---------------+---------------+
287  * ! Value0                                                        !
288  * !                                                               !
289  * +---------------------------------------------------------------+
290  * ! Value1                                                        !
291  * !                                                               !
292  * +---------------------------------------------------------------+
293  */
294 struct part_values_s
295 {
296   part_header_t *head;
297   uint16_t *num_values;
298   uint8_t  *values_types;
299   value_t  *values;
300 };
301 typedef struct part_values_s part_values_t;
302
303 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
304  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
305  * +-------------------------------+-------------------------------+
306  * ! Type                          ! Length                        !
307  * +-------------------------------+-------------------------------+
308  * ! Hash (Bits   0 -  31)                                         !
309  * : :                                                             :
310  * ! Hash (Bits 224 - 255)                                         !
311  * +---------------------------------------------------------------+
312  */
313 /* Minimum size */
314 #define PART_SIGNATURE_SHA256_SIZE 36
315 struct part_signature_sha256_s
316 {
317   part_header_t head;
318   unsigned char hash[32];
319   char *username;
320 };
321 typedef struct part_signature_sha256_s part_signature_sha256_t;
322
323 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
324  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
325  * +-------------------------------+-------------------------------+
326  * ! Type                          ! Length                        !
327  * +-------------------------------+-------------------------------+
328  * ! Original length               ! Padding (0 - 15 bytes)        !
329  * +-------------------------------+-------------------------------+
330  * ! Hash (Bits   0 -  31)                                         !
331  * : :                                                             :
332  * ! Hash (Bits 128 - 159)                                         !
333  * +---------------------------------------------------------------+
334  */
335 /* Minimum size */
336 #define PART_ENCRYPTION_AES256_SIZE 42
337 struct part_encryption_aes256_s
338 {
339   part_header_t head;
340   uint16_t username_length;
341   char *username;
342   unsigned char iv[16];
343   /* <encrypted> */
344   unsigned char hash[20];
345   /*   <payload /> */
346   /* </encrypted> */
347 };
348 typedef struct part_encryption_aes256_s part_encryption_aes256_t;
349
350 struct receive_list_entry_s
351 {
352   char data[BUFF_SIZE];
353   int  data_len;
354   int  fd;
355   struct receive_list_entry_s *next;
356 };
357 typedef struct receive_list_entry_s receive_list_entry_t;
358
359
360 /**
361  *
362  *  END Collectd Structs
363  *
364  */
365
366 static unsigned long long collectd_ntohll (unsigned long long n)
367 {
368 #if BYTE_ORDER == BIG_ENDIAN
369   return (n);
370 #else
371   return (((unsigned long long) ntohl (n)) << 32) + ntohl (n >> 32);
372 #endif
373 } /* unsigned long long collectd_ntohll */
374
375 #define ntohd(d) (d)
376
377
378 /* Forward declare this so the parse function can use it */
379 static int queue_values(collectd_closure_t *ccl,
380   noit_module_t *self, noit_check_t *check, value_list_t *vl);
381 static int queue_notifications(collectd_closure_t *ccl,
382   noit_module_t *self, noit_check_t *check, notification_t *n);
383
384
385 static EVP_CIPHER_CTX* network_get_aes256_cypher (collectd_closure_t *ccl, /* {{{ */
386     const void *iv, size_t iv_size, const char *username)
387 {
388
389   if (ccl->secret == NULL)
390     return (NULL);
391   else
392   {
393     EVP_CIPHER_CTX *ctx_ptr;
394     EVP_MD_CTX ctx_md;
395     unsigned char password_hash[32];
396     unsigned int length = 0;
397     int success;
398
399     ctx_ptr = &ccl->ctx;
400
401     EVP_DigestInit(&ctx_md, EVP_sha256());
402     EVP_DigestUpdate(&ctx_md, ccl->secret, strlen(ccl->secret));
403     EVP_DigestFinal(&ctx_md, password_hash, &length);
404     EVP_MD_CTX_cleanup(&ctx_md);
405
406     assert(length <= 32);
407
408     success = EVP_DecryptInit(ctx_ptr, EVP_aes_256_ofb(), password_hash, iv);
409     if (success != 1)
410     {
411       noitL(noit_error, "collectd: EVP_DecryptInit returned: %d\n",
412           success);
413       return (NULL);
414     }
415     return (ctx_ptr);
416   }
417 } /* }}} int network_get_aes256_cypher */
418
419 static int parse_part_values (void **ret_buffer, size_t *ret_buffer_len,
420     value_t **ret_values, uint8_t **ret_types, int *ret_num_values)
421 {
422   char *buffer = *ret_buffer;
423   size_t buffer_len = *ret_buffer_len;
424
425   uint16_t tmp16;
426   size_t exp_size;
427   int   i;
428
429   uint16_t pkg_length;
430   uint16_t pkg_type;
431   uint16_t pkg_numval;
432
433   uint8_t *pkg_types;
434   value_t *pkg_values;
435
436   if (buffer_len < 15)
437   {
438     noitL(noit_error,"collectd: packet is too short: "
439         "buffer_len = %zu\n", buffer_len);
440     return (-1);
441   }
442
443   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
444   buffer += sizeof (tmp16);
445   pkg_type = ntohs (tmp16);
446
447   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
448   buffer += sizeof (tmp16);
449   pkg_length = ntohs (tmp16);
450
451   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
452   buffer += sizeof (tmp16);
453   pkg_numval = ntohs (tmp16);
454
455   assert (pkg_type == TYPE_VALUES);
456
457   exp_size = 3 * sizeof (uint16_t)
458     + pkg_numval * (sizeof (uint8_t) + sizeof (value_t));
459   if ((buffer_len < 0) || (buffer_len < exp_size))
460   {
461     noitL(noit_error, "collectd: parse_part_values: "
462         "Packet too short: "
463         "Chunk of size %zu expected, "
464         "but buffer has only %zu bytes left.\n",
465         exp_size, buffer_len);
466     return (-1);
467   }
468
469   if (pkg_length != exp_size)
470   {
471     noitL(noit_debug, "collectd: parse_part_values: "
472         "Length and number of values "
473         "in the packet don't match.\n");
474     return (-1);
475   }
476
477   pkg_types = (uint8_t *) malloc (pkg_numval * sizeof (uint8_t));
478   pkg_values = (value_t *) malloc (pkg_numval * sizeof (value_t));
479   if ((pkg_types == NULL) || (pkg_values == NULL))
480   {
481     sfree (pkg_types);
482     sfree (pkg_values);
483     noitL(noit_error, "collectd: parse_part_values: malloc failed.\n");
484     return (-1);
485   }
486
487   memcpy ((void *) pkg_types, (void *) buffer, pkg_numval * sizeof (uint8_t));
488   buffer += pkg_numval * sizeof (uint8_t);
489   memcpy ((void *) pkg_values, (void *) buffer, pkg_numval * sizeof (value_t));
490   buffer += pkg_numval * sizeof (value_t);
491
492   for (i = 0; i < pkg_numval; i++)
493   {
494     switch (pkg_types[i])
495     {
496       case DS_TYPE_COUNTER:
497         pkg_values[i].counter = (counter_t) collectd_ntohll (pkg_values[i].counter);
498         break;
499
500       case DS_TYPE_GAUGE:
501         pkg_values[i].gauge = (gauge_t) ntohd (pkg_values[i].gauge);
502         break;
503
504       case DS_TYPE_DERIVE:
505         pkg_values[i].derive = (derive_t) collectd_ntohll (pkg_values[i].derive);
506         break;
507
508       case DS_TYPE_ABSOLUTE:
509         pkg_values[i].absolute = (absolute_t) collectd_ntohll (pkg_values[i].absolute);
510         break;
511
512       default:
513         noitL(noit_debug, "collectd: parse_part_values: "
514       "Don't know how to handle data source type %"PRIu8 "\n",
515       pkg_types[i]);
516         sfree (pkg_types);
517         sfree (pkg_values);
518         return (-1);
519     } /* switch (pkg_types[i]) */
520   }
521
522   *ret_buffer     = buffer;
523   *ret_buffer_len = buffer_len - pkg_length;
524   *ret_num_values = pkg_numval;
525   *ret_types      = pkg_types;
526   *ret_values     = pkg_values;
527
528
529   return (0);
530 } /* int parse_part_values */
531
532 static int parse_part_number (void **ret_buffer, size_t *ret_buffer_len,
533     uint64_t *value)
534 {
535   char *buffer = *ret_buffer;
536   size_t buffer_len = *ret_buffer_len;
537
538   uint16_t tmp16;
539   uint64_t tmp64;
540   size_t exp_size = 2 * sizeof (uint16_t) + sizeof (uint64_t);
541
542   uint16_t pkg_length;
543
544   if ((buffer_len < 0) || ((size_t) buffer_len < exp_size))
545   {
546     noitL(noit_error, "collectd: parse_part_number: "
547         "Packet too short: "
548         "Chunk of size %zu expected, "
549         "but buffer has only %zu bytes left.\n",
550         exp_size, buffer_len);
551     return (-1);
552   }
553
554   /* skip pkg_type */
555   buffer += sizeof (tmp16);
556
557   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
558   buffer += sizeof (tmp16);
559   pkg_length = ntohs (tmp16);
560
561   memcpy ((void *) &tmp64, buffer, sizeof (tmp64));
562   buffer += sizeof (tmp64);
563   *value = collectd_ntohll (tmp64);
564
565   *ret_buffer = buffer;
566   *ret_buffer_len = buffer_len - pkg_length;
567
568   return (0);
569 } /* int parse_part_number */
570
571 static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len,
572     char *output, int output_len)
573 {
574   char *buffer = *ret_buffer;
575   size_t buffer_len = *ret_buffer_len;
576
577   uint16_t tmp16;
578   size_t header_size = 2 * sizeof (uint16_t);
579
580   uint16_t pkg_length;
581
582   if ((buffer_len < 0) || (buffer_len < header_size))
583   {
584     noitL(noit_error, "collectd: parse_part_string: "
585         "Packet too short: "
586         "Chunk of at least size %zu expected, "
587         "but buffer has only %zu bytes left.\n",
588         header_size, buffer_len);
589     return (-1);
590   }
591
592   /* skip pkg_type */
593   buffer += sizeof (tmp16);
594
595   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
596   buffer += sizeof (tmp16);
597   pkg_length = ntohs (tmp16);
598
599   /* Check that packet fits in the input buffer */
600   if (pkg_length > buffer_len)
601   {
602     noitL(noit_error, "collectd: parse_part_string: "
603         "Packet too big: "
604         "Chunk of size %"PRIu16" received, "
605         "but buffer has only %zu bytes left.\n",
606         pkg_length, buffer_len);
607     return (-1);
608   }
609
610   /* Check that pkg_length is in the valid range */
611   if (pkg_length <= header_size)
612   {
613     noitL(noit_error, "collectd: parse_part_string: "
614         "Packet too short: "
615         "Header claims this packet is only %hu "
616         "bytes long.\n", pkg_length);
617     return (-1);
618   }
619
620   /* Check that the package data fits into the output buffer.
621    * The previous if-statement ensures that:
622    * `pkg_length > header_size' */
623   if ((output_len < 0)
624       || ((size_t) output_len < ((size_t) pkg_length - header_size)))
625   {
626     noitL(noit_error, "collectd: parse_part_string: "
627         "Output buffer too small.\n");
628     return (-1);
629   }
630
631   /* All sanity checks successfull, let's copy the data over */
632   output_len = pkg_length - header_size;
633   memcpy ((void *) output, (void *) buffer, output_len);
634   buffer += output_len;
635
636   /* For some very weird reason '\0' doesn't do the trick on SPARC in
637    * this statement. */
638   if (output[output_len - 1] != 0)
639   {
640     noitL(noit_error, "collectd: parse_part_string: "
641         "Received string does not end "
642         "with a NULL-byte.\n");
643     return (-1);
644   }
645
646   *ret_buffer = buffer;
647   *ret_buffer_len = buffer_len - pkg_length;
648
649   return (0);
650 } /* int parse_part_string */
651
652
653 #define PP_SIGNED    0x01
654 #define PP_ENCRYPTED 0x02
655
656 #define BUFFER_READ(p,s) do { \
657   memcpy ((p), buffer + buffer_offset, (s)); \
658   buffer_offset += (s); \
659 } while (0)
660
661
662
663 // Forward declare
664 static int parse_packet (
665     collectd_closure_t *ccl, noit_module_t *self, noit_check_t *check,
666     void *buffer, size_t buffer_size, int flags);
667
668
669 static int parse_part_sign_sha256 (collectd_closure_t *ccl, noit_module_t *self,
670     noit_check_t *check, void **ret_buffer, size_t *ret_buffer_len, int flags)
671 {
672   unsigned char *buffer;
673   size_t buffer_len;
674   size_t buffer_offset;
675
676   size_t username_len;
677   part_signature_sha256_t pss;
678   uint16_t pss_head_length;
679   unsigned char hash[sizeof (pss.hash)];
680
681   unsigned char *hash_ptr;
682   unsigned int length;
683
684   buffer = *ret_buffer;
685   buffer_len = *ret_buffer_len;
686   buffer_offset = 0;
687
688   if (ccl->username == NULL)
689   {
690     noitL(noit_debug, "collectd: Received signed network packet but can't verify "
691         "it because no user has been configured. Will accept it.\n");
692     return (0);
693   }
694
695   if (ccl->secret == NULL)
696   {
697     noitL(noit_debug, "collectd: Received signed network packet but can't verify "
698         "it because no secret has been configured. Will accept it.\n");
699     return (0);
700   }
701
702   /* Check if the buffer has enough data for this structure. */
703   if (buffer_len <= PART_SIGNATURE_SHA256_SIZE)
704     return (-ENOMEM);
705
706   /* Read type and length header */
707   BUFFER_READ (&pss.head.type, sizeof (pss.head.type));
708   BUFFER_READ (&pss.head.length, sizeof (pss.head.length));
709   pss_head_length = ntohs (pss.head.length);
710
711   /* Check if the `pss_head_length' is within bounds. */
712   if ((pss_head_length <= PART_SIGNATURE_SHA256_SIZE)
713       || (pss_head_length > buffer_len))
714   {
715     noitL(noit_error, "collectd: HMAC-SHA-256 with invalid length received.\n");
716     return (-1);
717   }
718
719   /* Copy the hash. */
720   BUFFER_READ (pss.hash, sizeof (pss.hash));
721
722   /* Calculate username length (without null byte) and allocate memory */
723   username_len = pss_head_length - PART_SIGNATURE_SHA256_SIZE;
724   pss.username = malloc (username_len + 1);
725   if (pss.username == NULL)
726     return (-ENOMEM);
727
728   /* Read the username */
729   BUFFER_READ (pss.username, username_len);
730   pss.username[username_len] = 0;
731
732   assert (buffer_offset == pss_head_length);
733
734   /* Match up the username with the expected username */
735   if (strcmp(ccl->username, pss.username) != 0)
736   {
737     noitL(noit_error, "collectd: User: %s and Given User: %s don't match\n", ccl->username, pss.username);
738     sfree (pss.username);
739     return (-ENOENT);
740   }
741
742   /* Create a hash device and check the HMAC */
743   hash_ptr = HMAC(EVP_sha256(), ccl->secret, strlen(ccl->secret),
744       buffer     + PART_SIGNATURE_SHA256_SIZE,
745       buffer_len - PART_SIGNATURE_SHA256_SIZE,
746       hash,         &length);
747   if (hash_ptr == NULL)
748   {
749     noitL(noit_error, "collectd: Creating HMAC-SHA-256 object failed.\n");
750     sfree (pss.username);
751     return (-1);
752   }
753
754   /* Clean up */
755   sfree (pss.username);
756
757   if (memcmp (pss.hash, hash, sizeof (pss.hash)) != 0)
758   {
759     noitL(noit_error, "collectd: Verifying HMAC-SHA-256 signature failed: "
760         "Hash mismatch.\n");
761   }
762   else
763   {
764     parse_packet (ccl, self, check, buffer + buffer_offset, buffer_len - buffer_offset,
765         flags | PP_SIGNED);
766   }
767
768   *ret_buffer = buffer + buffer_len;
769   *ret_buffer_len = 0;
770
771   return (0);
772 } /* }}} int parse_part_sign_sha256 */
773 /* #endif HAVE_LIBGCRYPT */
774
775 static int parse_part_encr_aes256 (collectd_closure_t *ccl, noit_module_t *self,
776     noit_check_t *check, void **ret_buffer, size_t *ret_buffer_len,
777     int flags)
778 {
779   unsigned char  *buffer = *ret_buffer;
780   size_t buffer_len = *ret_buffer_len;
781   size_t payload_len;
782   size_t part_size;
783   size_t buffer_offset;
784   int    tmpbufsize;
785   unsigned char   *tmpbuf;
786   uint16_t username_len;
787   part_encryption_aes256_t pea;
788   unsigned char hash[sizeof (pea.hash)];
789   unsigned int hash_length;
790
791   EVP_CIPHER_CTX *ctx;
792   EVP_MD_CTX ctx_md;
793   int err;
794
795   /* Make sure at least the header if available. */
796   if (buffer_len <= PART_ENCRYPTION_AES256_SIZE)
797   {
798     noitL(noit_debug, "collectd: parse_part_encr_aes256: "
799         "Discarding short packet.\n");
800     return (-1);
801   }
802
803   buffer_offset = 0;
804
805   /* Copy the unencrypted information into `pea'. */
806   BUFFER_READ (&pea.head.type, sizeof (pea.head.type));
807   BUFFER_READ (&pea.head.length, sizeof (pea.head.length));
808
809   /* Check the `part size'. */
810   part_size = ntohs (pea.head.length);
811   if ((part_size <= PART_ENCRYPTION_AES256_SIZE)
812       || (part_size > buffer_len))
813   {
814     noitL(noit_debug, "collectd: parse_part_encr_aes256: "
815         "Discarding part with invalid size.\n");
816     return (-1);
817   }
818
819   /* Read the username */
820   BUFFER_READ (&username_len, sizeof (username_len));
821   username_len = ntohs (username_len);
822
823   if ((username_len <= 0)
824       || (username_len > (part_size - (PART_ENCRYPTION_AES256_SIZE + 1))))
825   {
826     noitL(noit_debug, "collectd: parse_part_encr_aes256: "
827         "Discarding part with invalid username length.\n");
828     return (-1);
829   }
830
831   assert (username_len > 0);
832   pea.username = malloc (username_len + 1);
833   if (pea.username == NULL)
834     return (-ENOMEM);
835   BUFFER_READ (pea.username, username_len);
836   pea.username[username_len] = 0;
837
838   /* Last but not least, the initialization vector */
839   BUFFER_READ (pea.iv, sizeof (pea.iv));
840
841   /* Make sure we are at the right position */
842   assert (buffer_offset == (username_len +
843         PART_ENCRYPTION_AES256_SIZE - sizeof (pea.hash)));
844
845   /* Match up the username with the expected username */
846   if (strcmp(ccl->username, pea.username) != 0)
847   {
848     noitL(noit_error, "collectd: Username received and server side username don't match\n");
849     sfree (pea.username);
850     return (-ENOENT);
851   }
852
853   ctx = network_get_aes256_cypher (ccl, pea.iv, sizeof (pea.iv),
854       pea.username);
855   if (ctx == NULL)
856     return (-1);
857
858   payload_len = part_size - (PART_ENCRYPTION_AES256_SIZE + username_len);
859   assert (payload_len > 0);
860   tmpbuf = malloc(part_size - buffer_offset);
861
862   /* Decrypt the packet */
863   err = EVP_DecryptUpdate(ctx,
864       tmpbuf, &tmpbufsize,
865       buffer    + buffer_offset,
866       part_size - buffer_offset);
867   if (err != 1)
868   {
869     noitL(noit_error, "collectd: openssl returned: %d\n", err);
870     return (-1);
871   }
872
873   assert(part_size - buffer_offset == tmpbufsize);
874   /* Make it appear to be in place */
875   memcpy(buffer + buffer_offset, tmpbuf, part_size - buffer_offset);
876   sfree(tmpbuf);
877
878   /* Read the hash */
879   BUFFER_READ (pea.hash, sizeof (pea.hash));
880
881   /* Make sure we're at the right position - again */
882   assert (buffer_offset == (username_len + PART_ENCRYPTION_AES256_SIZE));
883   assert (buffer_offset == (part_size - payload_len));
884
885   /* Check hash sum */
886   memset (hash, 0, sizeof (hash));
887   EVP_DigestInit(&ctx_md, EVP_sha1());
888   EVP_DigestUpdate(&ctx_md, buffer + buffer_offset, payload_len);
889   EVP_DigestFinal(&ctx_md, hash, &hash_length);
890   if (memcmp (hash, pea.hash, sizeof (hash)) != 0)
891   {
892     noitL(noit_error, "collectd: Decryption failed: Checksum mismatch.\n");
893     return (-1);
894   }
895
896   parse_packet (ccl, self, check, buffer + buffer_offset, payload_len,
897       flags | PP_ENCRYPTED);
898
899   /* Update return values */
900   *ret_buffer =     buffer     + part_size;
901   *ret_buffer_len = buffer_len - part_size;
902   sfree(pea.username);
903
904   return (0);
905 } /* }}} int parse_part_encr_aes256 */
906
907
908 #undef BUFFER_READ
909
910
911 static int parse_packet (/* {{{ */
912     collectd_closure_t *ccl, noit_module_t *self, noit_check_t *check,
913     void *buffer, size_t buffer_size, int flags)
914 {
915   int status;
916
917   int packet_was_signed = (flags & PP_SIGNED);
918   int packet_was_encrypted = (flags & PP_ENCRYPTED);
919   int printed_ignore_warning = 0;
920
921 #define VALUE_LIST_INIT { NULL, 0, 0, interval_g, "localhost", "", "", "", "", NULL }
922   value_list_t vl = VALUE_LIST_INIT;
923   notification_t n;
924
925   memset (&vl, '\0', sizeof (vl));
926   memset (&n, '\0', sizeof (n));
927   status = 0;
928
929   while ((status == 0) && (0 < buffer_size)
930       && ((unsigned int) buffer_size > sizeof (part_header_t)))
931   {
932     uint16_t pkg_length;
933     uint16_t pkg_type;
934
935     memcpy ((void *) &pkg_type,
936         (void *) buffer,
937         sizeof (pkg_type));
938     memcpy ((void *) &pkg_length,
939         (void *) ((char *)buffer + sizeof (pkg_type)),
940         sizeof (pkg_length));
941
942     pkg_length = ntohs (pkg_length);
943     pkg_type = ntohs (pkg_type);
944
945     if (pkg_length > buffer_size)
946       break;
947     /* Ensure that this loop terminates eventually */
948     if (pkg_length < (2 * sizeof (uint16_t)))
949       break;
950
951
952     if (pkg_type == TYPE_ENCR_AES256)
953     {
954       status = parse_part_encr_aes256 (ccl, self, check,
955           &buffer, &buffer_size, flags);
956       if (status != 0)
957       {
958         noitL(noit_error, "collectd: Decrypting AES256 "
959             "part failed "
960             "with status %i.\n", status);
961         break;
962       }
963     }
964     else if ((ccl->security_level == SECURITY_LEVEL_ENCRYPT)
965         && (packet_was_encrypted == 0))
966     {
967       if (printed_ignore_warning == 0)
968       {
969         noitL(noit_debug, "collectd: Unencrypted packet or "
970             "part has been ignored.\n");
971         printed_ignore_warning = 1;
972       }
973       buffer = ((char *) buffer) + pkg_length;
974       continue;
975     }
976     else if (pkg_type == TYPE_SIGN_SHA256)
977     {
978       status = parse_part_sign_sha256 (ccl, self, check,
979                                         &buffer, &buffer_size, flags);
980       if (status != 0)
981       {
982         noitL(noit_error, "collectd: Verifying HMAC-SHA-256 "
983             "signature failed "
984             "with status %i.\n", status);
985         break;
986       }
987     }
988     else if ((ccl->security_level == SECURITY_LEVEL_SIGN)
989         && (packet_was_encrypted == 0)
990         && (packet_was_signed == 0))
991     {
992       if (printed_ignore_warning == 0)
993       {
994         noitL(noit_debug, "collectd: Unsigned packet or "
995             "part has been ignored.\n");
996         printed_ignore_warning = 1;
997       }
998       buffer = ((char *) buffer) + pkg_length;
999       continue;
1000     }
1001     else if (pkg_type == TYPE_VALUES)
1002     {
1003       status = parse_part_values (&buffer, &buffer_size,
1004           &vl.values, &vl.types, &vl.values_len);
1005
1006       if (status != 0)
1007         break;
1008
1009       if ((vl.time > 0)
1010           && (strlen (vl.host) > 0)
1011           && (strlen (vl.plugin) > 0)
1012           && (strlen (vl.type) > 0))
1013       {
1014         queue_values(ccl, self, check, &vl);
1015       }
1016       else
1017       {
1018         noitL(noit_error,
1019               "collectd: NOT dispatching values [%lld,%lld,%lld,%lld]\n",
1020               (long long int)vl.time, (long long int)strlen (vl.host),
1021               (long long int)strlen (vl.plugin),
1022               (long long int)strlen (vl.type));
1023       }
1024
1025       sfree (vl.values);
1026       sfree (vl.types);
1027     }
1028     else if (pkg_type == TYPE_TIME)
1029     {
1030       uint64_t tmp = 0;
1031       status = parse_part_number (&buffer, &buffer_size,
1032           &tmp);
1033       if (status == 0)
1034       {
1035         vl.time = (time_t) tmp;
1036         n.time = (time_t) tmp;
1037       }
1038     }
1039     else if (pkg_type == TYPE_INTERVAL)
1040     {
1041       uint64_t tmp = 0;
1042       status = parse_part_number (&buffer, &buffer_size,
1043           &tmp);
1044       if (status == 0)
1045         vl.interval = (int) tmp;
1046     }
1047     else if (pkg_type == TYPE_HOST)
1048     {
1049       status = parse_part_string (&buffer, &buffer_size,
1050           vl.host, sizeof (vl.host));
1051       if (status == 0)
1052         sstrncpy (n.host, vl.host, sizeof (n.host));
1053     }
1054     else if (pkg_type == TYPE_PLUGIN)
1055     {
1056       status = parse_part_string (&buffer, &buffer_size,
1057           vl.plugin, sizeof (vl.plugin));
1058       if (status == 0)
1059         sstrncpy (n.plugin, vl.plugin,
1060             sizeof (n.plugin));
1061     }
1062     else if (pkg_type == TYPE_PLUGIN_INSTANCE)
1063     {
1064       status = parse_part_string (&buffer, &buffer_size,
1065           vl.plugin_instance,
1066           sizeof (vl.plugin_instance));
1067       if (status == 0)
1068         sstrncpy (n.plugin_instance,
1069             vl.plugin_instance,
1070             sizeof (n.plugin_instance));
1071     }
1072     else if (pkg_type == TYPE_TYPE)
1073     {
1074       status = parse_part_string (&buffer, &buffer_size,
1075           vl.type, sizeof (vl.type));
1076       if (status == 0)
1077         sstrncpy (n.type, vl.type, sizeof (n.type));
1078     }
1079     else if (pkg_type == TYPE_TYPE_INSTANCE)
1080     {
1081       status = parse_part_string (&buffer, &buffer_size,
1082           vl.type_instance,
1083           sizeof (vl.type_instance));
1084       if (status == 0)
1085         sstrncpy (n.type_instance, vl.type_instance,
1086             sizeof (n.type_instance));
1087     }
1088     else if (pkg_type == TYPE_MESSAGE)
1089     {
1090       status = parse_part_string (&buffer, &buffer_size,
1091           n.message, sizeof (n.message));
1092
1093       if (status != 0)
1094       {
1095         /* do nothing */
1096       }
1097       else if ((n.severity != NOTIF_FAILURE)
1098           && (n.severity != NOTIF_WARNING)
1099           && (n.severity != NOTIF_OKAY))
1100       {
1101         noitL(noit_error, "collectd: "
1102             "Ignoring notification with "
1103             "unknown severity %i.\n",
1104             n.severity);
1105       }
1106       else if (n.time <= 0)
1107       {
1108         noitL(noit_error, "collectd: "
1109             "Ignoring notification with "
1110             "time == 0.\n");
1111       }
1112       else if (strlen (n.message) <= 0)
1113       {
1114         noitL(noit_error, "collectd: "
1115             "Ignoring notification with "
1116             "an empty message.\n");
1117       }
1118       else
1119       {
1120         queue_notifications(ccl, self, check, &n);
1121         noitL(noit_error, "collectd: "
1122             "DISPATCH NOTIFICATION\n");
1123       }
1124     }
1125     else if (pkg_type == TYPE_SEVERITY)
1126     {
1127       uint64_t tmp = 0;
1128       status = parse_part_number (&buffer, &buffer_size,
1129           &tmp);
1130       if (status == 0)
1131         n.severity = (int) tmp;
1132     }
1133     else
1134     {
1135       noitL(noit_error, "collectd: parse_packet: Unknown part"
1136           " type: 0x%04hx\n", pkg_type);
1137       buffer = ((char *) buffer) + pkg_length;
1138     }
1139   } /* while (buffer_size > sizeof (part_header_t)) */
1140
1141   return (status);
1142 } /* }}} int parse_packet */
1143
1144
1145 // Not proud of this at all however; I am not sure best how to address this.
1146 static int infer_type(char *buffer, int buffer_len, value_list_t *vl, int index) {
1147   int len = strlen(buffer);
1148   strcat(buffer, "`");
1149   if (cmp_type(vl, "load", "load")) {
1150     assert(vl->values_len == 3);
1151     switch (index) {
1152       case 0:
1153         strcat(buffer, "1min"); break;
1154       case 1:
1155         strcat(buffer, "5min"); break;
1156       case 2:
1157         strcat(buffer, "15min"); break;
1158     }
1159   } else if (cmp_plugin(vl, "interface")) {
1160     assert(vl->values_len == 2);
1161     switch (index) {
1162       case 0:
1163         strcat(buffer, "rx"); break;
1164       case 1:
1165         strcat(buffer, "tx"); break;
1166     }
1167   } else {
1168     char buf[20];
1169     snprintf(buf, sizeof(buf), "%d", index);
1170     strcat(buffer, buf);
1171     noitL(noit_debug, "collectd: parsing multiple values"
1172         " and guessing on the type for plugin[%s] and type[%s]"
1173         , vl->plugin, vl->type);
1174   }
1175   return len;
1176 }
1177
1178 static void concat_metrics(char *buffer, char* plugin, char* plugin_inst, char* type, char* type_inst) {
1179   strcpy(buffer, plugin);
1180
1181   if (strlen(plugin_inst)) {
1182     strcat(buffer, "`");
1183     strcat(buffer, plugin_inst);
1184   }
1185   if (strlen(type)) {
1186     strcat(buffer, "`");
1187     strcat(buffer, type);
1188   }
1189   if (strlen(type_inst)) {
1190     strcat(buffer, "`");
1191     strcat(buffer, type_inst);
1192   }
1193 }
1194
1195 static int queue_notifications(collectd_closure_t *ccl,
1196       noit_module_t *self, noit_check_t *check, notification_t *n) {
1197   stats_t current;
1198   char buffer[DATA_MAX_NAME_LEN*4 + 128];
1199   collectd_mod_config_t *conf;
1200   conf = noit_module_get_userdata(self);
1201
1202   if(!conf->support_notifications) return 0;
1203   /* We are passive, so we don't do anything for transient checks */
1204   if(check->flags & NP_TRANSIENT) return 0;
1205
1206   noit_check_stats_clear(&current);
1207   gettimeofday(&current.whence, NULL);
1208
1209   // Concat all the names together so they fit into the flat noitd model
1210   concat_metrics(buffer, n->plugin, n->plugin_instance, n->type, n->type_instance);
1211   noit_stats_set_metric(&ccl->current, buffer, METRIC_STRING, n->message);
1212   noit_check_passive_set_stats(self, check, &current);
1213   noitL(nldeb, "collectd: dispatch_notifications(%s, %s, %s)\n",check->target, buffer, n->message);
1214   return 0;
1215 }
1216
1217
1218 static int queue_values(collectd_closure_t *ccl,
1219       noit_module_t *self, noit_check_t *check, value_list_t *vl) {
1220   char buffer[DATA_MAX_NAME_LEN*4 + 4 + 1 + 20];
1221   int i, len = 0;
1222
1223   // Concat all the names together so they fit into the flat noitd model
1224   concat_metrics(buffer, vl->plugin, vl->plugin_instance, vl->type, vl->type_instance);
1225   for (i=0; i < vl->values_len; i++) {
1226  
1227     // Only infer the type if the amount of values is greater than one
1228     if (vl->values_len > 1) {
1229       // Trunc the string 
1230       if (len > 0)
1231         buffer[len] = 0;
1232       len = infer_type(buffer, sizeof(buffer), vl, i);
1233     }
1234
1235     switch (vl->types[i])
1236     {
1237       case DS_TYPE_COUNTER:
1238         noit_stats_set_metric(&ccl->current, buffer, METRIC_UINT64, &vl->values[i].counter);
1239         break;
1240
1241       case DS_TYPE_GAUGE:
1242         noit_stats_set_metric(&ccl->current, buffer, METRIC_DOUBLE, &vl->values[i].gauge);
1243         break;
1244
1245       case DS_TYPE_DERIVE:
1246         noit_stats_set_metric(&ccl->current, buffer, METRIC_INT64, &vl->values[i].derive);
1247         break;
1248
1249       case DS_TYPE_ABSOLUTE:
1250         noit_stats_set_metric(&ccl->current, buffer, METRIC_INT64, &vl->values[i].absolute);
1251         break;
1252
1253       default:
1254         noitL(noit_debug, "collectd: parse_part_values: "
1255               "Don't know how to handle data source type %"PRIu8 "\n",
1256               vl->types[i]);
1257         return (-1);
1258     } /* switch (value_types[i]) */
1259     ccl->stats_count++;
1260     noitL(nldeb, "collectd: queue_values(%s, %s)\n", buffer, check->target);
1261   }
1262   return 0;
1263 }
1264
1265 static void clear_closure(collectd_closure_t *ccl) {
1266   ccl->stats_count = 0;
1267   ccl->ntfy_count = 0;
1268   noit_check_stats_clear(&ccl->current);
1269
1270 }
1271
1272 static int collectd_submit(noit_module_t *self, noit_check_t *check) {
1273   collectd_closure_t *ccl;
1274   struct timeval duration;
1275   /* We are passive, so we don't do anything for transient checks */
1276   if(check->flags & NP_TRANSIENT) return 0;
1277
1278   if(!check->closure) {
1279     ccl = check->closure = (void *)calloc(1, sizeof(collectd_closure_t));
1280     memset(ccl, 0, sizeof(collectd_closure_t));
1281   } else {
1282     // Don't count the first run
1283     char human_buffer[256];
1284     ccl = (collectd_closure_t*)check->closure;
1285     gettimeofday(&ccl->current.whence, NULL);
1286     sub_timeval(ccl->current.whence, check->last_fire_time, &duration);
1287     ccl->current.duration = duration.tv_sec; // + duration.tv_usec / (1000 * 1000);
1288
1289     snprintf(human_buffer, sizeof(human_buffer),
1290              "dur=%d,run=%d,stats=%d,ntfy=%d", ccl->current.duration,
1291              check->generation, ccl->stats_count, ccl->ntfy_count);
1292     noitL(nldeb, "collectd(%s) [%s]\n", check->target, human_buffer);
1293
1294     // Not sure what to do here
1295     ccl->current.available = (ccl->ntfy_count > 0 || ccl->stats_count > 0) ?
1296         NP_AVAILABLE : NP_UNAVAILABLE;
1297     ccl->current.state = (ccl->ntfy_count > 0 || ccl->stats_count > 0) ?
1298         NP_GOOD : NP_BAD;
1299     ccl->current.status = human_buffer;
1300     noit_check_passive_set_stats(self, check, &ccl->current);
1301
1302     memcpy(&check->last_fire_time, &ccl->current.whence, sizeof(duration));
1303   }
1304   clear_closure(ccl);
1305   return 0;
1306 }
1307
1308 struct collectd_pkt {
1309   noit_module_t *self;  /* which collect load context */
1310   char *payload;
1311   int len;
1312 };
1313
1314 static int
1315 push_packet_at_check(noit_check_t *check, void *closure) {
1316   struct collectd_pkt *pkt = closure;
1317   collectd_closure_t *ccl;
1318   char *security_buffer;
1319   collectd_mod_config_t *conf;
1320   conf = noit_module_get_userdata(pkt->self);
1321
1322   /* We need a check, and a collectd one at that */
1323   if (!check || strcmp(check->module, pkt->self->hdr.name)) return 0;
1324
1325   // If its a new check retrieve some values
1326   if (check->closure == NULL) {
1327     // TODO: Verify if it could somehow retrieve data before the check closure exists
1328     ccl = check->closure = (void *)calloc(1, sizeof(collectd_closure_t));
1329     memset(ccl, 0, sizeof(collectd_closure_t));
1330   } else {
1331     ccl = (collectd_closure_t*)check->closure;
1332   }
1333   // Default to NONE
1334   ccl->security_level = SECURITY_LEVEL_NONE;
1335   if (noit_hash_retr_str(check->config, "security_level", strlen("security_level"),
1336                          (const char**)&security_buffer))
1337   {
1338     ccl->security_level = atoi(security_buffer);
1339   }
1340
1341   // Is this outside to keep updates happening?
1342   if (!noit_hash_retr_str(check->config, "username", strlen("username"),
1343                          (const char**)&ccl->username) &&
1344       !noit_hash_retr_str(conf->options, "username", strlen("username"),
1345                          (const char**)&ccl->username))
1346   {
1347     if (ccl->security_level == SECURITY_LEVEL_ENCRYPT) {
1348       noitL(nlerr, "collectd: no username defined for check.\n");
1349       return 0;
1350     } else if (ccl->security_level == SECURITY_LEVEL_SIGN) {
1351       noitL(nlerr, "collectd: no username defined for check, "
1352           "will accept any signed packet.\n");
1353     }
1354   }
1355
1356   if(!ccl->secret)
1357     noit_hash_retr_str(check->config, "secret", strlen("secret"),
1358                        (const char**)&ccl->secret);
1359   if(!ccl->secret)
1360     noit_hash_retr_str(conf->options, "secret", strlen("secret"),
1361                        (const char**)&ccl->secret);
1362   if(!ccl->secret) {
1363     if (ccl->security_level == SECURITY_LEVEL_ENCRYPT) {
1364       noitL(nlerr, "collectd: no secret defined for check.\n");
1365       return 0;
1366     }
1367     else if (ccl->security_level == SECURITY_LEVEL_SIGN) {
1368       noitL(nlerr, "collectd: no secret defined for check, "
1369           "will accept any signed packet.\n");
1370     }
1371   }
1372
1373   parse_packet(ccl, pkt->self, check, pkt->payload, pkt->len, 0);
1374   return 1;
1375 }
1376
1377 static int noit_collectd_handler(eventer_t e, int mask, void *closure,
1378                              struct timeval *now) {
1379   union {
1380     struct sockaddr_in  skaddr;
1381     struct sockaddr_in6 skaddr6;
1382   } remote;
1383   char packet[1500];
1384   int packet_len = sizeof(packet);
1385   unsigned int from_len;
1386   char ip_p[INET6_ADDRSTRLEN];
1387   noit_module_t *self = (noit_module_t *)closure;
1388   collectd_mod_config_t *conf;
1389   conf = noit_module_get_userdata(self);
1390
1391
1392   // Get the username and password of the string
1393
1394   while(1) {
1395     struct collectd_pkt pkt;
1396     int inlen, check_cnt;
1397
1398     from_len = sizeof(remote);
1399
1400     inlen = recvfrom(e->fd, packet, packet_len, 0,
1401                      (struct sockaddr *)&remote, &from_len);
1402     gettimeofday(now, NULL); /* set it, as we care about accuracy */
1403
1404     if(inlen < 0) {
1405       if(errno == EAGAIN || errno == EINTR) break;
1406       noitLT(nlerr, now, "collectd: recvfrom: %s\n", strerror(errno));
1407       break;
1408     }
1409     if (from_len == sizeof(remote.skaddr)) {
1410       if (!inet_ntop(AF_INET, &(remote.skaddr.sin_addr), ip_p, INET_ADDRSTRLEN)) {
1411         noitLT(nlerr, now, "collectd: inet_ntop failed: %s\n", strerror(errno));
1412         break;
1413       }
1414     }
1415     else if(from_len == sizeof(remote.skaddr6)) {
1416       if (!inet_ntop(AF_INET6, &(remote.skaddr6.sin6_addr), ip_p, INET6_ADDRSTRLEN)) {
1417         noitLT(nlerr, now, "collectd: inet_ntop failed: %s\n", strerror(errno));
1418         break;
1419       }
1420     }
1421     else {
1422       noitLT(nlerr, now, "collectd: could not determine address family of remote\n");
1423       break;
1424     }
1425
1426     pkt.self = self;
1427     pkt.payload = packet;
1428     pkt.len = inlen;
1429     check_cnt = noit_poller_target_do(ip_p, push_packet_at_check ,&pkt);
1430     if(check_cnt == 0)
1431       noitL(nlerr, "collectd: No defined check from ip [%s].\n", ip_p);
1432   }
1433   return EVENTER_READ | EVENTER_EXCEPTION;
1434 }
1435
1436 static int noit_collectd_initiate_check(noit_module_t *self,
1437                                         noit_check_t *check,
1438                                         int once, noit_check_t *cause) {
1439   /* The idea is to write the collectd stuff to the stats one every period
1440    * Then we can warn people if no stats where written in a period of time
1441    */
1442   INITIATE_CHECK(collectd_submit, self, check);
1443   return 0;
1444 }
1445
1446 static int noit_collectd_config(noit_module_t *self, noit_hash_table *options) {
1447   collectd_mod_config_t *conf;
1448   conf = noit_module_get_userdata(self);
1449   if(conf) {
1450     if(conf->options) {
1451       noit_hash_destroy(conf->options, free, free);
1452       free(conf->options);
1453     }
1454   }
1455   else
1456     conf = calloc(1, sizeof(*conf));
1457   conf->options = options;
1458   noit_module_set_userdata(self, conf);
1459   return 1;
1460 }
1461
1462 static int noit_collectd_onload(noit_image_t *self) {
1463   if(!nlerr) nlerr = noit_log_stream_find("error/collectd");
1464   if(!nldeb) nldeb = noit_log_stream_find("debug/collectd");
1465   if(!nlerr) nlerr = noit_error;
1466   if(!nldeb) nldeb = noit_debug;
1467   eventer_name_callback("noit_collectd/handler", noit_collectd_handler);
1468   return 0;
1469 }
1470
1471 static int noit_collectd_init(noit_module_t *self) {
1472   const char *config_val;
1473   int sockaddr_len;
1474   collectd_mod_config_t *conf;
1475   conf = noit_module_get_userdata(self);
1476   int portint = 0;
1477   struct sockaddr_in skaddr;
1478   struct sockaddr_in6 skaddr6;
1479   struct in6_addr in6addr_any = IN6ADDR_ANY_INIT;
1480   const char *host;
1481   unsigned short port;
1482
1483   conf->support_notifications = noit_true;
1484   if(noit_hash_retr_str(conf->options,
1485                         "notifications", strlen("notifications"),
1486                         (const char **)&config_val)) {
1487     if(!strcasecmp(config_val, "false") || !strcasecmp(config_val, "off"))
1488       conf->support_notifications = noit_false;
1489   }
1490   /* Default Collectd port */
1491   portint = NET_DEFAULT_PORT;
1492   if(noit_hash_retr_str(conf->options,
1493                          "collectd_port", strlen("collectd_port"),
1494                          (const char**)&config_val))
1495     portint = atoi(config_val);
1496
1497
1498   if(!noit_hash_retr_str(conf->options,
1499                          "collectd_host", strlen("collectd_host"),
1500                          (const char**)&host))
1501     host = "*";
1502
1503   port = (unsigned short) portint;
1504
1505   conf->ipv4_fd = conf->ipv6_fd = -1;
1506
1507   conf->ipv4_fd = socket(PF_INET, SOCK_DGRAM, 0);
1508   if(conf->ipv4_fd < 0) {
1509     noitL(noit_error, "collectd: socket failed: %s\n",
1510           strerror(errno));
1511   }
1512   else {
1513     if(eventer_set_fd_nonblocking(conf->ipv4_fd)) {
1514       close(conf->ipv4_fd);
1515       conf->ipv4_fd = -1;
1516       noitL(noit_error,
1517             "collectd: could not set socket non-blocking: %s\n",
1518             strerror(errno));
1519     }
1520   }
1521   skaddr.sin_family = AF_INET;
1522   skaddr.sin_addr.s_addr = htonl(INADDR_ANY);
1523   skaddr.sin_port = htons(port);
1524
1525   sockaddr_len = sizeof(skaddr);
1526   if(bind(conf->ipv4_fd, (struct sockaddr *)&skaddr, sockaddr_len) < 0) {
1527     noitL(noit_error, "bind failed[%s]: %s\n", host, strerror(errno));
1528     close(conf->ipv4_fd);
1529     return -1;
1530   }
1531
1532   if(conf->ipv4_fd >= 0) {
1533     eventer_t newe;
1534     newe = eventer_alloc();
1535     newe->fd = conf->ipv4_fd;
1536     newe->mask = EVENTER_READ | EVENTER_EXCEPTION;
1537     newe->callback = noit_collectd_handler;
1538     newe->closure = self;
1539     eventer_add(newe);
1540   }
1541
1542   conf->ipv6_fd = socket(AF_INET6, SOCK_DGRAM, 0);
1543   if(conf->ipv6_fd < 0) {
1544     noitL(noit_error, "collectd: IPv6 socket failed: %s\n",
1545           strerror(errno));
1546   }
1547   else {
1548     if(eventer_set_fd_nonblocking(conf->ipv6_fd)) {
1549       close(conf->ipv6_fd);
1550       conf->ipv6_fd = -1;
1551       noitL(noit_error,
1552             "collectd: could not set socket non-blocking: %s\n",
1553                strerror(errno));
1554     }
1555   }
1556   sockaddr_len = sizeof(skaddr6);
1557   memset(&skaddr6, 0, sizeof(skaddr6));
1558   skaddr6.sin6_family = AF_INET6;
1559   skaddr6.sin6_addr = in6addr_any;
1560   skaddr6.sin6_port = htons(port);
1561
1562   if(bind(conf->ipv6_fd, (struct sockaddr *)&skaddr6, sockaddr_len) < 0) {
1563     noitL(noit_error, "bind(IPv6) failed[%s]: %s\n", host, strerror(errno));
1564     close(conf->ipv6_fd);
1565     conf->ipv6_fd = -1;
1566   }
1567
1568   if(conf->ipv6_fd >= 0) {
1569     eventer_t newe;
1570     newe = eventer_alloc();
1571     newe->fd = conf->ipv6_fd;
1572     newe->mask = EVENTER_READ;
1573     newe->callback = noit_collectd_handler;
1574     newe->closure = self;
1575     eventer_add(newe);
1576   }
1577
1578   noit_module_set_userdata(self, conf);
1579   return 0;
1580 }
1581
1582 #include "collectd.xmlh"
1583 noit_module_t collectd = {
1584   {
1585     NOIT_MODULE_MAGIC,
1586     NOIT_MODULE_ABI_VERSION,
1587     "collectd",
1588     "collectd collection",
1589     collectd_xml_description,
1590     noit_collectd_onload
1591   },
1592   noit_collectd_config,
1593   noit_collectd_init,
1594   noit_collectd_initiate_check,
1595   NULL /* noit_collectd_cleanup */
1596 };
Note: See TracBrowser for help on using the browser.