root/src/modules/collectd.c

Revision f91ddca09660d8415b1bcdd88b20946e82b0ef62, 46.9 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

propagate the cause for causal checks into all of the calls

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2005-2009  Florian Forster
3  * Copyright (c) 2009, OmniTI Computer Consulting, Inc.
4  * Copyright (c) 2009, Dan Di Spaltro
5  * All rights reserved.
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2.1 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library, in file named COPYING.LGPL; if not,
19  * write to the Free Software Foundation, Inc., 59 Temple Place,
20  * Suite 330, Boston, MA  02111-1307  USA
21  *
22  */
23
24 #include "noit_defines.h"
25
26 #include <stdio.h>
27 #include <unistd.h>
28 #include <errno.h>
29 #include <assert.h>
30 #include <math.h>
31 #include <ctype.h>
32 #ifdef HAVE_SYS_FILIO_H
33 #include <sys/filio.h>
34 #endif
35 #include <openssl/evp.h>
36 #include <openssl/hmac.h>
37 #include <inttypes.h>
38 #include <netinet/in.h>
39 #include <arpa/inet.h>
40
41 #include "noit_module.h"
42 #include "noit_check.h"
43 #include "noit_check_tools.h"
44 #include "utils/noit_log.h"
45 #include "utils/noit_hash.h"
46
47
48 static noit_log_stream_t nlerr = NULL;
49 static noit_log_stream_t nldeb = NULL;
50
51 typedef struct _mod_config {
52   noit_hash_table *options;
53   noit_hash_table target_sessions;
54   noit_boolean support_notifications;
55   noit_boolean asynch_metrics;
56   int ipv4_fd;
57   int ipv6_fd;
58 } collectd_mod_config_t;
59
60 typedef struct collectd_closure_s {
61   char *username;
62   char *secret;
63   int security_level;
64   EVP_CIPHER_CTX ctx;
65   stats_t current;
66   int stats_count;
67   int ntfy_count;
68 } collectd_closure_t;
69
/*
 * cmp_plugin(vl, val): true when vl->plugin equals `val'.
 * `val' must be a string literal or char array: sizeof(val) includes the
 * terminating NUL, which makes the strncmp() an exact match.
 */
#define cmp_plugin(vl, val) \
  (strncmp((vl)->plugin, (val), sizeof(val)) == 0)

/*
 * cmp_type(vl, plugin, type): true when vl->plugin equals `plugin' AND
 * vl->type equals `type'.  The type string is compared against the `type'
 * field (the previous expansion compared it against `plugin' a second
 * time, so the macro could only be true when plugin == type).
 * The macro parameters are deliberately not named `plugin'/`type' so they
 * cannot be substituted into the member names.
 */
#define cmp_type(vl, plugin_name, type_name) \
  (cmp_plugin(vl, plugin_name) && \
   (strncmp((vl)->type, (type_name), sizeof(type_name)) == 0))
76
77
78 /**
79  *
80  * Collectd Structs
81  *
82  *
83  **/
84
/* Storage for one meta-data value; exactly one member is meaningful. */
typedef union meta_value_u
{
  char     *mv_string;
  int64_t   mv_signed_int;
  uint64_t  mv_unsigned_int;
  double    mv_double;
  _Bool     mv_boolean;
} meta_value_t;
94
95 struct meta_entry_s;
96 typedef struct meta_entry_s meta_entry_t;
97 struct meta_entry_s
98 {
99   char         *key;
100   meta_value_t  value;
101   int           type;
102   meta_entry_t *next;
103 };
104
105 struct meta_data_s
106 {
107   meta_entry_t   *head;
108   pthread_mutex_t lock;
109 };
110
111 typedef struct meta_data_s meta_data_t;
112
113
/* Default network endpoints (IPv4 address, IPv6 address, port). */
#define NET_DEFAULT_V4_ADDR  "239.192.74.66"
#define NET_DEFAULT_V6_ADDR  "ff18::efc0:4a42"
#define NET_DEFAULT_PORT     25826
117
/*
 * Part type identifiers: the 16-bit `type' field found at the start of
 * every part and compared against in parse_packet().
 */

/* Value-list parts */
#define TYPE_HOST             0x0000
#define TYPE_TIME             0x0001
#define TYPE_PLUGIN           0x0002
#define TYPE_PLUGIN_INSTANCE  0x0003
#define TYPE_TYPE             0x0004
#define TYPE_TYPE_INSTANCE    0x0005
#define TYPE_VALUES           0x0006
#define TYPE_INTERVAL         0x0007

/* Parts used to transmit notifications */
#define TYPE_MESSAGE          0x0100
#define TYPE_SEVERITY         0x0101

/* Parts wrapping signed / encrypted content */
#define TYPE_SIGN_SHA256      0x0200
#define TYPE_ENCR_AES256      0x0210
133
/* Size of the fixed name buffers (host, plugin, type, ...). */
#define DATA_MAX_NAME_LEN  64

/* Size of a notification's message buffer and its severity codes. */
#define NOTIF_MAX_MSG_LEN  256
#define NOTIF_FAILURE      1
#define NOTIF_WARNING      2
#define NOTIF_OKAY         4
140
/*
 * Free `ptr' and reset it to NULL so it cannot be freed or used twice.
 * free(NULL) is a no-op, so no explicit NULL test is needed.
 * `ptr' is evaluated more than once: pass a plain lvalue.
 */
#define sfree(ptr) \
  do { \
    free(ptr); \
    (ptr) = NULL; \
  } while (0)
148
/*
 * Bounded string copy that always leaves `dest' NUL-terminated.
 *
 * Copies at most n - 1 characters of `src' into `dest' (truncating if
 * necessary) and terminates the result; plain strncpy() leaves `dest'
 * unterminated when `src' is n characters or longer.  With n == 0 nothing
 * is written.  Returns `dest', like strncpy().
 */
static inline char *sstrncpy(char *dest, const char *src, size_t n)
{
  if (n == 0)
    return dest;

  strncpy(dest, src, n);
  dest[n - 1] = '\0';
  return dest;
}
150
/* Values of collectd_closure_t.security_level, checked in parse_packet(). */
#define SECURITY_LEVEL_NONE     0
#define SECURITY_LEVEL_SIGN     1
#define SECURITY_LEVEL_ENCRYPT  2
154
/* Data-source type codes carried in the type bytes of a values part. */
#define DS_TYPE_COUNTER  0
#define DS_TYPE_GAUGE    1
#define DS_TYPE_DERIVE   2
#define DS_TYPE_ABSOLUTE 3

/*
 * Name of a DS_TYPE_* code, or "unknown" for anything else.
 * The argument and the whole expansion are parenthesized so the macro is
 * safe with operator expressions (e.g. `a | b') and inside larger
 * expressions.  `t' is evaluated several times: avoid side effects.
 */
#define DS_TYPE_TO_STRING(t) \
  (((t) == DS_TYPE_COUNTER)  ? "counter"  : \
   ((t) == DS_TYPE_GAUGE)    ? "gauge"    : \
   ((t) == DS_TYPE_DERIVE)   ? "derive"   : \
   ((t) == DS_TYPE_ABSOLUTE) ? "absolute" : \
   "unknown")
165
/* Size of the data buffer inside a receive_list_entry_t. */
#define BUFF_SIZE 1024

/* C representations of the four data-source types. */
typedef unsigned long long  counter_t;
typedef double              gauge_t;
typedef int64_t             derive_t;
typedef uint64_t            absolute_t;

