[Reconnoiter-devel] [reconnoiter commit] r316 - trunk/src

jesus at omniti.com jesus at omniti.com
Tue Jul 1 10:15:44 EDT 2008


Author: jesus
Date: 2008-07-01 10:15:44 -0400 (Tue, 01 Jul 2008)
New Revision: 316

Modified:
   trunk/src/noit.conf
   trunk/src/noit_check_log.c
   trunk/src/noit_conf.c
   trunk/src/noit_conf.h
   trunk/src/noit_conf_checks.c
   trunk/src/noitd.c
   trunk/src/stratcon.conf
   trunk/src/stratcon_datastore.c
Log:
implements configuration pushing into the DB. refs #26

Modified: trunk/src/noit.conf
===================================================================
--- trunk/src/noit.conf	2008-07-01 14:14:22 UTC (rev 315)
+++ trunk/src/noit.conf	2008-07-01 14:15:44 UTC (rev 316)
@@ -36,6 +36,7 @@
       </log>
       <log name="status"/>
       <log name="metrics"/>
+      <log name="config"/>
     </feeds>
   </logs>
   <modules directory="./modules">

Modified: trunk/src/noit_check_log.c
===================================================================
--- trunk/src/noit_check_log.c	2008-07-01 14:14:22 UTC (rev 315)
+++ trunk/src/noit_check_log.c	2008-07-01 14:15:44 UTC (rev 316)
@@ -15,7 +15,7 @@
 
 /* Log format is tab delimited:
  * NOIT CONFIG (implemented in noit_conf.c):
- *  'n' TIMESTAMP base64(gzip(xmlconfig))
+ *  'n' TIMESTAMP strlen(xmlconfig) base64(gzip(xmlconfig))
  *
  * CHECK:
  *  'C' TIMESTAMP UUID TARGET MODULE NAME

Modified: trunk/src/noit_conf.c
===================================================================
--- trunk/src/noit_conf.c	2008-07-01 14:14:22 UTC (rev 315)
+++ trunk/src/noit_conf.c	2008-07-01 14:15:44 UTC (rev 316)
@@ -13,12 +13,14 @@
 #include <libxml/parser.h>
 #include <libxml/tree.h>
 #include <libxml/xpath.h>
+#include <zlib.h>
 
 #include "noit_conf.h"
 #include "noit_check.h"
 #include "noit_console.h"
 #include "utils/noit_hash.h"
 #include "utils/noit_log.h"
+#include "utils/noit_b64.h"
 
 /* tmp hash impl, replace this with something nice */
 static noit_hash_table _tmp_config = NOIT_HASH_EMPTY;
@@ -26,6 +28,60 @@
 static char master_config_file[PATH_MAX] = "";
 static xmlXPathContextPtr xpath_ctxt = NULL;
 
+/* This is used to notice config changes and journal the config out
+ * using a user-specified function.  It supports allowing multiple config
+ * changed to coalesce so you don't write out 1000 changes in a few seconds.
+ */
+static u_int32_t __config_gen = 0;
+static u_int32_t __config_coalesce = 0;
+static u_int32_t __config_coalesce_time = 0;
+void noit_conf_coalesce_changes(u_int32_t seconds) {
+  __config_coalesce_time = seconds;
+}
+void noit_conf_mark_changed() {
+  /* increment the change counter -- in case anyone cares */
+  __config_gen++;
+  /* reset the coalesce counter.  It is decremented each second and
+   * the journal function fires on a transition from 1 => 0
+   */
+  __config_coalesce = __config_coalesce_time;
+}
+struct recurrent_journaler {
+  int (*journal_config)(void *);
+  void *jc_closure;
+};
+static int
+noit_conf_watch_config_and_journal(eventer_t e, int mask, void *closure,
+                                   struct timeval *now) {
+  struct recurrent_journaler *rj = closure;
+  eventer_t newe;
+
+  if(__config_coalesce == 1)
+    rj->journal_config(rj->jc_closure);
+  if(__config_coalesce > 0)
+    __config_coalesce--;
+
+  /* Schedule the same event to fire a second form now */
+  newe = eventer_alloc();
+  gettimeofday(&newe->whence, NULL);
+  newe->whence.tv_sec += 1;
+  newe->mask = EVENTER_TIMER;
+  newe->callback = noit_conf_watch_config_and_journal;
+  newe->closure = closure;
+  eventer_add(newe);
+  return 0;
+}
+void
+noit_conf_watch_and_journal_watchdog(int (*f)(void *), void *c) {
+  struct recurrent_journaler *rj;
+  struct timeval __now;
+  rj = calloc(1, sizeof(*rj));
+  rj->journal_config = f;
+  rj->jc_closure = c;
+  gettimeofday(&__now, NULL);
+  noit_conf_watch_config_and_journal(NULL, EVENTER_TIMER, rj, &__now);
+}
+
 static noit_hash_table _compiled_fallback = NOIT_HASH_EMPTY;
 static struct {
   const char *key;
@@ -94,6 +150,7 @@
     master_config = new_config;
     xpath_ctxt = xmlXPathNewContext(master_config);
     if(path != master_config_file) realpath(path, master_config_file);
+    noit_conf_mark_changed();
     return 0;
   }
   return -1;
