root/src/modules/collectd.c

Revision d5328c3abdc93d2a2046cd50dded49a2a56bf523, 46.8 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

->self.hdr could actually be another module and it would match... just hardcode the collectd requirement

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2005-2009  Florian Forster
3  * Copyright (c) 2009, OmniTI Computer Consulting, Inc.
4  * Copyright (c) 2009, Dan Di Spaltro
5  * All rights reserved.
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library, in file named COPYING.LGPL; if not,
19  * write to the Free Software Foundation, Inc., 59 Temple Place,
20  * Suite 330, Boston, MA  02111-1307  USA
21  *
22  */
23
24 #include "noit_defines.h"
25
26 #include <stdio.h>
27 #include <unistd.h>
28 #include <errno.h>
29 #include <assert.h>
30 #include <math.h>
31 #include <ctype.h>
32 #ifdef HAVE_SYS_FILIO_H
33 #include <sys/filio.h>
34 #endif
35 #include <openssl/evp.h>
36 #include <openssl/hmac.h>
37 #include <inttypes.h>
38 #include <arpa/inet.h>
39
40 #include "noit_module.h"
41 #include "noit_check.h"
42 #include "noit_check_tools.h"
43 #include "utils/noit_log.h"
44 #include "utils/noit_hash.h"
45
46
47 static noit_log_stream_t nlerr = NULL;
48 static noit_log_stream_t nldeb = NULL;
49
50 typedef struct _mod_config {
51   noit_hash_table *options;
52   noit_hash_table target_sessions;
53   noit_boolean support_notifications;
54   noit_boolean asynch_metrics;
55   int ipv4_fd;
56   int ipv6_fd;
57 } collectd_mod_config_t;
58
59 typedef struct collectd_closure_s {
60   char *username;
61   char *secret;
62   int security_level;
63   EVP_CIPHER_CTX ctx;
64   stats_t current;
65   int stats_count;
66   int ntfy_count;
67 } collectd_closure_t;
68
/*
 * cmp_plugin(vl, val): non-zero when vl->plugin equals `val`.
 * cmp_type(vl, plugin_name, type_name): non-zero when vl->plugin equals
 * `plugin_name` AND vl->type equals `type_name`.
 *
 * The string arguments must be string literals (or char arrays):
 * sizeof() supplies the comparison length, terminating NUL included.
 * `vl` is a pointer to a structure with `plugin` and `type` members.
 */
#define cmp_plugin(vl, val) \
  (strncmp((vl)->plugin, (val), sizeof(val)) == 0)

#define cmp_type(vl, plugin_name, type_name) \
  (cmp_plugin((vl), plugin_name) && \
   (strncmp((vl)->type, (type_name), sizeof(type_name)) == 0))
75
76
77 /**
78  *
79  * Collectd Structs
80  *
81  *
82  **/
83
/* One meta value; which member is meaningful is recorded outside the union. */
typedef union meta_value_u
{
  char    *mv_string;
  int64_t  mv_signed_int;
  uint64_t mv_unsigned_int;
  double   mv_double;
  _Bool    mv_boolean;
} meta_value_t;

/* A key/value entry; entries are chained through `next`. */
typedef struct meta_entry_s meta_entry_t;
struct meta_entry_s
{
  char         *key;
  meta_value_t  value;
  int           type;
  meta_entry_t *next;
};

/* Head of a chain of meta entries together with a mutex. */
typedef struct meta_data_s
{
  meta_entry_t   *head;
  pthread_mutex_t lock;
} meta_data_t;
111
112
/* Default network endpoints. */
#define NET_DEFAULT_V4_ADDR "239.192.74.66"
#define NET_DEFAULT_V6_ADDR "ff18::efc0:4a42"
#define NET_DEFAULT_PORT    25826

/* Part type codes found in the 16-bit type field of each part header. */
#define TYPE_HOST            0x0000
#define TYPE_TIME            0x0001
#define TYPE_PLUGIN          0x0002
#define TYPE_PLUGIN_INSTANCE 0x0003
#define TYPE_TYPE            0x0004
#define TYPE_TYPE_INSTANCE   0x0005
#define TYPE_VALUES          0x0006
#define TYPE_INTERVAL        0x0007

/* Part types used to transmit notifications. */
#define TYPE_MESSAGE         0x0100
#define TYPE_SEVERITY        0x0101

/* Part types wrapping signed / encrypted content. */
#define TYPE_SIGN_SHA256     0x0200
#define TYPE_ENCR_AES256     0x0210

/* Size of the fixed name buffers in value lists and notifications. */
#define DATA_MAX_NAME_LEN 64

/* Notification message buffer size and severity codes. */
#define NOTIF_MAX_MSG_LEN 256
#define NOTIF_FAILURE 1
#define NOTIF_WARNING 2
#define NOTIF_OKAY    4
139
/*
 * Release `ptr` and reset it to NULL so it cannot be freed twice.
 * free(NULL) is a no-op, so no explicit NULL test is needed.
 * `ptr` must be an lvalue and is evaluated more than once.
 */
#define sfree(ptr) \
  do { \
    free(ptr); \
    (ptr) = NULL; \
  } while (0)
147
/*
 * Bounded string copy that always NUL-terminates.
 *
 * Copies at most n - 1 characters of `src` into `dest` and guarantees that
 * dest[n - 1] is '\0' (plain strncpy leaves `dest` unterminated when `src`
 * is n characters or longer).  With n == 0 nothing is written.
 * Returns `dest`.
 */
static inline char *sstrncpy(char *dest, const char *src, size_t n)
{
  if (n == 0)
    return dest;
  strncpy(dest, src, n);
  dest[n - 1] = '\0';
  return dest;
}
149
/* Values compared against a closure's security_level in parse_packet(). */
#define SECURITY_LEVEL_NONE     0
#define SECURITY_LEVEL_SIGN    1
#define SECURITY_LEVEL_ENCRYPT 2

/* Data source type codes carried in the per-value type bytes. */
#define DS_TYPE_COUNTER  0
#define DS_TYPE_GAUGE    1
#define DS_TYPE_DERIVE   2
#define DS_TYPE_ABSOLUTE 3

/*
 * Name of a data source type code, or "unknown".
 * The argument and the whole expansion are parenthesized so the macro can
 * be used with expression arguments and inside larger expressions.
 * `t` is evaluated up to four times.
 */
#define DS_TYPE_TO_STRING(t) \
  (((t) == DS_TYPE_COUNTER)  ? "counter"  : \
   ((t) == DS_TYPE_GAUGE)    ? "gauge"    : \
   ((t) == DS_TYPE_DERIVE)   ? "derive"   : \
   ((t) == DS_TYPE_ABSOLUTE) ? "absolute" : \
   "unknown")

/* Size of the data buffer in receive_list_entry_s. */
#define BUFF_SIZE 1024
166
/* Storage types for the four data source kinds. */
typedef unsigned long long counter_t;
typedef double             gauge_t;
typedef int64_t            derive_t;
typedef uint64_t           absolute_t;

/* Used by parse_packet() as the initial interval of a value list. */
int  interval_g;