/* Interval value placed into VALUE_LIST_INIT by parse_packet(). */
int interval_g;
173
174 union value_u
175 {
176   counter_t  counter;
177   gauge_t    gauge;
178   derive_t   derive;
179   absolute_t absolute;
180 };
181 typedef union value_u value_t;
182
183 struct value_list_s
184 {
185   value_t *values;
186   int      values_len;
187   time_t   time;
188   int      interval;
189   char     host[DATA_MAX_NAME_LEN];
190   char     plugin[DATA_MAX_NAME_LEN];
191   char     plugin_instance[DATA_MAX_NAME_LEN];
192   char     type[DATA_MAX_NAME_LEN];
193   char     type_instance[DATA_MAX_NAME_LEN];
194   meta_data_t *meta;
195   uint8_t  *types;
196 };
197 typedef struct value_list_s value_list_t;
198
199
/* Tag naming which member of notification_meta_t.nm_value is in use. */
enum notification_meta_type_e {
  NM_TYPE_STRING,
  NM_TYPE_SIGNED_INT,
  NM_TYPE_UNSIGNED_INT,
  NM_TYPE_DOUBLE,
  NM_TYPE_BOOLEAN
};
208
209 typedef struct notification_meta_s
210 {
211   char name[DATA_MAX_NAME_LEN];
212   enum notification_meta_type_e type;
213   union
214   {
215     const char *nm_string;
216     int64_t nm_signed_int;
217     uint64_t nm_unsigned_int;
218     double nm_double;
219     bool nm_boolean;
220   } nm_value;
221   struct notification_meta_s *next;
222 } notification_meta_t;
223
224 typedef struct notification_s
225 {
226   int    severity;
227   time_t time;
228   char   message[NOTIF_MAX_MSG_LEN];
229   char   host[DATA_MAX_NAME_LEN];
230   char   plugin[DATA_MAX_NAME_LEN];
231   char   plugin_instance[DATA_MAX_NAME_LEN];
232   char   type[DATA_MAX_NAME_LEN];
233   char   type_instance[DATA_MAX_NAME_LEN];
234   notification_meta_t *meta;
235 } notification_t;
236
237
238
/*
 * Header that starts every part: two 16-bit fields.
 *
 *                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
 *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 * +-------------------------------+-------------------------------+
 * ! Type                          ! Length                        !
 * +-------------------------------+-------------------------------+
 */
typedef struct part_header_s
{
  uint16_t type;
  uint16_t length;
} part_header_t;
251
252 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
253  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
254  * +-------------------------------+-------------------------------+
255  * ! Type                          ! Length                        !
256  * +-------------------------------+-------------------------------+
257  * : (Length - 4) Bytes                                            :
258  * +---------------------------------------------------------------+
259  */
260 struct part_string_s
261 {
262   part_header_t *head;
263   char *value;
264 };
265 typedef struct part_string_s part_string_t;
266
267 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
268  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
269  * +-------------------------------+-------------------------------+
270  * ! Type                          ! Length                        !
271  * +-------------------------------+-------------------------------+
272  * : (Length - 4 == 2 || 4 || 8) Bytes                             :
273  * +---------------------------------------------------------------+
274  */
275 struct part_number_s
276 {
277   part_header_t *head;
278   uint64_t *value;
279 };
280 typedef struct part_number_s part_number_t;
281
282 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
283  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
284  * +-------------------------------+-------------------------------+
285  * ! Type                          ! Length                        !
286  * +-------------------------------+---------------+---------------+
287  * ! Num of values                 ! Type0         ! Type1         !
288  * +-------------------------------+---------------+---------------+
289  * ! Value0                                                        !
290  * !                                                               !
291  * +---------------------------------------------------------------+
292  * ! Value1                                                        !
293  * !                                                               !
294  * +---------------------------------------------------------------+
295  */
296 struct part_values_s
297 {
298   part_header_t *head;
299   uint16_t *num_values;
300   uint8_t  *values_types;
301   value_t  *values;
302 };
303 typedef struct part_values_s part_values_t;
304
305 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
306  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
307  * +-------------------------------+-------------------------------+
308  * ! Type                          ! Length                        !
309  * +-------------------------------+-------------------------------+
310  * ! Hash (Bits   0 -  31)                                         !
311  * : :                                                             :
312  * ! Hash (Bits 224 - 255)                                         !
313  * +---------------------------------------------------------------+
314  */
315 /* Minimum size */
316 #define PART_SIGNATURE_SHA256_SIZE 36
317 struct part_signature_sha256_s
318 {
319   part_header_t head;
320   unsigned char hash[32];
321   char *username;
322 };
323 typedef struct part_signature_sha256_s part_signature_sha256_t;
324
325 /*                      1 1 1 1 1 1 1 1 1 1 2 2 2 2 2 2 2 2 2 2 3 3
326  *  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
327  * +-------------------------------+-------------------------------+
328  * ! Type                          ! Length                        !
329  * +-------------------------------+-------------------------------+
330  * ! Original length               ! Padding (0 - 15 bytes)        !
331  * +-------------------------------+-------------------------------+
332  * ! Hash (Bits   0 -  31)                                         !
333  * : :                                                             :
334  * ! Hash (Bits 128 - 159)                                         !
335  * +---------------------------------------------------------------+
336  */
337 /* Minimum size */
338 #define PART_ENCRYPTION_AES256_SIZE 42
339 struct part_encryption_aes256_s
340 {
341   part_header_t head;
342   uint16_t username_length;
343   char *username;
344   unsigned char iv[16];
345   /* <encrypted> */
346   unsigned char hash[20];
347   /*   <payload /> */
348   /* </encrypted> */
349 };
350 typedef struct part_encryption_aes256_s part_encryption_aes256_t;
351
352 struct receive_list_entry_s
353 {
354   char data[BUFF_SIZE];
355   int  data_len;
356   int  fd;
357   struct receive_list_entry_s *next;
358 };
359 typedef struct receive_list_entry_s receive_list_entry_t;
360
361
362 /**
363  *
364  *  END Collectd Structs
365  *
366  */
367
/*
 * Convert a 64-bit integer from network (big-endian) to host byte order.
 *
 * `n' holds the eight bytes exactly as they appeared on the wire.  The
 * bytes are folded most-significant first, which is correct on any host
 * and does not depend on BYTE_ORDER/BIG_ENDIAN being defined (with both
 * undefined, `#if BYTE_ORDER == BIG_ENDIAN' is silently true and the
 * conversion would be skipped on little-endian machines).
 */
static u_int64_t collectd_ntohll (u_int64_t n)
{
  const unsigned char *bytes = (const unsigned char *) &n;
  u_int64_t retval = 0;
  size_t i;

  for (i = 0; i < sizeof (n); i++)
    retval = (retval << 8) | bytes[i];

  return retval;
} /* u_int64_t collectd_ntohll */

/*
 * Doubles are passed through unchanged.
 * NOTE(review): this presumably relies on sender and host sharing the same
 * double byte layout -- confirm before running on a big-endian host.
 */