@@ -399,6 +456,80 @@
   return 0;
 }
 
+struct config_line_vstr {
+  char *buff;
+  int raw_len;
+  int len;
+  int allocd;
+  enum { CONFIG_RAW = 0, CONFIG_COMPRESSED, CONFIG_B64 } encoded;
+};
+static int
+noit_config_log_write_xml(void *vstr, const char *buffer, int len) {
+  struct config_line_vstr *clv = vstr;
+  assert(clv->encoded == CONFIG_RAW);
+  if(!clv->buff) {
+    clv->allocd = 8192;
+    clv->buff = malloc(clv->allocd);
+  }
+  while(len + clv->len > clv->allocd) {
+    char *newbuff;
+    int newsize = clv->allocd;
+    newsize <<= 1;
+    newbuff = realloc(clv->buff, newsize);
+    if(!newbuff) {
+      return -1;
+    }
+    clv->allocd = newsize;
+    clv->buff = newbuff;
+  }
+  memcpy(clv->buff + clv->len, buffer, len);
+  clv->len += len;
+  return len;
+}
+static int
+noit_config_log_close_xml(void *vstr) {
+  struct config_line_vstr *clv = vstr;
+  uLong initial_dlen, dlen;
+  char *compbuff, *b64buff;
+
+  if(clv->buff == NULL) {
+    clv->encoded = CONFIG_B64;
+    return 0;
+  }
+  clv->raw_len = clv->len;
+  assert(clv->encoded == CONFIG_RAW);
+  /* Compress */
+  initial_dlen = dlen = compressBound(clv->len);
+  compbuff = malloc(initial_dlen);
+  if(!compbuff) return -1;
+  if(Z_OK != compress2((Bytef *)compbuff, &dlen,
+                       (Bytef *)clv->buff, clv->len, 9)) {
+    noitL(noit_error, "Error compressing config for transmission.\n");
+    free(compbuff);
+    return -1;
+  }
+  free(clv->buff);
+  clv->buff = compbuff;
+  clv->allocd = initial_dlen;
+  clv->len = dlen;
+  clv->encoded = CONFIG_COMPRESSED;
+  /* Encode */
+  initial_dlen = ((clv->len + 2) / 3) * 4;
+  b64buff = malloc(initial_dlen);
+  dlen = noit_b64_encode((unsigned char *)clv->buff, clv->len,
+                         b64buff, initial_dlen);
+  if(dlen == 0) {
+    free(b64buff);
+    return -1;
+  }
+  free(clv->buff);
+  clv->buff = b64buff;
+  clv->allocd = initial_dlen;
+  clv->len = dlen;
+  clv->encoded = CONFIG_B64;
+  return 0;
+}
+
 int
 noit_conf_reload(noit_console_closure_t ncct,
                  int argc, char **argv,
@@ -460,7 +591,41 @@
   nc_printf(ncct, "%d bytes written.\n", len);
   return 0;
 }
+int
+noit_conf_write_log() {
+  static u_int32_t last_write_gen = 0;
+  static noit_log_stream_t config_log = NULL;
+  struct timeval __now;
+  xmlOutputBufferPtr out;
+  xmlCharEncodingHandlerPtr enc;
+  struct config_line_vstr *clv;
+  SETUP_LOG(config, return -1);
 
+  /* We know we haven't changed */
+  if(last_write_gen == __config_gen) return 0;
+
+  gettimeofday(&__now, NULL);
+  clv = calloc(1, sizeof(*clv));
+  enc = xmlGetCharEncodingHandler(XML_CHAR_ENCODING_UTF8);
+  out = xmlOutputBufferCreateIO(noit_config_log_write_xml,
+                                noit_config_log_close_xml,
+                                clv, enc);
+  xmlSaveFormatFileTo(out, master_config, "utf8", 1);
+  if(clv->encoded != CONFIG_B64) {
+    noitL(noit_error, "Error logging configuration\n");
+    if(clv->buff) free(clv->buff);
+    free(clv);
+    return -1;
+  }
+  noitL(config_log, "n\t%lu.%03lu\t%d\t%.*s\n",
+        __now.tv_sec, __now.tv_usec / 1000UL, clv->raw_len,
+        clv->len, clv->buff);
+  free(clv->buff);
+  free(clv);
+  last_write_gen = __config_gen;
+  return 0;
+}
+
 void
 noit_conf_log_init(const char *toplevel) {
   int i, cnt = 0, o, ocnt = 0;

Modified: trunk/src/noit_conf.h
===================================================================
--- trunk/src/noit_conf.h	2008-07-01 14:14:22 UTC (rev 315)
+++ trunk/src/noit_conf.h	2008-07-01 14:15:44 UTC (rev 316)
@@ -22,6 +22,17 @@
   char prompt[50];
 } noit_conf_t_userdata_t;
 