/* A single sample; the active member is selected by the DS_TYPE_* code. */
typedef union value_u
{
  counter_t  counter;
  gauge_t    gauge;
  derive_t   derive;
  absolute_t absolute;
} value_t;
181
182 struct value_list_s
183 {
184   value_t *values;
185   int      values_len;
186   time_t   time;
187   int      interval;
188   char     host[DATA_MAX_NAME_LEN];
189   char     plugin[DATA_MAX_NAME_LEN];
190   char     plugin_instance[DATA_MAX_NAME_LEN];
191   char     type[DATA_MAX_NAME_LEN];
192   char     type_instance[DATA_MAX_NAME_LEN];
193   meta_data_t *meta;
194   uint8_t  *types;
195 };
196 typedef struct value_list_s value_list_t;
197
198
199 enum notification_meta_type_e
200 {
201   NM_TYPE_STRING,
202   NM_TYPE_SIGNED_INT,
203   NM_TYPE_UNSIGNED_INT,
204   NM_TYPE_DOUBLE,
205   NM_TYPE_BOOLEAN
206 };
207
208 typedef struct notification_meta_s
209 {
210   char name[DATA_MAX_NAME_LEN];
211   enum notification_meta_type_e type;
212   union
213   {
214     const char *nm_string;
215     int64_t nm_signed_int;
216     uint64_t nm_unsigned_int;
217     double nm_double;
218     bool nm_boolean;
219   } nm_value;
220   struct notification_meta_s *next;
221 } notification_meta_t;
222
223 typedef struct notification_s
224 {
225   int    severity;
226   time_t time;
227   char   message[NOTIF_MAX_MSG_LEN];
228   char   host[DATA_MAX_NAME_LEN];
229   char   plugin[DATA_MAX_NAME_LEN];
230   char   plugin_instance[DATA_MAX_NAME_LEN];
231   char   type[DATA_MAX_NAME_LEN];
232   char   type_instance[DATA_MAX_NAME_LEN];
233   notification_meta_t *meta;
234 } notification_t;
235
236
237
/*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
 *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 * +-------------------------------+-------------------------------+
 * ! Type                          ! Length                        !
 * +-------------------------------+-------------------------------+
 *
 * Common header of every part: two 16-bit fields.
 */
typedef struct part_header_s
{
  uint16_t type;
  uint16_t length;
} part_header_t;
250
251 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
252  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
253  * +-------------------------------+-------------------------------+
254  * ! Type                          ! Length                        !
255  * +-------------------------------+-------------------------------+
256  * : (Length - 4) Bytes                                            :
257  * +---------------------------------------------------------------+
258  */
259 struct part_string_s
260 {
261   part_header_t *head;
262   char *value;
263 };
264 typedef struct part_string_s part_string_t;
265
266 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
267  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
268  * +-------------------------------+-------------------------------+
269  * ! Type                          ! Length                        !
270  * +-------------------------------+-------------------------------+
271  * : (Length - 4 == 2 || 4 || 8) Bytes                             :
272  * +---------------------------------------------------------------+
273  */
274 struct part_number_s
275 {
276   part_header_t *head;
277   uint64_t *value;
278 };
279 typedef struct part_number_s part_number_t;
280
281 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
282  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
283  * +-------------------------------+-------------------------------+
284  * ! Type                          ! Length                        !
285  * +-------------------------------+---------------+---------------+
286  * ! Num of values                 ! Type0         ! Type1         !
287  * +-------------------------------+---------------+---------------+
288  * ! Value0                                                        !
289  * !                                                               !
290  * +---------------------------------------------------------------+
291  * ! Value1                                                        !
292  * !                                                               !
293  * +---------------------------------------------------------------+
294  */
295 struct part_values_s
296 {
297   part_header_t *head;
298   uint16_t *num_values;
299   uint8_t  *values_types;
300   value_t  *values;
301 };
302 typedef struct part_values_s part_values_t;
303
304 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
305  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
306  * +-------------------------------+-------------------------------+
307  * ! Type                          ! Length                        !
308  * +-------------------------------+-------------------------------+
309  * ! Hash (Bits   0 -  31)                                         !
310  * : :                                                             :
311  * ! Hash (Bits 224 - 255)                                         !
312  * +---------------------------------------------------------------+
313  */
314 /* Minimum size */
315 #define PART_SIGNATURE_SHA256_SIZE 36
316 struct part_signature_sha256_s
317 {
318   part_header_t head;
319   unsigned char hash[32];
320   char *username;
321 };
322 typedef struct part_signature_sha256_s part_signature_sha256_t;
323
324 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
325  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
326  * +-------------------------------+-------------------------------+
327  * ! Type                          ! Length                        !
328  * +-------------------------------+-------------------------------+
329  * ! Original length               ! Padding (0 - 15 bytes)        !
330  * +-------------------------------+-------------------------------+
331  * ! Hash (Bits   0 -  31)                                         !
332  * : :                                                             :
333  * ! Hash (Bits 128 - 159)                                         !
334  * +---------------------------------------------------------------+
335  */
336 /* Minimum size */
337 #define PART_ENCRYPTION_AES256_SIZE 42
338 struct part_encryption_aes256_s
339 {
340   part_header_t head;
341   uint16_t username_length;
342   char *username;
343   unsigned char iv[16];
344   /* <encrypted> */
345   unsigned char hash[20];
346   /*   <payload /> */
347   /* </encrypted> */
348 };
349 typedef struct part_encryption_aes256_s part_encryption_aes256_t;
350
351 struct receive_list_entry_s
352 {
353   char data[BUFF_SIZE];
354   int  data_len;
355   int  fd;
356   struct receive_list_entry_s *next;
357 };
358 typedef struct receive_list_entry_s receive_list_entry_t;
359
360
361 /**
362  *
363  *  END Collectd Structs
364  *
365  */
366
/*
 * Convert a 64-bit integer from network (big-endian) to host byte order.
 *
 * `n` holds the eight bytes exactly as they were copied off the wire.  The
 * value is rebuilt from those bytes most-significant first, which gives the
 * right result on any host and does not depend on BYTE_ORDER / BIG_ENDIAN
 * being defined (an undefined pair makes `#if BYTE_ORDER == BIG_ENDIAN`
 * evaluate to true and silently skip the swap).
 */
static uint64_t collectd_ntohll (uint64_t n)
{
  unsigned char wire[sizeof (n)];
  uint64_t host = 0;
  size_t i;

  memcpy (wire, &n, sizeof (wire));
  for (i = 0; i < sizeof (wire); i++)
    host = (host << 8) | wire[i];

  return host;
} /* uint64_t collectd_ntohll */

/*
 * Doubles are used exactly as received; no conversion is performed.
 * NOTE(review): this is only correct when sender and host share the same
 * double representation -- confirm for big-endian hosts.
 */