#define ntohd(d) (d)
381
382 static noit_boolean
383 noit_collects_check_aynsch(noit_module_t *self,
384                            noit_check_t *check) {
385   const char *config_val;
386   collectd_mod_config_t *conf = noit_module_get_userdata(self);
387   noit_boolean is_asynch = conf->asynch_metrics;
388   if(noit_hash_retr_str(check->config,
389                         "asynch_metrics", strlen("asynch_metrics"),
390                         (const char **)&config_val)) {
391     if(!strcasecmp(config_val, "true") || !strcasecmp(config_val, "on"))
392       is_asynch = noit_true;
393   }
394
395   if(is_asynch) check->flags |= NP_SUPPRESS_METRICS;
396   else check->flags &= ~NP_SUPPRESS_METRICS;
397   return is_asynch;
398 }
399
400 /* Forward declare this so the parse function can use it */
401 static int queue_values(collectd_closure_t *ccl,
402   noit_module_t *self, noit_check_t *check, value_list_t *vl);
403 static int queue_notifications(collectd_closure_t *ccl,
404   noit_module_t *self, noit_check_t *check, notification_t *n);
405
406
407 static EVP_CIPHER_CTX* network_get_aes256_cypher (collectd_closure_t *ccl, /* {{{ */
408     const void *iv, size_t iv_size, const char *username)
409 {
410
411   if (ccl->secret == NULL)
412     return (NULL);
413   else
414   {
415     EVP_CIPHER_CTX *ctx_ptr;
416     EVP_MD_CTX ctx_md;
417     unsigned char password_hash[32];
418     unsigned int length = 0;
419     int success;
420
421     ctx_ptr = &ccl->ctx;
422
423     EVP_DigestInit(&ctx_md, EVP_sha256());
424     EVP_DigestUpdate(&ctx_md, ccl->secret, strlen(ccl->secret));
425     EVP_DigestFinal(&ctx_md, password_hash, &length);
426     EVP_MD_CTX_cleanup(&ctx_md);
427
428     assert(length <= 32);
429
430     success = EVP_DecryptInit(ctx_ptr, EVP_aes_256_ofb(), password_hash, iv);
431     if (success != 1)
432     {
433       noitL(noit_error, "collectd: EVP_DecryptInit returned: %d\n",
434           success);
435       return (NULL);
436     }
437     return (ctx_ptr);
438   }
439 } /* }}} int network_get_aes256_cypher */
440
441 static int parse_part_values (void **ret_buffer, size_t *ret_buffer_len,
442     value_t **ret_values, uint8_t **ret_types, int *ret_num_values)
443 {
444   char *buffer = *ret_buffer;
445   size_t buffer_len = *ret_buffer_len;
446
447   uint16_t tmp16;
448   size_t exp_size;
449   int   i;
450
451   uint16_t pkg_length;
452   uint16_t pkg_type;
453   uint16_t pkg_numval;
454
455   uint8_t *pkg_types;
456   value_t *pkg_values;
457
458   if (buffer_len < 15)
459   {
460     noitL(noit_error,"collectd: packet is too short: "
461         "buffer_len = %zu\n", buffer_len);
462     return (-1);
463   }
464
465   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
466   buffer += sizeof (tmp16);
467   pkg_type = ntohs (tmp16);
468
469   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
470   buffer += sizeof (tmp16);
471   pkg_length = ntohs (tmp16);
472
473   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
474   buffer += sizeof (tmp16);
475   pkg_numval = ntohs (tmp16);
476
477   assert (pkg_type == TYPE_VALUES);
478
479   exp_size = 3 * sizeof (uint16_t)
480     + pkg_numval * (sizeof (uint8_t) + sizeof (value_t));
481   if ((buffer_len < 0) || (buffer_len < exp_size))
482   {
483     noitL(noit_error, "collectd: parse_part_values: "
484         "Packet too short: "
485         "Chunk of size %zu expected, "
486         "but buffer has only %zu bytes left.\n",
487         exp_size, buffer_len);
488     return (-1);
489   }
490
491   if (pkg_length != exp_size)
492   {
493     noitL(noit_debug, "collectd: parse_part_values: "
494         "Length and number of values "
495         "in the packet don't match.\n");
496     return (-1);
497   }
498
499   pkg_types = (uint8_t *) malloc (pkg_numval * sizeof (uint8_t));
500   pkg_values = (value_t *) malloc (pkg_numval * sizeof (value_t));
501   if ((pkg_types == NULL) || (pkg_values == NULL))
502   {
503     sfree (pkg_types);
504     sfree (pkg_values);
505     noitL(noit_error, "collectd: parse_part_values: malloc failed.\n");
506     return (-1);
507   }
508
509   memcpy ((void *) pkg_types, (void *) buffer, pkg_numval * sizeof (uint8_t));
510   buffer += pkg_numval * sizeof (uint8_t);
511   memcpy ((void *) pkg_values, (void *) buffer, pkg_numval * sizeof (value_t));
512   buffer += pkg_numval * sizeof (value_t);
513
514   for (i = 0; i < pkg_numval; i++)
515   {
516     switch (pkg_types[i])
517     {
518       case DS_TYPE_COUNTER:
519         pkg_values[i].counter = (counter_t) collectd_ntohll (pkg_values[i].counter);
520         break;
521
522       case DS_TYPE_GAUGE:
523         pkg_values[i].gauge = (gauge_t) ntohd (pkg_values[i].gauge);
524         break;
525
526       case DS_TYPE_DERIVE:
527         pkg_values[i].derive = (derive_t) collectd_ntohll (pkg_values[i].derive);
528         break;
529
530       case DS_TYPE_ABSOLUTE:
531         pkg_values[i].absolute = (absolute_t) collectd_ntohll (pkg_values[i].absolute);
532         break;
533
534       default:
535         noitL(noit_debug, "collectd: parse_part_values: "
536       "Don't know how to handle data source type %"PRIu8 "\n",
537       pkg_types[i]);
538         sfree (pkg_types);
539         sfree (pkg_values);
540         return (-1);
541     } /* switch (pkg_types[i]) */
542   }
543
544   *ret_buffer     = buffer;
545   *ret_buffer_len = buffer_len - pkg_length;
546   *ret_num_values = pkg_numval;
547   *ret_types      = pkg_types;
548   *ret_values     = pkg_values;
549
550
551   return (0);
552 } /* int parse_part_values */
553
554 static int parse_part_number (void **ret_buffer, size_t *ret_buffer_len,
555     uint64_t *value)
556 {
557   char *buffer = *ret_buffer;
558   size_t buffer_len = *ret_buffer_len;
559
560   uint16_t tmp16;
561   uint64_t tmp64;
562   size_t exp_size = 2 * sizeof (uint16_t) + sizeof (uint64_t);
563
564   uint16_t pkg_length;
565
566   if ((buffer_len < 0) || ((size_t) buffer_len < exp_size))
567   {
568     noitL(noit_error, "collectd: parse_part_number: "
569         "Packet too short: "
570         "Chunk of size %zu expected, "
571         "but buffer has only %zu bytes left.\n",
572         exp_size, buffer_len);
573     return (-1);
574   }
575
576   /* skip pkg_type */
577   buffer += sizeof (tmp16);
578
579   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
580   buffer += sizeof (tmp16);
581   pkg_length = ntohs (tmp16);
582
583   memcpy ((void *) &tmp64, buffer, sizeof (tmp64));
584   buffer += sizeof (tmp64);
585   *value = collectd_ntohll (tmp64);
586
587   *ret_buffer = buffer;
588   *ret_buffer_len = buffer_len - pkg_length;
589
590   return (0);
591 } /* int parse_part_number */
592
593 static int parse_part_string (void **ret_buffer, size_t *ret_buffer_len,
594     char *output, int output_len)
595 {
596   char *buffer = *ret_buffer;
597   size_t buffer_len = *ret_buffer_len;
598
599   uint16_t tmp16;
600   size_t header_size = 2 * sizeof (uint16_t);
601
602   uint16_t pkg_length;
603
604   if ((buffer_len < 0) || (buffer_len < header_size))
605   {
606     noitL(noit_error, "collectd: parse_part_string: "
607         "Packet too short: "
608         "Chunk of at least size %zu expected, "
609         "but buffer has only %zu bytes left.\n",
610         header_size, buffer_len);
611     return (-1);
612   }
613
614   /* skip pkg_type */
615   buffer += sizeof (tmp16);
616
617   memcpy ((void *) &tmp16, buffer, sizeof (tmp16));
618   buffer += sizeof (tmp16);
619   pkg_length = ntohs (tmp16);
620
621   /* Check that packet fits in the input buffer */
622   if (pkg_length > buffer_len)
623   {
624     noitL(noit_error, "collectd: parse_part_string: "
625         "Packet too big: "
626         "Chunk of size %"PRIu16" received, "
627         "but buffer has only %zu bytes left.\n",
628         pkg_length, buffer_len);
629     return (-1);
630   }
631
632   /* Check that pkg_length is in the valid range */
633   if (pkg_length <= header_size)
634   {
635     noitL(noit_error, "collectd: parse_part_string: "
636         "Packet too short: "
637         "Header claims this packet is only %hu "
638         "bytes long.\n", pkg_length);
639     return (-1);
640   }
641
642   /* Check that the package data fits into the output buffer.
643    * The previous if-statement ensures that:
644    * `pkg_length > header_size' */
645   if ((output_len < 0)
646       || ((size_t) output_len < ((size_t) pkg_length - header_size)))
647   {
648     noitL(noit_error, "collectd: parse_part_string: "
649         "Output buffer too small.\n");
650     return (-1);
651   }
652
653   /* All sanity checks successfull, let's copy the data over */
654   output_len = pkg_length - header_size;
655   memcpy ((void *) output, (void *) buffer, output_len);
656   buffer += output_len;
657
658   /* For some very weird reason '\0' doesn't do the trick on SPARC in
659    * this statement. */
660   if (output[output_len - 1] != 0)
661   {
662     noitL(noit_error, "collectd: parse_part_string: "
663         "Received string does not end "
664         "with a NULL-byte.\n");
665     return (-1);
666   }
667
668   *ret_buffer = buffer;
669   *ret_buffer_len = buffer_len - pkg_length;
670
671   return (0);
672 } /* int parse_part_string */
673
674
/* Flags passed down to parse_packet() for content that was unwrapped from
 * a signed or an encrypted part. */