+/* seconds == 0 disable config journaling watchdog */
+API_EXPORT(void) noit_conf_coalesce_changes(u_int32_t seconds);
+/* Start the watchdog */
+API_EXPORT(void) noit_conf_watch_and_journal_watchdog(int (*f)(void *), void *c);
+
+/* marks the config as changed.. if you manipulate the XML tree in any way
+ * you must call this function to "let the system know."  This is used
+ * to notice changes which are in turn flushed out.
+ */
+API_EXPORT(void) noit_conf_mark_changed();
+
 API_EXPORT(void) noit_conf_init(const char *toplevel);
 API_EXPORT(int) noit_conf_load(const char *path);
 API_EXPORT(int) noit_conf_save(const char *path);
@@ -72,6 +83,9 @@
                        int argc, char **argv,
                        noit_console_state_t *state, void *closure);
 
+API_EXPORT(int)
+  noit_conf_write_log();
+
 API_EXPORT(void) noit_conf_log_init(const char *toplevel);
 
 #endif

Modified: trunk/src/noit_conf_checks.c
===================================================================
--- trunk/src/noit_conf_checks.c	2008-07-01 14:14:22 UTC (rev 315)
+++ trunk/src/noit_conf_checks.c	2008-07-01 14:15:44 UTC (rev 316)
@@ -175,6 +175,7 @@
     xmlUnsetProp(node, (xmlChar *)attrinfo->name);
     if(val)
       xmlSetProp(node, (xmlChar *)attrinfo->name, (xmlChar *)val);
+    noit_conf_mark_changed();
   }
   return error;
 }
@@ -212,8 +213,10 @@
       /* Something went wrong, remove the node */
       xmlUnlinkNode(newnode);
     }
-    else
+    else {
+      noit_conf_mark_changed();
       rv = 0;
+    }
   }
  out:
   if(pobj) xmlXPathFreeObject(pobj);
@@ -482,6 +485,7 @@
         noit_poller_deschedule(checkid);
         xmlUnlinkNode(node);
       }
+      noit_conf_mark_changed();
     }
   }
   if(argc > 1) {
@@ -584,6 +588,7 @@
   if(delete) {
     node = (noit_conf_section_t)xmlXPathNodeSetItem(pobj->nodesetval, 0);
     xmlUnlinkNode(node);
+    noit_conf_mark_changed();
     return 0;
   }
   if(pobj) xmlXPathFreeObject(pobj);
@@ -598,6 +603,7 @@
   }
   node = (noit_conf_section_t)xmlXPathNodeSetItem(pobj->nodesetval, 0);
   if((newnode = xmlNewChild(node, NULL, (xmlChar *)argv[0], NULL)) != NULL) {
+    noit_conf_mark_changed();
     if(info->path) free(info->path);
     info->path = strdup((char *)xmlGetNodePath(newnode) + strlen("/noit"));
   }
@@ -1008,8 +1014,8 @@
     assert(confignode);
     /* Now we create a child */
     xmlNewChild(confignode, NULL, (xmlChar *)name, (xmlChar *)value);
-    
   }
+  noit_conf_mark_changed();
   rv = 0;
  out:
   if(pobj) xmlXPathFreeObject(pobj);
@@ -1080,6 +1086,7 @@
   xmlUnsetProp(node, (xmlChar *)attrinfo->name);
   if(value)
     xmlSetProp(node, (xmlChar *)attrinfo->name, (xmlChar *)value);
+  noit_conf_mark_changed();
   rv = 0;
  out:
   if(pobj) xmlXPathFreeObject(pobj);

Modified: trunk/src/noitd.c
===================================================================
--- trunk/src/noitd.c	2008-07-01 14:14:22 UTC (rev 315)
+++ trunk/src/noitd.c	2008-07-01 14:15:44 UTC (rev 316)
@@ -109,6 +109,11 @@
   noit_poller_init();
   noit_listener_init(APPNAME);
 
+  /* Write our log out, and setup a watchdog to write it out on change. */
+  noit_conf_write_log();
+  noit_conf_coalesce_changes(10); /* 10 seconds of no changes before we write */
+  noit_conf_watch_and_journal_watchdog(noit_conf_write_log, NULL);
+
   eventer_loop();
   return 0;
 }