#define ntohd(d) (d)
380
381 static noit_boolean
382 noit_collects_check_aynsch(noit_module_t *self,
383                            noit_check_t *check) {
384   const char *config_val;
385   collectd_mod_config_t *conf = noit_module_get_userdata(self);
386   noit_boolean is_asynch = conf->asynch_metrics;
387   if(noit_hash_retr_str(check->config,
388                         "asynch_metrics", strlen("asynch_metrics"),
389                         (const char **)&config_val)) {
390     if(!strcasecmp(config_val, "true") || !strcasecmp(config_val, "on"))
391       is_asynch = noit_true;
392   }
393
394   if(is_asynch) check->flags |= NP_SUPPRESS_METRICS;
395   else check->flags &= ~NP_SUPPRESS_METRICS;
396   return is_asynch;
397 }
398
399 /* Forward declare this so the parse function can use it */
400 static int queue_values(collectd_closure_t *ccl,
401   noit_module_t *self, noit_check_t *check, value_list_t *vl);
402 static int queue_notifications(collectd_closure_t *ccl,
403   noit_module_t *self, noit_check_t *check, notification_t *n);
404
405
406 static EVP_CIPHER_CTX* network_get_aes256_cypher (collectd_closure_t *ccl, /* {{{ */
407     const void *iv, size_t iv_size, const char *username)
408 {
409
410   if (ccl->secret == NULL)
411     return (NULL);
412   else
413   {
414     EVP_CIPHER_CTX *ctx_ptr;
415     EVP_MD_CTX ctx_md;
416     unsigned char password_hash[32];
417     unsigned int length = 0;
418     int success;
419
420     ctx_ptr = &ccl->ctx;
421
422     EVP_DigestInit(&ctx_md, EVP_sha256());
423     EVP_DigestUpdate(&ctx_md, ccl->secret, strlen(ccl->secret));
424     EVP_DigestFinal(&ctx_md, password_hash, &length);
425     EVP_MD_CTX_cleanup(&ctx_md);
426
427     assert(length <= 32);
428
429     success = EVP_DecryptInit(ctx_ptr, EVP_aes_256_ofb(), password_hash, iv);
430     if (success != 1)
431     {
432       noitL(noit_error, "collectd: EVP_DecryptInit returned: %d\n",
433           success);
434       return (NULL);
435     }
436     return (ctx_ptr);
437   }
438 } /* }}} int network_get_aes256_cypher */
439
440 static int parse_part_values (void **ret_buffer, size_t *ret_buffer_len,
441     value_t **ret_values, uint8_t **ret_types, int *ret_num_values)
442 {
443   char *buffer = *ret_buffer;
444   size_t buffer_len = *ret_buffer_len;
445
446   uint16_t tmp16;
447   size_t exp_size;
448   int   i;
449
450   uint16_t pkg_length;
451   uint16_t pkg_type;
452   uint16_t pkg_numval;
453
454   uint8_t *pkg_types;
455   value_t *pkg_values;
456
457   if (buffer_len < 15)
458   {
459     noitL(noit_error,"collectd: packet is too short: "
460         "buffer_len = %zu\n", buffer_len);
461     return (-1);
462   }
463
464   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
465   buffer += sizeof (tmp16);
466   pkg_type = ntohs (tmp16);
467
468   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
469   buffer += sizeof (tmp16);
470   pkg_length = ntohs (tmp16);
471
472   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
473   buffer += sizeof (tmp16);
474   pkg_numval = ntohs (tmp16);
475
476   assert (pkg_type == TYPE_VALUES);
477
478   exp_size = 3 * sizeof (uint16_t)
479     + pkg_numval * (sizeof (uint8_t) + sizeof (value_t));
480   if ((buffer_len < 0) || (buffer_len < exp_size))
481   {
482     noitL(noit_error, "collectd: parse_part_values: "
483         "Packet too short: "
484         "Chunk of size %zu expected, "
485         "but buffer has only %zu bytes left.\n",
486         exp_size, buffer_len);
487     return (-1);
488   }
489
490   if (pkg_length != exp_size)
491   {
492     noitL(noit_debug, "collectd: parse_part_values: "
493         "Length and number of values "
494         "in the packet don't match.\n");
495     return (-1);
496   }
497
498   pkg_types = (uint8_t *) malloc (pkg_numval * sizeof (uint8_t));
499   pkg_values = (value_t *) malloc (pkg_numval * sizeof (value_t));
500   if ((pkg_types == NULL) || (pkg_values == NULL))
501   {
502     sfree (pkg_types);
503     sfree (pkg_values);
504     noitL(noit_error, "collectd: parse_part_values: malloc failed.\n");
505     return (-1);
506   }
507
508   memcpy ((void *) pkg_types, (void *) buffer, pkg_numval * sizeof (uint8_t));
509   buffer += pkg_numval * sizeof (uint8_t);
510   memcpy ((void *) pkg_values, (void *) buffer, pkg_numval * sizeof (value_t));
511   buffer += pkg_numval * sizeof (value_t);
512
513   for (i = 0; i < pkg_numval; i++)
514   {
515     switch (pkg_types[i])
516     {
517       case DS_TYPE_COUNTER:
518         pkg_values[i].counter = (counter_t) collectd_ntohll (pkg_values[i].counter);
519         break;
520
521       case DS_TYPE_GAUGE:
522         pkg_values[i].gauge = (gauge_t) ntohd (pkg_values[i].gauge);
523         break;
524
525       case DS_TYPE_DERIVE:
526         pkg_values[i].derive = (derive_t) collectd_ntohll (pkg_values[i].derive);
527         break;
528
529       case DS_TYPE_ABSOLUTE:
530         pkg_values[i].absolute = (absolute_t) collectd_ntohll (pkg_values[i].absolute);
531         break;
532
533       default:
534         noitL(noit_debug, "collectd: parse_part_values: "
535       "Don't know how to handle data source type %"PRIu8 "\n",
536       pkg_types[i]);
537         sfree (pkg_types);
538         sfree (pkg_values);
539         return (-1);
540     } /* switch (pkg_types[i]) */
541   }
542
543   *ret_buffer     = buffer;
544   *ret_buffer_len = buffer_len - pkg_length;
545   *ret_num_values = pkg_numval;
546   *ret_types      = pkg_types;
547   *ret_values     = pkg_values;
548
549
550   return (0);
551 } /* int parse_part_values */
552
553 static int parse_part_number (void **ret_buffer, size_t *ret_buffer_len,
554     uint64_t *value)
555 {
556   char *buffer = *ret_buffer;
557   size_t buffer_len = *ret_buffer_len;
558
559   uint16_t tmp16;
560   uint64_t tmp64;
561   size_t exp_size = 2 * sizeof (uint16_t) + sizeof (uint64_t);
562
563   uint16_t pkg_length;
564
565   if ((buffer_len < 0) || ((size_t) buffer_len < exp_size))
566   {
567     noitL(noit_error, "collectd: parse_part_number: "
568         "Packet too short: "
569         "Chunk of size %zu expected, "
570         "but buffer has only %zu bytes left.\n",
571         exp_size, buffer_len);
572     return (-1);
573   }
574
575   /* skip pkg_type */
576   buffer += sizeof (tmp16);
577
578   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
579   buffer += sizeof (tmp16);
580   pkg_length = ntohs (tmp16);
581
582   memcpy ((void *) &tmp64, buffer, sizeof (tmp64));
583   buffer += sizeof (tmp64);
584   *value = collectd_ntohll (tmp64);
585
586   *ret_buffer = buffer;
587   *ret_buffer_len = buffer_len - pkg_length;
588
589   return (0);
590 } /* int parse_part_number */
591
592 static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len,
593     char *output, int output_len)
594 {
595   char *buffer = *ret_buffer;
596   size_t buffer_len = *ret_buffer_len;
597
598   uint16_t tmp16;
599   size_t header_size = 2 * sizeof (uint16_t);
600
601   uint16_t pkg_length;
602
603   if ((buffer_len < 0) || (buffer_len < header_size))
604   {
605     noitL(noit_error, "collectd: parse_part_string: "
606         "Packet too short: "
607         "Chunk of at least size %zu expected, "
608         "but buffer has only %zu bytes left.\n",
609         header_size, buffer_len);
610     return (-1);
611   }
612
613   /* skip pkg_type */
614   buffer += sizeof (tmp16);
615
616   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
617   buffer += sizeof (tmp16);
618   pkg_length = ntohs (tmp16);
619
620   /* Check that packet fits in the input buffer */
621   if (pkg_length > buffer_len)
622   {
623     noitL(noit_error, "collectd: parse_part_string: "
624         "Packet too big: "
625         "Chunk of size %"PRIu16" received, "
626         "but buffer has only %zu bytes left.\n",
627         pkg_length, buffer_len);
628     return (-1);
629   }
630
631   /* Check that pkg_length is in the valid range */
632   if (pkg_length <= header_size)
633   {
634     noitL(noit_error, "collectd: parse_part_string: "
635         "Packet too short: "
636         "Header claims this packet is only %hu "
637         "bytes long.\n", pkg_length);
638     return (-1);
639   }
640
641   /* Check that the package data fits into the output buffer.
642    * The previous if-statement ensures that:
643    * `pkg_length > header_size' */
644   if ((output_len < 0)
645       || ((size_t) output_len < ((size_t) pkg_length - header_size)))
646   {
647     noitL(noit_error, "collectd: parse_part_string: "
648         "Output buffer too small.\n");
649     return (-1);
650   }
651
652   /* All sanity checks successfull, let's copy the data over */
653   output_len = pkg_length - header_size;
654   memcpy ((void *) output, (void *) buffer, output_len);
655   buffer += output_len;
656
657   /* For some very weird reason '\0' doesn't do the trick on SPARC in
658    * this statement. */
659   if (output[output_len - 1] != 0)
660   {
661     noitL(noit_error, "collectd: parse_part_string: "
662         "Received string does not end "
663         "with a NULL-byte.\n");
664     return (-1);
665   }
666
667   *ret_buffer = buffer;
668   *ret_buffer_len = buffer_len - pkg_length;
669
670   return (0);
671 } /* int parse_part_string */
672
673
/* parse_packet() flags: the data being parsed came out of a signed /
 * encrypted part. */