#define PP_SIGNED    0x01
#define PP_ENCRYPTED 0x02

/*
 * Copy `s' bytes from the current read position into `p' and advance the
 * position.  Relies on two locals of the calling function: `buffer' (start
 * of the data) and `buffer_offset' (read position).  Performs no bounds
 * checking; callers validate lengths first.
 */
#define BUFFER_READ(p,s) \
  do { \
    memcpy ((p), buffer + buffer_offset, (s)); \
    buffer_offset += (s); \
  } while (0)
682
683
684
685 // Forward declare
686 static int parse_packet (
687     collectd_closure_t *ccl, noit_module_t *self, noit_check_t *check,
688     void *buffer, size_t buffer_size, int flags);
689
690
691 static int parse_part_sign_sha256 (collectd_closure_t *ccl, noit_module_t *self,
692     noit_check_t *check, void **ret_buffer, size_t *ret_buffer_len, int flags)
693 {
694   unsigned char *buffer;
695   size_t buffer_len;
696   size_t buffer_offset;
697
698   size_t username_len;
699   part_signature_sha256_t pss;
700   uint16_t pss_head_length;
701   unsigned char hash[sizeof (pss.hash)];
702
703   unsigned char *hash_ptr;
704   unsigned int length;
705
706   buffer = *ret_buffer;
707   buffer_len = *ret_buffer_len;
708   buffer_offset = 0;
709
710   if (ccl->username == NULL)
711   {
712     noitL(noit_debug, "collectd: Received signed network packet but can't verify "
713         "it because no user has been configured. Will accept it.\n");
714     return (0);
715   }
716
717   if (ccl->secret == NULL)
718   {
719     noitL(noit_debug, "collectd: Received signed network packet but can't verify "
720         "it because no secret has been configured. Will accept it.\n");
721     return (0);
722   }
723
724   /* Check if the buffer has enough data for this structure. */
725   if (buffer_len <= PART_SIGNATURE_SHA256_SIZE)
726     return (-ENOMEM);
727
728   /* Read type and length header */
729   BUFFER_READ (&pss.head.type, sizeof (pss.head.type));
730   BUFFER_READ (&pss.head.length, sizeof (pss.head.length));
731   pss_head_length = ntohs (pss.head.length);
732
733   /* Check if the `pss_head_length' is within bounds. */
734   if ((pss_head_length <= PART_SIGNATURE_SHA256_SIZE)
735       || (pss_head_length > buffer_len))
736   {
737     noitL(noit_error, "collectd: HMAC-SHA-256 with invalid length received.\n");
738     return (-1);
739   }
740
741   /* Copy the hash. */
742   BUFFER_READ (pss.hash, sizeof (pss.hash));
743
744   /* Calculate username length (without null byte) and allocate memory */
745   username_len = pss_head_length - PART_SIGNATURE_SHA256_SIZE;
746   pss.username = malloc (username_len + 1);
747   if (pss.username == NULL)
748     return (-ENOMEM);
749
750   /* Read the username */
751   BUFFER_READ (pss.username, username_len);
752   pss.username[username_len] = 0;
753
754   assert (buffer_offset == pss_head_length);
755
756   /* Match up the username with the expected username */
757   if (strcmp(ccl->username, pss.username) != 0)
758   {
759     noitL(noit_error, "collectd: User: %s and Given User: %s don't match\n", ccl->username, pss.username);
760     sfree (pss.username);
761     return (-ENOENT);
762   }
763
764   /* Create a hash device and check the HMAC */
765   hash_ptr = HMAC(EVP_sha256(), ccl->secret, strlen(ccl->secret),
766       buffer     + PART_SIGNATURE_SHA256_SIZE,
767       buffer_len - PART_SIGNATURE_SHA256_SIZE,
768       hash,         &length);
769   if (hash_ptr == NULL)
770   {
771     noitL(noit_error, "collectd: Creating HMAC-SHA-256 object failed.\n");
772     sfree (pss.username);
773     return (-1);
774   }
775
776   /* Clean up */
777   sfree (pss.username);
778
779   if (memcmp (pss.hash, hash, sizeof (pss.hash)) != 0)
780   {
781     noitL(noit_error, "collectd: Verifying HMAC-SHA-256 signature failed: "
782         "Hash mismatch.\n");
783   }
784   else
785   {
786     parse_packet (ccl, self, check, buffer + buffer_offset, buffer_len - buffer_offset,
787         flags | PP_SIGNED);
788   }
789
790   *ret_buffer = buffer + buffer_len;
791   *ret_buffer_len = 0;
792
793   return (0);
794 } /* }}} int parse_part_sign_sha256 */
795 /* #endif HAVE_LIBGCRYPT */
796
797 static int parse_part_encr_aes256 (collectd_closure_t *ccl, noit_module_t *self,
798     noit_check_t *check, void **ret_buffer, size_t *ret_buffer_len,
799     int flags)
800 {
801   unsigned char  *buffer = *ret_buffer;
802   size_t buffer_len = *ret_buffer_len;
803   size_t payload_len;
804   size_t part_size;
805   size_t buffer_offset;
806   int    tmpbufsize;
807   unsigned char   *tmpbuf;
808   uint16_t username_len;
809   part_encryption_aes256_t pea;
810   unsigned char hash[sizeof (pea.hash)];
811   unsigned int hash_length;
812
813   EVP_CIPHER_CTX *ctx;
814   EVP_MD_CTX ctx_md;
815   int err;
816
817   /* Make sure at least the header if available. */
818   if (buffer_len <= PART_ENCRYPTION_AES256_SIZE)
819   {
820     noitL(noit_debug, "collectd: parse_part_encr_aes256: "
821         "Discarding short packet.\n");
822     return (-1);
823   }
824
825   buffer_offset = 0;
826
827   /* Copy the unencrypted information into `pea'. */
828   BUFFER_READ (&pea.head.type, sizeof (pea.head.type));
829   BUFFER_READ (&pea.head.length, sizeof (pea.head.length));
830
831   /* Check the `part size'. */
832   part_size = ntohs (pea.head.length);
833   if ((part_size <= PART_ENCRYPTION_AES256_SIZE)
834       || (part_size > buffer_len))
835   {
836     noitL(noit_debug, "collectd: parse_part_encr_aes256: "
837         "Discarding part with invalid size.\n");
838     return (-1);
839   }
840
841   /* Read the username */
842   BUFFER_READ (&username_len, sizeof (username_len));
843   username_len = ntohs (username_len);
844
845   if ((username_len <= 0)
846       || (username_len > (part_size - (PART_ENCRYPTION_AES256_SIZE + 1))))
847   {
848     noitL(noit_debug, "collectd: parse_part_encr_aes256: "
849         "Discarding part with invalid username length.\n");
850     return (-1);
851   }
852
853   assert (username_len > 0);
854   pea.username = malloc (username_len + 1);
855   if (pea.username == NULL)
856     return (-ENOMEM);
857   BUFFER_READ (pea.username, username_len);
858   pea.username[username_len] = 0;
859
860   /* Last but not least, the initialization vector */
861   BUFFER_READ (pea.iv, sizeof (pea.iv));
862
863   /* Make sure we are at the right position */
864   assert (buffer_offset == (username_len +
865         PART_ENCRYPTION_AES256_SIZE - sizeof (pea.hash)));
866
867   /* Match up the username with the expected username */
868   if (strcmp(ccl->username, pea.username) != 0)
869   {
870     noitL(noit_error, "collectd: Username received and server side username don't match\n");
871     sfree (pea.username);
872     return (-ENOENT);
873   }
874
875   ctx = network_get_aes256_cypher (ccl, pea.iv, sizeof (pea.iv),
876       pea.username);
877   if (ctx == NULL)
878     return (-1);
879
880   payload_len = part_size - (PART_ENCRYPTION_AES256_SIZE + username_len);
881   assert (payload_len > 0);
882   tmpbuf = malloc(part_size - buffer_offset);
883
884   /* Decrypt the packet */
885   err = EVP_DecryptUpdate(ctx,
886       tmpbuf, &tmpbufsize,
887       buffer    + buffer_offset,
888       part_size - buffer_offset);
889   if (err != 1)
890   {
891     noitL(noit_error, "collectd: openssl returned: %d\n", err);
892     return (-1);
893   }
894
895   assert(part_size - buffer_offset == tmpbufsize);
896   /* Make it appear to be in place */
897   memcpy(buffer + buffer_offset, tmpbuf, part_size - buffer_offset);
898   sfree(tmpbuf);
899
900   /* Read the hash */
901   BUFFER_READ (pea.hash, sizeof (pea.hash));
902
903   /* Make sure we're at the right position - again */
904   assert (buffer_offset == (username_len + PART_ENCRYPTION_AES256_SIZE));
905   assert (buffer_offset == (part_size - payload_len));
906
907   /* Check hash sum */
908   memset (hash, 0, sizeof (hash));
909   EVP_DigestInit(&ctx_md, EVP_sha1());
910   EVP_DigestUpdate(&ctx_md, buffer + buffer_offset, payload_len);
911   EVP_DigestFinal(&ctx_md, hash, &hash_length);
912   if (memcmp (hash, pea.hash, sizeof (hash)) != 0)
913   {
914     noitL(noit_error, "collectd: Decryption failed: Checksum mismatch.\n");
915     return (-1);
916   }
917
918   parse_packet (ccl, self, check, buffer + buffer_offset, payload_len,
919       flags | PP_ENCRYPTED);
920
921   /* Update return values */
922   *ret_buffer =     buffer     + part_size;
923   *ret_buffer_len = buffer_len - part_size;
924   sfree(pea.username);
925
926   return (0);
927 } /* }}} int parse_part_encr_aes256 */
928
929
930 #undef BUFFER_READ
931
932
933 static int parse_packet (/* {{{ */
934     collectd_closure_t *ccl, noit_module_t *self, noit_check_t *check,
935     void *buffer, size_t buffer_size, int flags)
936 {
937   int status;
938
939   int packet_was_signed = (flags & PP_SIGNED);
940   int packet_was_encrypted = (flags & PP_ENCRYPTED);
941   int printed_ignore_warning = 0;
942
943 #define VALUE_LIST_INIT { NULL, 0, 0, interval_g, "localhost", "", "", "", "", NULL }
944   value_list_t vl = VALUE_LIST_INIT;
945   notification_t n;
946
947   memset (&vl, '\0', sizeof (vl));
948   memset (&n, '\0', sizeof (n));
949   status = 0;
950
951   while ((status == 0) && (0 < buffer_size)
952       && ((unsigned int) buffer_size > sizeof (part_header_t)))
953   {
954     uint16_t pkg_length;
955     uint16_t pkg_type;
956
957     memcpy ((void *) &pkg_type,
958         (void *) buffer,
959         sizeof (pkg_type));
960     memcpy ((void *) &pkg_length,
961         (void *) ((char *)buffer + sizeof (pkg_type)),
962         sizeof (pkg_length));
963
964     pkg_length = ntohs (pkg_length);
965     pkg_type = ntohs (pkg_type);
966
967     if (pkg_length > buffer_size)
968       break;
969     /* Ensure that this loop terminates eventually */
970     if (pkg_length < (2 * sizeof (uint16_t)))
971       break;
972
973
974     if (pkg_type == TYPE_ENCR_AES256)
975     {
976       status = parse_part_encr_aes256 (ccl, self, check,
977           &buffer, &buffer_size, flags);
978       if (status != 0)
979       {
980         noitL(noit_error, "collectd: Decrypting AES256 "
981             "part failed "
982             "with status %i.\n", status);
983         break;
984       }
985     }
986     else if ((ccl->security_level == SECURITY_LEVEL_ENCRYPT)
987         && (packet_was_encrypted == 0))
988     {
989       if (printed_ignore_warning == 0)
990       {
991         noitL(noit_debug, "collectd: Unencrypted packet or "
992             "part has been ignored.\n");
993         printed_ignore_warning = 1;
994       }
995       buffer = ((char *) buffer) + pkg_length;
996       continue;
997     }
998     else if (pkg_type == TYPE_SIGN_SHA256)
999     {
1000       status = parse_part_sign_sha256 (ccl, self, check,
1001                                         &buffer, &buffer_size, flags);
1002       if (status != 0)
1003       {
1004         noitL(noit_error, "collectd: Verifying HMAC-SHA-256 "
1005             "signature failed "
1006             "with status %i.\n", status);
1007         break;
1008       }
1009     }
1010     else if ((ccl->security_level == SECURITY_LEVEL_SIGN)
1011         && (packet_was_encrypted == 0)
1012         && (packet_was_signed == 0))
1013     {
1014       if (printed_ignore_warning == 0)
1015       {
1016         noitL(noit_debug, "collectd: Unsigned packet or "
1017             "part has been ignored.\n");
1018         printed_ignore_warning = 1;
1019       }
1020       buffer = ((char *) buffer) + pkg_length;
1021       continue;
1022     }
1023     else if (pkg_type == TYPE_VALUES)
1024     {
1025       status = parse_part_values (&buffer, &buffer_size,
1026           &vl.values, &vl.types, &vl.values_len);
1027
1028       if (status != 0)
1029         break;
1030
1031       if ((strlen (vl.host) > 0) &&
1032           (strlen (vl.plugin) > 0) &&
1033           (strlen (vl.type) > 0))
1034       {
1035         queue_values(ccl, self, check, &vl);
1036       }
1037       else
1038       {
1039         noitL(noit_error,
1040               "collectd: NOT dispatching values [%lld,%s:%s:%s]\n",
1041               (long long int)vl.time, vl.host, vl.plugin, vl.type);
1042       }
1043
1044       sfree (vl.values);
1045       sfree (vl.types);
1046     }
1047     else if (pkg_type == TYPE_TIME)
1048     {
1049       uint64_t tmp = 0;
1050       status = parse_part_number (&buffer, &buffer_size, &tmp);
1051       if (status == 0)
1052       {
1053         vl.time = (time_t) tmp;
1054         n.time = (time_t) tmp;
1055       }
1056     }
1057     else if (pkg_type == TYPE_INTERVAL)
1058     {
1059       uint64_t tmp = 0;
1060       status = parse_part_number (&buffer, &buffer_size,
1061           &tmp);
1062       if (status == 0)
1063         vl.interval = (int) tmp;
1064     }
1065     else if (pkg_type == TYPE_HOST)
1066     {
1067       status = parse_part_string (&buffer, &buffer_size,
1068           vl.host, sizeof (vl.host));
1069       if (status == 0)
1070         sstrncpy (n.host, vl.host, sizeof (n.host));
1071     }
1072     else if (pkg_type == TYPE_PLUGIN)
1073     {
1074       status = parse_part_string (&buffer, &buffer_size,
1075           vl.plugin, sizeof (vl.plugin));
1076       if (status == 0)
1077         sstrncpy (n.plugin, vl.plugin,
1078             sizeof (n.plugin));
1079     }
1080     else if (pkg_type == TYPE_PLUGIN_INSTANCE)
1081     {
1082       status = parse_part_string (&buffer, &buffer_size,
1083           vl.plugin_instance,
1084           sizeof (vl.plugin_instance));
1085       if (status == 0)
1086         sstrncpy (n.plugin_instance,
1087             vl.plugin_instance,
1088             sizeof (n.plugin_instance));
1089     }
1090     else if (pkg_type == TYPE_TYPE)
1091     {
1092       status = parse_part_string (&buffer, &buffer_size,
1093           vl.type, sizeof (vl.type));
1094       if (status == 0)
1095         sstrncpy (n.type, vl.type, sizeof (n.type));
1096     }
1097     else if (pkg_type == TYPE_TYPE_INSTANCE)
1098     {
1099       status = parse_part_string (&buffer, &buffer_size,
1100           vl.type_instance,
1101           sizeof (vl.type_instance));
1102       if (status == 0)
1103         sstrncpy (n.type_instance, vl.type_instance,
1104             sizeof (n.type_instance));
1105     }
1106     else if (pkg_type == TYPE_MESSAGE)
1107     {
1108       status = parse_part_string (&buffer, &buffer_size,
1109           n.message, sizeof (n.message));
1110
1111       if (status != 0)
1112       {
1113         /* do nothing */
1114       }
1115       else if ((n.severity != NOTIF_FAILURE)
1116           && (n.severity != NOTIF_WARNING)
1117           && (n.severity != NOTIF_OKAY))
1118       {
1119         noitL(noit_error, "collectd: "
1120             "Ignoring notification with "
1121             "unknown severity %i.\n",
1122             n.severity);
1123       }
1124 /* Time proves untrustworthy... and we don't use it.
1125       else if (n.time <= 0)
1126       {
1127         noitL(noit_error, "collectd: "
1128             "Ignoring notification with "
1129             "time == 0.\n");
1130       }
1131 */
1132       else if (strlen (n.message) <= 0)
1133       {
1134         noitL(noit_error, "collectd: "
1135             "Ignoring notification with "
1136             "an empty message.\n");
1137       }
1138       else
1139       {
1140         queue_notifications(ccl, self, check, &n);
1141         noitL(noit_error, "collectd: "
1142             "DISPATCH NOTIFICATION\n");
1143       }
1144     }
1145     else if (pkg_type == TYPE_SEVERITY)
1146     {
1147       uint64_t tmp = 0;
1148       status = parse_part_number (&buffer, &buffer_size,
1149           &tmp);
1150       if (status == 0)
1151         n.severity = (int) tmp;
1152     }
1153     else
1154     {
1155       noitL(noit_error, "collectd: parse_packet: Unknown part"
1156           " type: 0x%04hx\n", pkg_type);
1157       buffer = ((char *) buffer) + pkg_length;
1158     }
1159   } /* while (buffer_size > sizeof (part_header_t)) */
1160
1161   return (status);
1162 } /* }}} int parse_packet */
1163
1164
1165 // Not proud of this at all however; I am not sure best how to address this.
1166 static int infer_type(char *buffer, int buffer_len, value_list_t *vl, int index) {
1167   int len = strlen(buffer);
1168   strcat(buffer, "`");
1169   if (cmp_type(vl, "load", "load")) {
1170     assert(vl->values_len == 3);
1171     switch (index) {
1172       case 0:
1173         strcat(buffer, "1min"); break;
1174       case 1:
1175         strcat(buffer, "5min"); break;
1176       case 2:
1177         strcat(buffer, "15min"); break;
1178     }
1179   } else if (cmp_plugin(vl, "interface")) {
1180     assert(vl->values_len == 2);
1181     switch (index) {
1182       case 0:
1183         strcat(buffer, "rx"); break;
1184       case 1:
1185         strcat(buffer, "tx"); break;
1186     }
1187   } else {
1188     char buf[20];
1189     snprintf(buf, sizeof(buf), "%d", index);
1190     strcat(buffer, buf);
1191     noitL(noit_debug, "collectd: parsing multiple values"
1192         " and guessing on the type for plugin[%s] and type[%s]"
1193         , vl->plugin, vl->type);
1194   }
1195   return len;
1196 }
1197
/*
 * Build the flat metric name "plugin`plugin_inst`type`type_inst" in buffer.
 * The plugin name is always copied; each of the remaining three components
 * is appended, preceded by a backtick, only when it is non-empty.
 * The caller must supply a buffer large enough for the joined result.
 */