Modified: trunk/src/stratcon.conf
===================================================================
--- trunk/src/stratcon.conf	2008-07-01 14:14:22 UTC (rev 315)
+++ trunk/src/stratcon.conf	2008-07-01 14:15:44 UTC (rev 316)
@@ -60,6 +60,12 @@
              VALUES ('epoch'::timestamptz + ($1 || ' seconds')::interval,
                      stratcon.generate_sid_from_id($2), $3, $4)
       ]]></metric_text>
+      <config><![CDATA[
+        SELECT stratcon.update_config
+               ($1, $2, 
+                'epoch'::timestamptz + ($3 || ' seconds')::interval,
+                $4 )
+      ]]></config>
     </statements>
   </database>
 

Modified: trunk/src/stratcon_datastore.c
===================================================================
--- trunk/src/stratcon_datastore.c	2008-07-01 14:14:22 UTC (rev 315)
+++ trunk/src/stratcon_datastore.c	2008-07-01 14:15:44 UTC (rev 316)
@@ -6,6 +6,7 @@
 #include "noit_defines.h"
 #include "eventer/eventer.h"
 #include "utils/noit_log.h"
+#include "utils/noit_b64.h"
 #include "stratcon_datastore.h"
 #include "noit_conf.h"
 #include "noit_check.h"
@@ -14,6 +15,7 @@
 #include <sys/un.h>
 #include <arpa/inet.h>
 #include <libpq-fe.h>
+#include <zlib.h>
 
 static char *check_insert = NULL;
 static const char *check_insert_conf = "/stratcon/database/statements/check";
@@ -23,6 +25,8 @@
 static const char *metric_insert_numeric_conf = "/stratcon/database/statements/metric_numeric";
 static char *metric_insert_text = NULL;
 static const char *metric_insert_text_conf = "/stratcon/database/statements/metric_text";
+static char *config_insert = NULL;
+static const char *config_insert_conf = "/stratcon/database/statements/config";
 
 #define GET_QUERY(a) do { \
   if(a == NULL) \
@@ -141,6 +145,8 @@
 execute_outcome_t
 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) {
   int type, len;
+  char *final_buff;
+  uLong final_len, actual_final_len;;
   char *token;
 
   type = d->data[0];
@@ -187,6 +193,42 @@
     PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
     switch(type) {
       /* See noit_check_log.c for log description */
+      case 'n':
+        DECLARE_PARAM_STR(raddr, strlen(raddr));
+        DECLARE_PARAM_STR("noitd",5); /* node_type */
+        PROCESS_NEXT_FIELD(token,len);
+        DECLARE_PARAM_STR(token,len); /* timestamp */
+
+        /* This is the expected uncompressed len */
+        PROCESS_NEXT_FIELD(token,len);
+        final_len = atoi(token);
+        final_buff = malloc(final_len);
+        if(!final_buff) goto bad_row;
+  
+        /* The last token is b64 endoded and compressed.
+         * we need to decode it, declare it and then free it.
+         */
+        PROCESS_LAST_FIELD(token, len);
+        /* We can in-place decode this */
+        len = noit_b64_decode((char *)token, len,
+                              (unsigned char *)token, len);
+        if(len <= 0) {
+          noitL(noit_error, "noitd config base64 decoding error.\n");
+          goto bad_row;
+        }
+        actual_final_len = final_len;
+        if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
+                              (unsigned char *)token, len)) {
+          noitL(noit_error, "noitd config decompression failure.\n");
+          goto bad_row;
+        }
+        if(final_len != actual_final_len) {
+          noitL(noit_error, "noitd config decompression error.\n");
+          goto bad_row;
+        }
+        DECLARE_PARAM_STR(final_buff, final_len);
+        free(final_buff);
+        break;
       case 'C':
         DECLARE_PARAM_STR(raddr, strlen(raddr));
         PROCESS_NEXT_FIELD(token,len);
@@ -239,9 +281,10 @@
                      (const char * const *)d->paramValues, \
                      d->paramLengths, d->paramFormats, 0); \
   rv = PQresultStatus(res); \
-  if(rv != PGRES_COMMAND_OK) { \
-    noitL(noit_error, "stratcon datasource bad row: %s\n", \
-          PQresultErrorMessage(res)); \
+  if(rv != PGRES_COMMAND_OK && \
+     rv != PGRES_TUPLES_OK) { \
+    noitL(noit_error, "stratcon datasource bad row (%d): %s\n", \
+          rv, PQresultErrorMessage(res)); \
     PQclear(res); \
     goto bad_row; \
   } \
@@ -250,6 +293,10 @@
 
   /* Now execute the query */
   switch(type) {
+    case 'n':
+      GET_QUERY(config_insert);
+      PG_EXEC(config_insert);
+      break;
     case 'C':
       GET_QUERY(check_insert);
       PG_EXEC(check_insert);



More information about the Reconnoiter-devel mailing list