#define PP_SIGNED    0x01
#define PP_ENCRYPTED 0x02

/*
 * Copy `len` bytes from the current read position into `dst` and advance
 * the position.  Relies on variables named `buffer` and `buffer_offset`
 * being in scope at the point of use; performs no bounds checking.
 */
#define BUFFER_READ(dst,len) do { \
  memcpy ((dst), buffer + buffer_offset, (len)); \
  buffer_offset += (len); \
} while (0)
681
682
683
684 // Forward declare
685 static int parse_packet (
686     collectd_closure_t *ccl, noit_module_t *self, noit_check_t *check,
687     void *buffer, size_t buffer_size, int flags);
688
689
690 static int parse_part_sign_sha256 (collectd_closure_t *ccl, noit_module_t *self,
691     noit_check_t *check, void **ret_buffer, size_t *ret_buffer_len, int flags)
692 {
693   unsigned char *buffer;
694   size_t buffer_len;
695   size_t buffer_offset;
696
697   size_t username_len;
698   part_signature_sha256_t pss;
699   uint16_t pss_head_length;
700   unsigned char hash[sizeof (pss.hash)];
701
702   unsigned char *hash_ptr;
703   unsigned int length;
704
705   buffer = *ret_buffer;
706   buffer_len = *ret_buffer_len;
707   buffer_offset = 0;
708
709   if (ccl->username == NULL)
710   {
711     noitL(noit_debug, "collectd: Received signed network packet but can't verify "
712         "it because no user has been configured. Will accept it.\n");
713     return (0);
714   }
715
716   if (ccl->secret == NULL)
717   {
718     noitL(noit_debug, "collectd: Received signed network packet but can't verify "
719         "it because no secret has been configured. Will accept it.\n");
720     return (0);
721   }
722
723   /* Check if the buffer has enough data for this structure. */
724   if (buffer_len <= PART_SIGNATURE_SHA256_SIZE)
725     return (-ENOMEM);
726
727   /* Read type and length header */
728   BUFFER_READ (&pss.head.type, sizeof (pss.head.type));
729   BUFFER_READ (&pss.head.length, sizeof (pss.head.length));
730   pss_head_length = ntohs (pss.head.length);
731
732   /* Check if the `pss_head_length' is within bounds. */
733   if ((pss_head_length <= PART_SIGNATURE_SHA256_SIZE)
734       || (pss_head_length > buffer_len))
735   {
736     noitL(noit_error, "collectd: HMAC-SHA-256 with invalid length received.\n");
737     return (-1);
738   }
739
740   /* Copy the hash. */
741   BUFFER_READ (pss.hash, sizeof (pss.hash));
742
743   /* Calculate username length (without null byte) and allocate memory */
744   username_len = pss_head_length - PART_SIGNATURE_SHA256_SIZE;
745   pss.username = malloc (username_len + 1);
746   if (pss.username == NULL)
747     return (-ENOMEM);
748
749   /* Read the username */
750   BUFFER_READ (pss.username, username_len);
751   pss.username[username_len] = 0;
752
753   assert (buffer_offset == pss_head_length);
754
755   /* Match up the username with the expected username */
756   if (strcmp(ccl->username, pss.username) != 0)
757   {
758     noitL(noit_error, "collectd: User: %s and Given User: %s don't match\n", ccl->username, pss.username);
759     sfree (pss.username);
760     return (-ENOENT);
761   }
762
763   /* Create a hash device and check the HMAC */
764   hash_ptr = HMAC(EVP_sha256(), ccl->secret, strlen(ccl->secret),
765       buffer     + PART_SIGNATURE_SHA256_SIZE,
766       buffer_len - PART_SIGNATURE_SHA256_SIZE,
767       hash,         &length);
768   if (hash_ptr == NULL)
769   {
770     noitL(noit_error, "collectd: Creating HMAC-SHA-256 object failed.\n");
771     sfree (pss.username);
772     return (-1);
773   }
774
775   /* Clean up */
776   sfree (pss.username);
777
778   if (memcmp (pss.hash, hash, sizeof (pss.hash)) != 0)
779   {
780     noitL(noit_error, "collectd: Verifying HMAC-SHA-256 signature failed: "
781         "Hash mismatch.\n");
782   }
783   else
784   {
785     parse_packet (ccl, self, check, buffer + buffer_offset, buffer_len - buffer_offset,
786         flags | PP_SIGNED);
787   }
788
789   *ret_buffer = buffer + buffer_len;
790   *ret_buffer_len = 0;
791
792   return (0);
793 } /* }}} int parse_part_sign_sha256 */
794 /* #endif HAVE_LIBGCRYPT */
795
796 static int parse_part_encr_aes256 (collectd_closure_t *ccl, noit_module_t *self,
797     noit_check_t *check, void **ret_buffer, size_t *ret_buffer_len,
798     int flags)
799 {
800   unsigned char  *buffer = *ret_buffer;
801   size_t buffer_len = *ret_buffer_len;
802   size_t payload_len;
803   size_t part_size;
804   size_t buffer_offset;
805   int    tmpbufsize;
806   unsigned char   *tmpbuf;
807   uint16_t username_len;
808   part_encryption_aes256_t pea;
809   unsigned char hash[sizeof (pea.hash)];
810   unsigned int hash_length;
811
812   EVP_CIPHER_CTX *ctx;
813   EVP_MD_CTX ctx_md;
814   int err;
815
816   /* Make sure at least the header if available. */
817   if (buffer_len <= PART_ENCRYPTION_AES256_SIZE)
818   {
819     noitL(noit_debug, "collectd: parse_part_encr_aes256: "
820         "Discarding short packet.\n");
821     return (-1);
822   }
823
824   buffer_offset = 0;
825
826   /* Copy the unencrypted information into `pea'. */
827   BUFFER_READ (&pea.head.type, sizeof (pea.head.type));
828   BUFFER_READ (&pea.head.length, sizeof (pea.head.length));
829
830   /* Check the `part size'. */
831   part_size = ntohs (pea.head.length);
832   if ((part_size <= PART_ENCRYPTION_AES256_SIZE)
833       || (part_size > buffer_len))
834   {
835     noitL(noit_debug, "collectd: parse_part_encr_aes256: "
836         "Discarding part with invalid size.\n");
837     return (-1);
838   }
839
840   /* Read the username */
841   BUFFER_READ (&username_len, sizeof (username_len));
842   username_len = ntohs (username_len);
843
844   if ((username_len <= 0)
845       || (username_len > (part_size - (PART_ENCRYPTION_AES256_SIZE + 1))))
846   {
847     noitL(noit_debug, "collectd: parse_part_encr_aes256: "
848         "Discarding part with invalid username length.\n");
849     return (-1);
850   }
851
852   assert (username_len > 0);
853   pea.username = malloc (username_len + 1);
854   if (pea.username == NULL)
855     return (-ENOMEM);
856   BUFFER_READ (pea.username, username_len);
857   pea.username[username_len] = 0;
858
859   /* Last but not least, the initialization vector */
860   BUFFER_READ (pea.iv, sizeof (pea.iv));
861
862   /* Make sure we are at the right position */
863   assert (buffer_offset == (username_len +
864         PART_ENCRYPTION_AES256_SIZE - sizeof (pea.hash)));
865
866   /* Match up the username with the expected username */
867   if (strcmp(ccl->username, pea.username) != 0)
868   {
869     noitL(noit_error, "collectd: Username received and server side username don't match\n");
870     sfree (pea.username);
871     return (-ENOENT);
872   }
873
874   ctx = network_get_aes256_cypher (ccl, pea.iv, sizeof (pea.iv),
875       pea.username);
876   if (ctx == NULL)
877     return (-1);
878
879   payload_len = part_size - (PART_ENCRYPTION_AES256_SIZE + username_len);
880   assert (payload_len > 0);
881   tmpbuf = malloc(part_size - buffer_offset);
882
883   /* Decrypt the packet */
884   err = EVP_DecryptUpdate(ctx,
885       tmpbuf, &tmpbufsize,
886       buffer    + buffer_offset,
887       part_size - buffer_offset);
888   if (err != 1)
889   {
890     noitL(noit_error, "collectd: openssl returned: %d\n", err);
891     return (-1);
892   }
893
894   assert(part_size - buffer_offset == tmpbufsize);
895   /* Make it appear to be in place */
896   memcpy(buffer + buffer_offset, tmpbuf, part_size - buffer_offset);
897   sfree(tmpbuf);
898
899   /* Read the hash */
900   BUFFER_READ (pea.hash, sizeof (pea.hash));
901
902   /* Make sure we're at the right position - again */
903   assert (buffer_offset == (username_len + PART_ENCRYPTION_AES256_SIZE));
904   assert (buffer_offset == (part_size - payload_len));
905
906   /* Check hash sum */
907   memset (hash, 0, sizeof (hash));
908   EVP_DigestInit(&ctx_md, EVP_sha1());
909   EVP_DigestUpdate(&ctx_md, buffer + buffer_offset, payload_len);
910   EVP_DigestFinal(&ctx_md, hash, &hash_length);
911   if (memcmp (hash, pea.hash, sizeof (hash)) != 0)
912   {
913     noitL(noit_error, "collectd: Decryption failed: Checksum mismatch.\n");
914     return (-1);
915   }
916
917   parse_packet (ccl, self, check, buffer + buffer_offset, payload_len,
918       flags | PP_ENCRYPTED);
919
920   /* Update return values */
921   *ret_buffer =     buffer     + part_size;
922   *ret_buffer_len = buffer_len - part_size;
923   sfree(pea.username);
924
925   return (0);
926 } /* }}} int parse_part_encr_aes256 */
927
928
929 #undef BUFFER_READ
930
931
932 static int parse_packet (/* {{{ */
933     collectd_closure_t *ccl, noit_module_t *self, noit_check_t *check,
934     void *buffer, size_t buffer_size, int flags)
935 {
936   int status;
937
938   int packet_was_signed = (flags & PP_SIGNED);
939   int packet_was_encrypted = (flags & PP_ENCRYPTED);
940   int printed_ignore_warning = 0;
941
942 #define VALUE_LIST_INIT { NULL, 0, 0, interval_g, "localhost", "", "", "", "", NULL }
943   value_list_t vl = VALUE_LIST_INIT;
944   notification_t n;
945
946   memset (&vl, '\0', sizeof (vl));
947   memset (&n, '\0', sizeof (n));
948   status = 0;
949
950   while ((status == 0) && (0 < buffer_size)
951       && ((unsigned int) buffer_size > sizeof (part_header_t)))
952   {
953     uint16_t pkg_length;
954     uint16_t pkg_type;
955
956     memcpy ((void *) &pkg_type,
957         (void *) buffer,
958         sizeof (pkg_type));
959     memcpy ((void *) &pkg_length,
960         (void *) ((char *)buffer + sizeof (pkg_type)),
961         sizeof (pkg_length));
962
963     pkg_length = ntohs (pkg_length);
964     pkg_type = ntohs (pkg_type);
965
966     if (pkg_length > buffer_size)
967       break;
968     /* Ensure that this loop terminates eventually */
969     if (pkg_length < (2 * sizeof (uint16_t)))
970       break;
971
972
973     if (pkg_type == TYPE_ENCR_AES256)
974     {
975       status = parse_part_encr_aes256 (ccl, self, check,
976           &buffer, &buffer_size, flags);
977       if (status != 0)
978       {
979         noitL(noit_error, "collectd: Decrypting AES256 "
980             "part failed "
981             "with status %i.\n", status);
982         break;
983       }
984     }
985     else if ((ccl->security_level == SECURITY_LEVEL_ENCRYPT)
986         && (packet_was_encrypted == 0))
987     {
988       if (printed_ignore_warning == 0)
989       {
990         noitL(noit_debug, "collectd: Unencrypted packet or "
991             "part has been ignored.\n");
992         printed_ignore_warning = 1;
993       }
994       buffer = ((char *) buffer) + pkg_length;
995       continue;
996     }
997     else if (pkg_type == TYPE_SIGN_SHA256)
998     {
999       status = parse_part_sign_sha256 (ccl, self, check,
1000                                         &buffer, &buffer_size, flags);
1001       if (status != 0)
1002       {
1003         noitL(noit_error, "collectd: Verifying HMAC-SHA-256 "
1004             "signature failed "
1005             "with status %i.\n", status);
1006         break;
1007       }
1008     }
1009     else if ((ccl->security_level == SECURITY_LEVEL_SIGN)
1010         && (packet_was_encrypted == 0)
1011         && (packet_was_signed == 0))
1012     {
1013       if (printed_ignore_warning == 0)
1014       {
1015         noitL(noit_debug, "collectd: Unsigned packet or "
1016             "part has been ignored.\n");
1017         printed_ignore_warning = 1;
1018       }
1019       buffer = ((char *) buffer) + pkg_length;
1020       continue;
1021     }
1022     else if (pkg_type == TYPE_VALUES)
1023     {
1024       status = parse_part_values (&buffer, &buffer_size,
1025           &vl.values, &vl.types, &vl.values_len);
1026
1027       if (status != 0)
1028         break;
1029
1030       if ((strlen (vl.host) > 0) &&
1031           (strlen (vl.plugin) > 0) &&
1032           (strlen (vl.type) > 0))
1033       {
1034         queue_values(ccl, self, check, &vl);
1035       }
1036       else
1037       {
1038         noitL(noit_error,
1039               "collectd: NOT dispatching values [%lld,%s:%s:%s]\n",
1040               (long long int)vl.time, vl.host, vl.plugin, vl.type);
1041       }
1042
1043       sfree (vl.values);
1044       sfree (vl.types);
1045     }
1046     else if (pkg_type == TYPE_TIME)
1047     {
1048       uint64_t tmp = 0;
1049       status = parse_part_number (&buffer, &buffer_size, &tmp);
1050       if (status == 0)
1051       {
1052         vl.time = (time_t) tmp;
1053         n.time = (time_t) tmp;
1054       }
1055     }
1056     else if (pkg_type == TYPE_INTERVAL)
1057     {
1058       uint64_t tmp = 0;
1059       status = parse_part_number (&buffer, &buffer_size,
1060           &tmp);
1061       if (status == 0)
1062         vl.interval = (int) tmp;
1063     }
1064     else if (pkg_type == TYPE_HOST)
1065     {
1066       status = parse_part_string (&buffer, &buffer_size,
1067           vl.host, sizeof (vl.host));
1068       if (status == 0)
1069         sstrncpy (n.host, vl.host, sizeof (n.host));
1070     }
1071     else if (pkg_type == TYPE_PLUGIN)
1072     {
1073       status = parse_part_string (&buffer, &buffer_size,
1074           vl.plugin, sizeof (vl.plugin));
1075       if (status == 0)
1076         sstrncpy (n.plugin, vl.plugin,
1077             sizeof (n.plugin));
1078     }
1079     else if (pkg_type == TYPE_PLUGIN_INSTANCE)
1080     {
1081       status = parse_part_string (&buffer, &buffer_size,
1082           vl.plugin_instance,
1083           sizeof (vl.plugin_instance));
1084       if (status == 0)
1085         sstrncpy (n.plugin_instance,
1086             vl.plugin_instance,
1087             sizeof (n.plugin_instance));
1088     }
1089     else if (pkg_type == TYPE_TYPE)
1090     {
1091       status = parse_part_string (&buffer, &buffer_size,
1092           vl.type, sizeof (vl.type));
1093       if (status == 0)
1094         sstrncpy (n.type, vl.type, sizeof (n.type));
1095     }
1096     else if (pkg_type == TYPE_TYPE_INSTANCE)
1097     {
1098       status = parse_part_string (&buffer, &buffer_size,
1099           vl.type_instance,
1100           sizeof (vl.type_instance));
1101       if (status == 0)
1102         sstrncpy (n.type_instance, vl.type_instance,
1103             sizeof (n.type_instance));
1104     }
1105     else if (pkg_type == TYPE_MESSAGE)
1106     {
1107       status = parse_part_string (&buffer, &buffer_size,
1108           n.message, sizeof (n.message));
1109
1110       if (status != 0)
1111       {
1112         /* do nothing */
1113       }
1114       else if ((n.severity != NOTIF_FAILURE)
1115           && (n.severity != NOTIF_WARNING)
1116           && (n.severity != NOTIF_OKAY))
1117       {
1118         noitL(noit_error, "collectd: "
1119             "Ignoring notification with "
1120             "unknown severity %i.\n",
1121             n.severity);
1122       }
1123 /* Time proves untrustworthy... and we don't use it.
1124       else if (n.time <= 0)
1125       {
1126         noitL(noit_error, "collectd: "
1127             "Ignoring notification with "
1128             "time == 0.\n");
1129       }
1130 */
1131       else if (strlen (n.message) <= 0)
1132       {
1133         noitL(noit_error, "collectd: "
1134             "Ignoring notification with "
1135             "an empty message.\n");
1136       }
1137       else
1138       {
1139         queue_notifications(ccl, self, check, &n);
1140         noitL(noit_error, "collectd: "
1141             "DISPATCH NOTIFICATION\n");
1142       }
1143     }
1144     else if (pkg_type == TYPE_SEVERITY)
1145     {
1146       uint64_t tmp = 0;
1147       status = parse_part_number (&buffer, &buffer_size,
1148           &tmp);
1149       if (status == 0)
1150         n.severity = (int) tmp;
1151     }
1152     else
1153     {
1154       noitL(noit_error, "collectd: parse_packet: Unknown part"
1155           " type: 0x%04hx\n", pkg_type);
1156       buffer = ((char *) buffer) + pkg_length;
1157     }
1158   } /* while (buffer_size > sizeof (part_header_t)) */
1159
1160   return (status);
1161 } /* }}} int parse_packet */
1162
1163
1164 // Not proud of this at all however; I am not sure best how to address this.
1165 static int infer_type(char *buffer, int buffer_len, value_list_t *vl, int index) {
1166   int len = strlen(buffer);
1167   strcat(buffer, "`");
1168   if (cmp_type(vl, "load", "load")) {
1169     assert(vl->values_len == 3);
1170     switch (index) {
1171       case 0:
1172         strcat(buffer, "1min"); break;
1173       case 1:
1174         strcat(buffer, "5min"); break;
1175       case 2:
1176         strcat(buffer, "15min"); break;
1177     }
1178   } else if (cmp_plugin(vl, "interface")) {
1179     assert(vl->values_len == 2);
1180     switch (index) {
1181       case 0:
1182         strcat(buffer, "rx"); break;
1183       case 1:
1184         strcat(buffer, "tx"); break;
1185     }
1186   } else {
1187     char buf[20];
1188     snprintf(buf, sizeof(buf), "%d", index);
1189     strcat(buffer, buf);
1190     noitL(noit_debug, "collectd: parsing multiple values"
1191         " and guessing on the type for plugin[%s] and type[%s]"
1192         , vl->plugin, vl->type);
1193   }
1194   return len;
1195 }
1196
/*
 * Build a flat metric name of the form
 *   plugin[`plugin_inst][`type][`type_inst]
 * into buffer.  Empty components are skipped together with their separator.
 * The caller must supply a buffer large enough for all four components plus
 * three separators and the terminating NUL; no bounds are checked here.
 */