static void concat_metrics(char *buffer, char* plugin, char* plugin_inst, char* type, char* type_inst) {
  char *optional[3];
  char *out;
  int i;

  optional[0] = plugin_inst;
  optional[1] = type;
  optional[2] = type_inst;

  strcpy(buffer, plugin);
  out = buffer + strlen(buffer);

  for (i = 0; i < 3; i++) {
    if (optional[i][0] == '\0')
      continue;
    *out++ = '`';
    strcpy(out, optional[i]);
    out += strlen(out);
  }
}
1214
1215 static int queue_notifications(collectd_closure_t *ccl,
1216       noit_module_t *self, noit_check_t *check, notification_t *n) {
1217   stats_t current;
1218   noit_boolean immediate;
1219   char buffer[DATA_MAX_NAME_LEN*4 + 128];
1220   collectd_mod_config_t *conf;
1221   conf = noit_module_get_userdata(self);
1222
1223   if(!conf->support_notifications) return 0;
1224   /* We are passive, so we don't do anything for transient checks */
1225   if(check->flags & NP_TRANSIENT) return 0;
1226
1227   noit_check_stats_clear(&current);
1228   gettimeofday(&current.whence, NULL);
1229
1230   // Concat all the names together so they fit into the flat noitd model
1231   concat_metrics(buffer, n->plugin, n->plugin_instance, n->type, n->type_instance);
1232   noit_stats_set_metric(&ccl->current, buffer, METRIC_STRING, n->message);
1233   immediate = noit_collects_check_aynsch(self,check);
1234   if(immediate)
1235     noit_stats_log_immediate_metric(check, buffer, METRIC_STRING, n->message);
1236   noit_check_passive_set_stats(self, check, &current);
1237   noitL(nldeb, "collectd: dispatch_notifications(%s, %s, %s)\n",check->target, buffer, n->message);
1238   return 0;
1239 }
1240
1241
1242 static int queue_values(collectd_closure_t *ccl,
1243       noit_module_t *self, noit_check_t *check, value_list_t *vl) {
1244   noit_boolean immediate;
1245   char buffer[DATA_MAX_NAME_LEN*4 + 4 + 1 + 20];
1246   int i, len = 0;
1247
1248   // Concat all the names together so they fit into the flat noitd model
1249   immediate = noit_collects_check_aynsch(self,check);
1250   concat_metrics(buffer, vl->plugin, vl->plugin_instance, vl->type, vl->type_instance);
1251   for (i=0; i < vl->values_len; i++) {
1252  
1253     // Only infer the type if the amount of values is greater than one
1254     if (vl->values_len > 1) {
1255       // Trunc the string 
1256       if (len > 0)
1257         buffer[len] = 0;
1258       len = infer_type(buffer, sizeof(buffer), vl, i);
1259     }
1260
1261     switch (vl->types[i])
1262     {
1263       case DS_TYPE_COUNTER:
1264         noit_stats_set_metric(&ccl->current, buffer, METRIC_UINT64, &vl->values[i].counter);
1265         if(immediate) noit_stats_log_immediate_metric(check, buffer, METRIC_UINT64, &vl->values[i].counter);
1266         break;
1267
1268       case DS_TYPE_GAUGE:
1269         noit_stats_set_metric(&ccl->current, buffer, METRIC_DOUBLE, &vl->values[i].gauge);
1270         if(immediate) noit_stats_log_immediate_metric(check, buffer, METRIC_DOUBLE, &vl->values[i].gauge);
1271         break;
1272
1273       case DS_TYPE_DERIVE:
1274         noit_stats_set_metric(&ccl->current, buffer, METRIC_INT64, &vl->values[i].derive);
1275         if(immediate) noit_stats_log_immediate_metric(check, buffer, METRIC_INT64, &vl->values[i].derive);
1276         break;
1277
1278       case DS_TYPE_ABSOLUTE:
1279         noit_stats_set_metric(&ccl->current, buffer, METRIC_INT64, &vl->values[i].absolute);
1280         if(immediate) noit_stats_log_immediate_metric(check, buffer, METRIC_INT64, &vl->values[i].absolute);
1281         break;
1282
1283       default:
1284         noitL(noit_debug, "collectd: parse_part_values: "
1285               "Don't know how to handle data source type %"PRIu8 "\n",
1286               vl->types[i]);
1287         return (-1);
1288     } /* switch (value_types[i]) */
1289     ccl->stats_count++;
1290     noitL(nldeb, "collectd: queue_values(%s, %s)\n", buffer, check->target);
1291   }
1292   return 0;
1293 }
1294
1295 static void clear_closure(collectd_closure_t *ccl) {
1296   ccl->stats_count = 0;
1297   ccl->ntfy_count = 0;
1298   noit_check_stats_clear(&ccl->current);
1299
1300 }
1301
1302 static int collectd_submit(noit_module_t *self, noit_check_t *check,
1303                            noit_check_t *cause) {
1304   collectd_closure_t *ccl;
1305   struct timeval duration;
1306   /* We are passive, so we don't do anything for transient checks */
1307   if(check->flags & NP_TRANSIENT) return 0;
1308
1309   noit_collects_check_aynsch(self, check);
1310   if(!check->closure) {
1311     ccl = check->closure = (void *)calloc(1, sizeof(collectd_closure_t));
1312     memset(ccl, 0, sizeof(collectd_closure_t));
1313   } else {
1314     // Don't count the first run
1315     char human_buffer[256];
1316     ccl = (collectd_closure_t*)check->closure;
1317     gettimeofday(&ccl->current.whence, NULL);
1318     sub_timeval(ccl->current.whence, check->last_fire_time, &duration);
1319     ccl->current.duration = duration.tv_sec; // + duration.tv_usec / (1000 * 1000);
1320
1321     snprintf(human_buffer, sizeof(human_buffer),
1322              "dur=%d,run=%d,stats=%d,ntfy=%d", ccl->current.duration,
1323              check->generation, ccl->stats_count, ccl->ntfy_count);
1324     noitL(nldeb, "collectd(%s) [%s]\n", check->target, human_buffer);
1325
1326     // Not sure what to do here
1327     ccl->current.available = (ccl->ntfy_count > 0 || ccl->stats_count > 0) ?
1328         NP_AVAILABLE : NP_UNAVAILABLE;
1329     ccl->current.state = (ccl->ntfy_count > 0 || ccl->stats_count > 0) ?
1330         NP_GOOD : NP_BAD;
1331     ccl->current.status = human_buffer;
1332     noit_check_passive_set_stats(self, check, &ccl->current);
1333
1334     memcpy(&check->last_fire_time, &ccl->current.whence, sizeof(duration));
1335   }
1336   clear_closure(ccl);
1337   return 0;
1338 }
1339
1340 struct collectd_pkt {
1341   noit_module_t *self;  /* which collect load context */
1342   char *payload;
1343   int len;
1344 };
1345
1346 static int
1347 push_packet_at_check(noit_check_t *check, void *closure) {
1348   struct collectd_pkt *pkt = closure;
1349   collectd_closure_t *ccl;
1350   char *security_buffer;
1351   collectd_mod_config_t *conf;
1352   conf = noit_module_get_userdata(pkt->self);
1353
1354   /* We need a check, and a collectd one at that */
1355   if (!check || strcmp(check->module, "collectd")) return 0;
1356
1357   // If its a new check retrieve some values
1358   if (check->closure == NULL) {
1359     // TODO: Verify if it could somehow retrieve data before the check closure exists
1360     ccl = check->closure = (void *)calloc(1, sizeof(collectd_closure_t));
1361     memset(ccl, 0, sizeof(collectd_closure_t));
1362   } else {
1363     ccl = (collectd_closure_t*)check->closure;
1364   }
1365   // Default to NONE
1366   ccl->security_level = SECURITY_LEVEL_NONE;
1367   if (noit_hash_retr_str(check->config, "security_level", strlen("security_level"),
1368                          (const char**)&security_buffer))
1369   {
1370     ccl->security_level = atoi(security_buffer);
1371   }
1372
1373   // Is this outside to keep updates happening?
1374   if (!noit_hash_retr_str(check->config, "username", strlen("username"),
1375                          (const char**)&ccl->username) &&
1376       !noit_hash_retr_str(conf->options, "username", strlen("username"),
1377                          (const char**)&ccl->username))
1378   {
1379     if (ccl->security_level == SECURITY_LEVEL_ENCRYPT) {
1380       noitL(nlerr, "collectd: no username defined for check.\n");
1381       return 0;
1382     } else if (ccl->security_level == SECURITY_LEVEL_SIGN) {
1383       noitL(nlerr, "collectd: no username defined for check, "
1384           "will accept any signed packet.\n");
1385     }
1386   }
1387
1388   if(!ccl->secret)
1389     noit_hash_retr_str(check->config, "secret", strlen("secret"),
1390                        (const char**)&ccl->secret);
1391   if(!ccl->secret)
1392     noit_hash_retr_str(conf->options, "secret", strlen("secret"),
1393                        (const char**)&ccl->secret);
1394   if(!ccl->secret) {
1395     if (ccl->security_level == SECURITY_LEVEL_ENCRYPT) {
1396       noitL(nlerr, "collectd: no secret defined for check.\n");
1397       return 0;
1398     }
1399     else if (ccl->security_level == SECURITY_LEVEL_SIGN) {
1400       noitL(nlerr, "collectd: no secret defined for check, "
1401           "will accept any signed packet.\n");
1402     }
1403   }
1404
1405   parse_packet(ccl, pkt->self, check, pkt->payload, pkt->len, 0);
1406   return 1;
1407 }
1408
1409 static int noit_collectd_handler(eventer_t e, int mask, void *closure,
1410                              struct timeval *now) {
1411   union {
1412     struct sockaddr_in  skaddr;
1413     struct sockaddr_in6 skaddr6;
1414   } remote;
1415   char packet[1500];
1416   int packet_len = sizeof(packet);
1417   unsigned int from_len;
1418   char ip_p[INET6_ADDRSTRLEN];
1419   noit_module_t *self = (noit_module_t *)closure;
1420   collectd_mod_config_t *conf;
1421   conf = noit_module_get_userdata(self);
1422
1423
1424   // Get the username and password of the string
1425
1426   while(1) {
1427     struct collectd_pkt pkt;
1428     int inlen, check_cnt;
1429
1430     from_len = sizeof(remote);
1431
1432     inlen = recvfrom(e->fd, packet, packet_len, 0,
1433                      (struct sockaddr *)&remote, &from_len);
1434     gettimeofday(now, NULL); /* set it, as we care about accuracy */
1435
1436     if(inlen < 0) {
1437       if(errno == EAGAIN || errno == EINTR) break;
1438       noitLT(nlerr, now, "collectd: recvfrom: %s\n", strerror(errno));
1439       break;
1440     }
1441     if (from_len == sizeof(remote.skaddr)) {
1442       if (!inet_ntop(AF_INET, &(remote.skaddr.sin_addr), ip_p, INET_ADDRSTRLEN)) {
1443         noitLT(nlerr, now, "collectd: inet_ntop failed: %s\n", strerror(errno));
1444         break;
1445       }
1446     }
1447     else if(from_len == sizeof(remote.skaddr6)) {
1448       if (!inet_ntop(AF_INET6, &(remote.skaddr6.sin6_addr), ip_p, INET6_ADDRSTRLEN)) {
1449         noitLT(nlerr, now, "collectd: inet_ntop failed: %s\n", strerror(errno));
1450         break;
1451       }
1452     }
1453     else {
1454       noitLT(nlerr, now, "collectd: could not determine address family of remote\n");
1455       break;
1456     }
1457
1458     pkt.self = self;
1459     pkt.payload = packet;
1460     pkt.len = inlen;
1461     check_cnt = noit_poller_target_do(ip_p, push_packet_at_check ,&pkt);
1462     if(check_cnt == 0)
1463       noitL(nlerr, "collectd: No defined check from ip [%s].\n", ip_p);
1464   }
1465   return EVENTER_READ | EVENTER_EXCEPTION;
1466 }
1467
1468 static int noit_collectd_initiate_check(noit_module_t *self,
1469                                         noit_check_t *check,
1470                                         int once, noit_check_t *cause) {
1471   /* The idea is to write the collectd stuff to the stats one every period
1472    * Then we can warn people if no stats where written in a period of time
1473    */
1474   INITIATE_CHECK(collectd_submit, self, check, cause);
1475   return 0;
1476 }
1477
1478 static int noit_collectd_config(noit_module_t *self, noit_hash_table *options) {
1479   collectd_mod_config_t *conf;
1480   conf = noit_module_get_userdata(self);
1481   if(conf) {
1482     if(conf->options) {
1483       noit_hash_destroy(conf->options, free, free);
1484       free(conf->options);
1485     }
1486   }
1487   else
1488     conf = calloc(1, sizeof(*conf));
1489   conf->options = options;
1490   noit_module_set_userdata(self, conf);
1491   return 1;
1492 }
1493
1494 static int noit_collectd_onload(noit_image_t *self) {
1495   if(!nlerr) nlerr = noit_log_stream_find("error/collectd");
1496   if(!nldeb) nldeb = noit_log_stream_find("debug/collectd");
1497   if(!nlerr) nlerr = noit_error;
1498   if(!nldeb) nldeb = noit_debug;
1499   eventer_name_callback("noit_collectd/handler", noit_collectd_handler);
1500   return 0;
1501 }
1502
1503 static int noit_collectd_init(noit_module_t *self) {
1504   const char *config_val;
1505   int sockaddr_len;
1506   collectd_mod_config_t *conf;
1507   conf = noit_module_get_userdata(self);
1508   int portint = 0;
1509   struct sockaddr_in skaddr;
1510   struct sockaddr_in6 skaddr6;
1511   struct in6_addr in6addr_any = IN6ADDR_ANY_INIT;
1512   const char *host;
1513   unsigned short port;
1514
1515   conf->support_notifications = noit_true;
1516   if(noit_hash_retr_str(conf->options,
1517                         "notifications", strlen("notifications"),
1518                         (const char **)&config_val)) {
1519     if(!strcasecmp(config_val, "false") || !strcasecmp(config_val, "off"))
1520       conf->support_notifications = noit_false;
1521   }
1522   conf->asynch_metrics = noit_false;
1523   if(noit_hash_retr_str(conf->options,
1524                         "asynch_metrics", strlen("asynch_metrics"),
1525                         (const char **)&config_val)) {
1526     if(!strcasecmp(config_val, "true") || !strcasecmp(config_val, "on"))
1527       conf->asynch_metrics = noit_true;
1528   }
1529
1530   /* Default Collectd port */
1531   portint = NET_DEFAULT_PORT;
1532   if(noit_hash_retr_str(conf->options,
1533                          "collectd_port", strlen("collectd_port"),
1534                          (const char**)&config_val))
1535     portint = atoi(config_val);
1536
1537
1538   if(!noit_hash_retr_str(conf->options,
1539                          "collectd_host", strlen("collectd_host"),
1540                          (const char**)&host))
1541     host = "*";
1542
1543   port = (unsigned short) portint;
1544
1545   conf->ipv4_fd = conf->ipv6_fd = -1;
1546
1547   conf->ipv4_fd = socket(PF_INET, SOCK_DGRAM, 0);
1548   if(conf->ipv4_fd < 0) {
1549     noitL(noit_error, "collectd: socket failed: %s\n",
1550           strerror(errno));
1551   }
1552   else {
1553     if(eventer_set_fd_nonblocking(conf->ipv4_fd)) {
1554       close(conf->ipv4_fd);
1555       conf->ipv4_fd = -1;
1556       noitL(noit_error,
1557             "collectd: could not set socket non-blocking: %s\n",
1558             strerror(errno));
1559     }
1560   }
1561   skaddr.sin_family = AF_INET;
1562   skaddr.sin_addr.s_addr = htonl(INADDR_ANY);
1563   skaddr.sin_port = htons(port);
1564
1565   sockaddr_len = sizeof(skaddr);
1566   if(bind(conf->ipv4_fd, (struct sockaddr *)&skaddr, sockaddr_len) < 0) {
1567     noitL(noit_error, "bind failed[%s]: %s\n", host, strerror(errno));
1568     close(conf->ipv4_fd);
1569     return -1;
1570   }
1571
1572   if(conf->ipv4_fd >= 0) {
1573     eventer_t newe;
1574     newe = eventer_alloc();
1575     newe->fd = conf->ipv4_fd;
1576     newe->mask = EVENTER_READ | EVENTER_EXCEPTION;
1577     newe->callback = noit_collectd_handler;
1578     newe->closure = self;
1579     eventer_add(newe);
1580   }
1581
1582   conf->ipv6_fd = socket(AF_INET6, SOCK_DGRAM, 0);
1583   if(conf->ipv6_fd < 0) {
1584     noitL(noit_error, "collectd: IPv6 socket failed: %s\n",
1585           strerror(errno));
1586   }
1587   else {
1588     if(eventer_set_fd_nonblocking(conf->ipv6_fd)) {
1589       close(conf->ipv6_fd);
1590       conf->ipv6_fd = -1;
1591       noitL(noit_error,
1592             "collectd: could not set socket non-blocking: %s\n",
1593                strerror(errno));
1594     }
1595   }
1596   sockaddr_len = sizeof(skaddr6);
1597   memset(&skaddr6, 0, sizeof(skaddr6));
1598   skaddr6.sin6_family = AF_INET6;
1599   skaddr6.sin6_addr = in6addr_any;
1600   skaddr6.sin6_port = htons(port);
1601
1602   if(bind(conf->ipv6_fd, (struct sockaddr *)&skaddr6, sockaddr_len) < 0) {
1603     noitL(noit_error, "bind(IPv6) failed[%s]: %s\n", host, strerror(errno));
1604     close(conf->ipv6_fd);
1605     conf->ipv6_fd = -1;
1606   }
1607
1608   if(conf->ipv6_fd >= 0) {
1609     eventer_t newe;
1610     newe = eventer_alloc();
1611     newe->fd = conf->ipv6_fd;
1612     newe->mask = EVENTER_READ;
1613     newe->callback = noit_collectd_handler;
1614     newe->closure = self;
1615     eventer_add(newe);
1616   }
1617
1618   noit_module_set_userdata(self, conf);
1619   return 0;
1620 }
1621
1622 #include "collectd.xmlh"
1623 noit_module_t collectd = {
1624   {
1625     NOIT_MODULE_MAGIC,
1626     NOIT_MODULE_ABI_VERSION,
1627     "collectd",
1628     "collectd collection",
1629     collectd_xml_description,
1630     noit_collectd_onload
1631   },
1632   noit_collectd_config,
1633   noit_collectd_init,
1634   noit_collectd_initiate_check,
1635   NULL /* noit_collectd_cleanup */
1636 };
Note: See TracBrowser for help on using the browser.