static void concat_metrics(char *buffer, char* plugin, char* plugin_inst, char* type, char* type_inst) {
  char *optional[3];
  int i;

  optional[0] = plugin_inst;
  optional[1] = type;
  optional[2] = type_inst;

  /* The plugin name always leads, even when it is empty. */
  strcpy(buffer, plugin);

  for (i = 0; i < 3; i++) {
    if (optional[i][0] == '\0')
      continue;
    strcat(buffer, "`");
    strcat(buffer, optional[i]);
  }
}
1213
1214 static int queue_notifications(collectd_closure_t *ccl,
1215       noit_module_t *self, noit_check_t *check, notification_t *n) {
1216   stats_t current;
1217   noit_boolean immediate;
1218   char buffer[DATA_MAX_NAME_LEN*4 + 128];
1219   collectd_mod_config_t *conf;
1220   conf = noit_module_get_userdata(self);
1221
1222   if(!conf->support_notifications) return 0;
1223   /* We are passive, so we don't do anything for transient checks */
1224   if(check->flags & NP_TRANSIENT) return 0;
1225
1226   noit_check_stats_clear(&current);
1227   gettimeofday(&current.whence, NULL);
1228
1229   // Concat all the names together so they fit into the flat noitd model
1230   concat_metrics(buffer, n->plugin, n->plugin_instance, n->type, n->type_instance);
1231   noit_stats_set_metric(&ccl->current, buffer, METRIC_STRING, n->message);
1232   immediate = noit_collects_check_aynsch(self,check);
1233   if(immediate)
1234     noit_stats_log_immediate_metric(check, buffer, METRIC_STRING, n->message);
1235   noit_check_passive_set_stats(self, check, &current);
1236   noitL(nldeb, "collectd: dispatch_notifications(%s, %s, %s)\n",check->target, buffer, n->message);
1237   return 0;
1238 }
1239
1240
1241 static int queue_values(collectd_closure_t *ccl,
1242       noit_module_t *self, noit_check_t *check, value_list_t *vl) {
1243   noit_boolean immediate;
1244   char buffer[DATA_MAX_NAME_LEN*4 + 4 + 1 + 20];
1245   int i, len = 0;
1246
1247   // Concat all the names together so they fit into the flat noitd model
1248   immediate = noit_collects_check_aynsch(self,check);
1249   concat_metrics(buffer, vl->plugin, vl->plugin_instance, vl->type, vl->type_instance);
1250   for (i=0; i < vl->values_len; i++) {
1251  
1252     // Only infer the type if the amount of values is greater than one
1253     if (vl->values_len > 1) {
1254       // Trunc the string 
1255       if (len > 0)
1256         buffer[len] = 0;
1257       len = infer_type(buffer, sizeof(buffer), vl, i);
1258     }
1259
1260     switch (vl->types[i])
1261     {
1262       case DS_TYPE_COUNTER:
1263         noit_stats_set_metric(&ccl->current, buffer, METRIC_UINT64, &vl->values[i].counter);
1264         if(immediate) noit_stats_log_immediate_metric(check, buffer, METRIC_UINT64, &vl->values[i].counter);
1265         break;
1266
1267       case DS_TYPE_GAUGE:
1268         noit_stats_set_metric(&ccl->current, buffer, METRIC_DOUBLE, &vl->values[i].gauge);
1269         if(immediate) noit_stats_log_immediate_metric(check, buffer, METRIC_DOUBLE, &vl->values[i].gauge);
1270         break;
1271
1272       case DS_TYPE_DERIVE:
1273         noit_stats_set_metric(&ccl->current, buffer, METRIC_INT64, &vl->values[i].derive);
1274         if(immediate) noit_stats_log_immediate_metric(check, buffer, METRIC_INT64, &vl->values[i].derive);
1275         break;
1276
1277       case DS_TYPE_ABSOLUTE:
1278         noit_stats_set_metric(&ccl->current, buffer, METRIC_INT64, &vl->values[i].absolute);
1279         if(immediate) noit_stats_log_immediate_metric(check, buffer, METRIC_INT64, &vl->values[i].absolute);
1280         break;
1281
1282       default:
1283         noitL(noit_debug, "collectd: parse_part_values: "
1284               "Don't know how to handle data source type %"PRIu8 "\n",
1285               vl->types[i]);
1286         return (-1);
1287     } /* switch (value_types[i]) */
1288     ccl->stats_count++;
1289     noitL(nldeb, "collectd: queue_values(%s, %s)\n", buffer, check->target);
1290   }
1291   return 0;
1292 }
1293
1294 static void clear_closure(collectd_closure_t *ccl) {
1295   ccl->stats_count = 0;
1296   ccl->ntfy_count = 0;
1297   noit_check_stats_clear(&ccl->current);
1298
1299 }
1300
1301 static int collectd_submit(noit_module_t *self, noit_check_t *check) {
1302   collectd_closure_t *ccl;
1303   struct timeval duration;
1304   /* We are passive, so we don't do anything for transient checks */
1305   if(check->flags & NP_TRANSIENT) return 0;
1306
1307   noit_collects_check_aynsch(self, check);
1308   if(!check->closure) {
1309     ccl = check->closure = (void *)calloc(1, sizeof(collectd_closure_t));
1310     memset(ccl, 0, sizeof(collectd_closure_t));
1311   } else {
1312     // Don't count the first run
1313     char human_buffer[256];
1314     ccl = (collectd_closure_t*)check->closure;
1315     gettimeofday(&ccl->current.whence, NULL);
1316     sub_timeval(ccl->current.whence, check->last_fire_time, &duration);
1317     ccl->current.duration = duration.tv_sec; // + duration.tv_usec / (1000 * 1000);
1318
1319     snprintf(human_buffer, sizeof(human_buffer),
1320              "dur=%d,run=%d,stats=%d,ntfy=%d", ccl->current.duration,
1321              check->generation, ccl->stats_count, ccl->ntfy_count);
1322     noitL(nldeb, "collectd(%s) [%s]\n", check->target, human_buffer);
1323
1324     // Not sure what to do here
1325     ccl->current.available = (ccl->ntfy_count > 0 || ccl->stats_count > 0) ?
1326         NP_AVAILABLE : NP_UNAVAILABLE;
1327     ccl->current.state = (ccl->ntfy_count > 0 || ccl->stats_count > 0) ?
1328         NP_GOOD : NP_BAD;
1329     ccl->current.status = human_buffer;
1330     noit_check_passive_set_stats(self, check, &ccl->current);
1331
1332     memcpy(&check->last_fire_time, &ccl->current.whence, sizeof(duration));
1333   }
1334   clear_closure(ccl);
1335   return 0;
1336 }
1337
1338 struct collectd_pkt {
1339   noit_module_t *self;  /* which collect load context */
1340   char *payload;
1341   int len;
1342 };
1343
1344 static int
1345 push_packet_at_check(noit_check_t *check, void *closure) {
1346   struct collectd_pkt *pkt = closure;
1347   collectd_closure_t *ccl;
1348   char *security_buffer;
1349   collectd_mod_config_t *conf;
1350   conf = noit_module_get_userdata(pkt->self);
1351
1352   /* We need a check, and a collectd one at that */
1353   if (!check || strcmp(check->module, "collectd")) return 0;
1354
1355   // If its a new check retrieve some values
1356   if (check->closure == NULL) {
1357     // TODO: Verify if it could somehow retrieve data before the check closure exists
1358     ccl = check->closure = (void *)calloc(1, sizeof(collectd_closure_t));
1359     memset(ccl, 0, sizeof(collectd_closure_t));
1360   } else {
1361     ccl = (collectd_closure_t*)check->closure;
1362   }
1363   // Default to NONE
1364   ccl->security_level = SECURITY_LEVEL_NONE;
1365   if (noit_hash_retr_str(check->config, "security_level", strlen("security_level"),
1366                          (const char**)&security_buffer))
1367   {
1368     ccl->security_level = atoi(security_buffer);
1369   }
1370
1371   // Is this outside to keep updates happening?
1372   if (!noit_hash_retr_str(check->config, "username", strlen("username"),
1373                          (const char**)&ccl->username) &&
1374       !noit_hash_retr_str(conf->options, "username", strlen("username"),
1375                          (const char**)&ccl->username))
1376   {
1377     if (ccl->security_level == SECURITY_LEVEL_ENCRYPT) {
1378       noitL(nlerr, "collectd: no username defined for check.\n");
1379       return 0;
1380     } else if (ccl->security_level == SECURITY_LEVEL_SIGN) {
1381       noitL(nlerr, "collectd: no username defined for check, "
1382           "will accept any signed packet.\n");
1383     }
1384   }
1385
1386   if(!ccl->secret)
1387     noit_hash_retr_str(check->config, "secret", strlen("secret"),
1388                        (const char**)&ccl->secret);
1389   if(!ccl->secret)
1390     noit_hash_retr_str(conf->options, "secret", strlen("secret"),
1391                        (const char**)&ccl->secret);
1392   if(!ccl->secret) {
1393     if (ccl->security_level == SECURITY_LEVEL_ENCRYPT) {
1394       noitL(nlerr, "collectd: no secret defined for check.\n");
1395       return 0;
1396     }
1397     else if (ccl->security_level == SECURITY_LEVEL_SIGN) {
1398       noitL(nlerr, "collectd: no secret defined for check, "
1399           "will accept any signed packet.\n");
1400     }
1401   }
1402
1403   parse_packet(ccl, pkt->self, check, pkt->payload, pkt->len, 0);
1404   return 1;
1405 }
1406
1407 static int noit_collectd_handler(eventer_t e, int mask, void *closure,
1408                              struct timeval *now) {
1409   union {
1410     struct sockaddr_in  skaddr;
1411     struct sockaddr_in6 skaddr6;
1412   } remote;
1413   char packet[1500];
1414   int packet_len = sizeof(packet);
1415   unsigned int from_len;
1416   char ip_p[INET6_ADDRSTRLEN];
1417   noit_module_t *self = (noit_module_t *)closure;
1418   collectd_mod_config_t *conf;
1419   conf = noit_module_get_userdata(self);
1420
1421
1422   // Get the username and password of the string
1423
1424   while(1) {
1425     struct collectd_pkt pkt;
1426     int inlen, check_cnt;
1427
1428     from_len = sizeof(remote);
1429
1430     inlen = recvfrom(e->fd, packet, packet_len, 0,
1431                      (struct sockaddr *)&remote, &from_len);
1432     gettimeofday(now, NULL); /* set it, as we care about accuracy */
1433
1434     if(inlen < 0) {
1435       if(errno == EAGAIN || errno == EINTR) break;
1436       noitLT(nlerr, now, "collectd: recvfrom: %s\n", strerror(errno));
1437       break;
1438     }
1439     if (from_len == sizeof(remote.skaddr)) {
1440       if (!inet_ntop(AF_INET, &(remote.skaddr.sin_addr), ip_p, INET_ADDRSTRLEN)) {
1441         noitLT(nlerr, now, "collectd: inet_ntop failed: %s\n", strerror(errno));
1442         break;
1443       }
1444     }
1445     else if(from_len == sizeof(remote.skaddr6)) {
1446       if (!inet_ntop(AF_INET6, &(remote.skaddr6.sin6_addr), ip_p, INET6_ADDRSTRLEN)) {
1447         noitLT(nlerr, now, "collectd: inet_ntop failed: %s\n", strerror(errno));
1448         break;
1449       }
1450     }
1451     else {
1452       noitLT(nlerr, now, "collectd: could not determine address family of remote\n");
1453       break;
1454     }
1455
1456     pkt.self = self;
1457     pkt.payload = packet;
1458     pkt.len = inlen;
1459     check_cnt = noit_poller_target_do(ip_p, push_packet_at_check ,&pkt);
1460     if(check_cnt == 0)
1461       noitL(nlerr, "collectd: No defined check from ip [%s].\n", ip_p);
1462   }
1463   return EVENTER_READ | EVENTER_EXCEPTION;
1464 }
1465
1466 static int noit_collectd_initiate_check(noit_module_t *self,
1467                                         noit_check_t *check,
1468                                         int once, noit_check_t *cause) {
1469   /* The idea is to write the collectd stuff to the stats one every period
1470    * Then we can warn people if no stats where written in a period of time
1471    */
1472   INITIATE_CHECK(collectd_submit, self, check);
1473   return 0;
1474 }
1475
1476 static int noit_collectd_config(noit_module_t *self, noit_hash_table *options) {
1477   collectd_mod_config_t *conf;
1478   conf = noit_module_get_userdata(self);
1479   if(conf) {
1480     if(conf->options) {
1481       noit_hash_destroy(conf->options, free, free);
1482       free(conf->options);
1483     }
1484   }
1485   else
1486     conf = calloc(1, sizeof(*conf));
1487   conf->options = options;
1488   noit_module_set_userdata(self, conf);
1489   return 1;
1490 }
1491
1492 static int noit_collectd_onload(noit_image_t *self) {
1493   if(!nlerr) nlerr = noit_log_stream_find("error/collectd");
1494   if(!nldeb) nldeb = noit_log_stream_find("debug/collectd");
1495   if(!nlerr) nlerr = noit_error;
1496   if(!nldeb) nldeb = noit_debug;
1497   eventer_name_callback("noit_collectd/handler", noit_collectd_handler);
1498   return 0;
1499 }
1500
1501 static int noit_collectd_init(noit_module_t *self) {
1502   const char *config_val;
1503   int sockaddr_len;
1504   collectd_mod_config_t *conf;
1505   conf = noit_module_get_userdata(self);
1506   int portint = 0;
1507   struct sockaddr_in skaddr;
1508   struct sockaddr_in6 skaddr6;
1509   struct in6_addr in6addr_any = IN6ADDR_ANY_INIT;
1510   const char *host;
1511   unsigned short port;
1512
1513   conf->support_notifications = noit_true;
1514   if(noit_hash_retr_str(conf->options,
1515                         "notifications", strlen("notifications"),
1516                         (const char **)&config_val)) {
1517     if(!strcasecmp(config_val, "false") || !strcasecmp(config_val, "off"))
1518       conf->support_notifications = noit_false;
1519   }
1520   conf->asynch_metrics = noit_false;
1521   if(noit_hash_retr_str(conf->options,
1522                         "asynch_metrics", strlen("asynch_metrics"),
1523                         (const char **)&config_val)) {
1524     if(!strcasecmp(config_val, "true") || !strcasecmp(config_val, "on"))
1525       conf->asynch_metrics = noit_true;
1526   }
1527
1528   /* Default Collectd port */
1529   portint = NET_DEFAULT_PORT;
1530   if(noit_hash_retr_str(conf->options,
1531                          "collectd_port", strlen("collectd_port"),
1532                          (const char**)&config_val))
1533     portint = atoi(config_val);
1534
1535
1536   if(!noit_hash_retr_str(conf->options,
1537                          "collectd_host", strlen("collectd_host"),
1538                          (const char**)&host))
1539     host = "*";
1540
1541   port = (unsigned short) portint;
1542
1543   conf->ipv4_fd = conf->ipv6_fd = -1;
1544
1545   conf->ipv4_fd = socket(PF_INET, SOCK_DGRAM, 0);
1546   if(conf->ipv4_fd < 0) {
1547     noitL(noit_error, "collectd: socket failed: %s\n",
1548           strerror(errno));
1549   }
1550   else {
1551     if(eventer_set_fd_nonblocking(conf->ipv4_fd)) {
1552       close(conf->ipv4_fd);
1553       conf->ipv4_fd = -1;
1554       noitL(noit_error,
1555             "collectd: could not set socket non-blocking: %s\n",
1556             strerror(errno));
1557     }
1558   }
1559   skaddr.sin_family = AF_INET;
1560   skaddr.sin_addr.s_addr = htonl(INADDR_ANY);
1561   skaddr.sin_port = htons(port);
1562
1563   sockaddr_len = sizeof(skaddr);
1564   if(bind(conf->ipv4_fd, (struct sockaddr *)&skaddr, sockaddr_len) < 0) {
1565     noitL(noit_error, "bind failed[%s]: %s\n", host, strerror(errno));
1566     close(conf->ipv4_fd);
1567     return -1;
1568   }
1569
1570   if(conf->ipv4_fd >= 0) {
1571     eventer_t newe;
1572     newe = eventer_alloc();
1573     newe->fd = conf->ipv4_fd;
1574     newe->mask = EVENTER_READ | EVENTER_EXCEPTION;
1575     newe->callback = noit_collectd_handler;
1576     newe->closure = self;
1577     eventer_add(newe);
1578   }
1579
1580   conf->ipv6_fd = socket(AF_INET6, SOCK_DGRAM, 0);
1581   if(conf->ipv6_fd < 0) {
1582     noitL(noit_error, "collectd: IPv6 socket failed: %s\n",
1583           strerror(errno));
1584   }
1585   else {
1586     if(eventer_set_fd_nonblocking(conf->ipv6_fd)) {
1587       close(conf->ipv6_fd);
1588       conf->ipv6_fd = -1;
1589       noitL(noit_error,
1590             "collectd: could not set socket non-blocking: %s\n",
1591                strerror(errno));
1592     }
1593   }
1594   sockaddr_len = sizeof(skaddr6);
1595   memset(&skaddr6, 0, sizeof(skaddr6));
1596   skaddr6.sin6_family = AF_INET6;
1597   skaddr6.sin6_addr = in6addr_any;
1598   skaddr6.sin6_port = htons(port);
1599
1600   if(bind(conf->ipv6_fd, (struct sockaddr *)&skaddr6, sockaddr_len) < 0) {
1601     noitL(noit_error, "bind(IPv6) failed[%s]: %s\n", host, strerror(errno));
1602     close(conf->ipv6_fd);
1603     conf->ipv6_fd = -1;
1604   }
1605
1606   if(conf->ipv6_fd >= 0) {
1607     eventer_t newe;
1608     newe = eventer_alloc();
1609     newe->fd = conf->ipv6_fd;
1610     newe->mask = EVENTER_READ;
1611     newe->callback = noit_collectd_handler;
1612     newe->closure = self;
1613     eventer_add(newe);
1614   }
1615
1616   noit_module_set_userdata(self, conf);
1617   return 0;
1618 }
1619
1620 #include "collectd.xmlh"
1621 noit_module_t collectd = {
1622   {
1623     NOIT_MODULE_MAGIC,
1624     NOIT_MODULE_ABI_VERSION,
1625     "collectd",
1626     "collectd collection",
1627     collectd_xml_description,
1628     noit_collectd_onload
1629   },
1630   noit_collectd_config,
1631   noit_collectd_init,
1632   noit_collectd_initiate_check,
1633   NULL /* noit_collectd_cleanup */
1634 };
Note: See TracBrowser for help on using the browser.