[Reconnoiter-devel] [reconnoiter commit] r316 - trunk/src
jesus at omniti.com
jesus at omniti.com
Tue Jul 1 10:15:44 EDT 2008
Author: jesus
Date: 2008-07-01 10:15:44 -0400 (Tue, 01 Jul 2008)
New Revision: 316
Modified:
trunk/src/noit.conf
trunk/src/noit_check_log.c
trunk/src/noit_conf.c
trunk/src/noit_conf.h
trunk/src/noit_conf_checks.c
trunk/src/noitd.c
trunk/src/stratcon.conf
trunk/src/stratcon_datastore.c
Log:
implements configuration pushing into the DB. refs #26
Modified: trunk/src/noit.conf
===================================================================
--- trunk/src/noit.conf 2008-07-01 14:14:22 UTC (rev 315)
+++ trunk/src/noit.conf 2008-07-01 14:15:44 UTC (rev 316)
@@ -36,6 +36,7 @@
</log>
<log name="status"/>
<log name="metrics"/>
+ <log name="config"/>
</feeds>
</logs>
<modules directory="./modules">
Modified: trunk/src/noit_check_log.c
===================================================================
--- trunk/src/noit_check_log.c 2008-07-01 14:14:22 UTC (rev 315)
+++ trunk/src/noit_check_log.c 2008-07-01 14:15:44 UTC (rev 316)
@@ -15,7 +15,7 @@
/* Log format is tab delimited:
* NOIT CONFIG (implemented in noit_conf.c):
- * 'n' TIMESTAMP base64(gzip(xmlconfig))
+ * 'n' TIMESTAMP strlen(xmlconfig) base64(gzip(xmlconfig))
*
* CHECK:
* 'C' TIMESTAMP UUID TARGET MODULE NAME
Modified: trunk/src/noit_conf.c
===================================================================
--- trunk/src/noit_conf.c 2008-07-01 14:14:22 UTC (rev 315)
+++ trunk/src/noit_conf.c 2008-07-01 14:15:44 UTC (rev 316)
@@ -13,12 +13,14 @@
#include <libxml/parser.h>
#include <libxml/tree.h>
#include <libxml/xpath.h>
+#include <zlib.h>
#include "noit_conf.h"
#include "noit_check.h"
#include "noit_console.h"
#include "utils/noit_hash.h"
#include "utils/noit_log.h"
+#include "utils/noit_b64.h"
/* tmp hash impl, replace this with something nice */
static noit_hash_table _tmp_config = NOIT_HASH_EMPTY;
@@ -26,6 +28,60 @@
static char master_config_file[PATH_MAX] = "";
static xmlXPathContextPtr xpath_ctxt = NULL;
+/* This is used to notice config changes and journal the config out
+ * using a user-specified function. It supports allowing multiple config
+ * changes to coalesce so you don't write out 1000 changes in a few seconds.
+ */
+/* Change counter: bumped by every noit_conf_mark_changed() call. */
+static u_int32_t __config_gen = 0;
+/* Countdown, reset on each change and decremented by the watchdog callback;
+ * the journal function is invoked when the callback finds it at 1.
+ */
+static u_int32_t __config_coalesce = 0;
+/* Value the countdown is reset to; set by noit_conf_coalesce_changes(). */
+static u_int32_t __config_coalesce_time = 0;
+/* Set the value (in seconds, per the parameter name) that the coalesce
+ * countdown is reset to whenever the config is marked changed.
+ */
+void noit_conf_coalesce_changes(u_int32_t seconds) {
+ __config_coalesce_time = seconds;
+}
+/* Record that the in-memory config was modified: bumps the generation
+ * counter and restarts the coalesce countdown from the configured value.
+ */
+void noit_conf_mark_changed() {
+ /* increment the change counter -- in case anyone cares */
+ __config_gen++;
+ /* reset the coalesce counter. It is decremented each second and
+ * the journal function fires on a transition from 1 => 0
+ */
+ __config_coalesce = __config_coalesce_time;
+}
+/* Closure handed to the watchdog timer callback: the user-supplied journal
+ * function together with the opaque argument it is called with.
+ */
+struct recurrent_journaler {
+ /* Function invoked to journal the config; its return value is not
+ * inspected by the watchdog callback.
+ */
+ int (*journal_config)(void *);
+ /* Passed verbatim as the sole argument to journal_config. */
+ void *jc_closure;
+};
+/* Timer callback driving the config journaling watchdog.
+ *
+ * closure is the struct recurrent_journaler describing what to call.
+ * When the coalesce countdown is found at 1 the journal function is
+ * invoked; a non-zero countdown is then decremented.  A fresh timer
+ * event carrying the same closure is scheduled before returning, so the
+ * callback re-arms itself on every invocation.  The e, mask and now
+ * arguments are not used.  Always returns 0.
+ */
+static int
+noit_conf_watch_config_and_journal(eventer_t e, int mask, void *closure,
+                                   struct timeval *now) {
+  struct recurrent_journaler *journaler = closure;
+  eventer_t next;
+
+  /* The journal fires on the countdown's 1 => 0 transition. */
+  if(__config_coalesce == 1)
+    journaler->journal_config(journaler->jc_closure);
+  if(__config_coalesce != 0)
+    __config_coalesce -= 1;
+
+  /* Schedule the same event to fire a second from now */
+  next = eventer_alloc();
+  next->mask = EVENTER_TIMER;
+  next->callback = noit_conf_watch_config_and_journal;
+  next->closure = closure;
+  gettimeofday(&next->whence, NULL);
+  next->whence.tv_sec += 1;
+  eventer_add(next);
+  return 0;
+}
+/* Start the config journaling watchdog.
+ *
+ * f is the function to call when pending config changes should be
+ * journaled and c is passed to it unchanged.  A small closure holding
+ * both is allocated here and handed to the timer callback; it is never
+ * freed by this code because the callback keeps rescheduling itself
+ * with it.
+ *
+ * If f is NULL or the closure cannot be allocated, the watchdog is not
+ * started and an error is logged.
+ */
+void
+noit_conf_watch_and_journal_watchdog(int (*f)(void *), void *c) {
+  struct recurrent_journaler *rj;
+  struct timeval __now;
+
+  if(!f) {
+    /* The timer callback calls f unconditionally; refuse a NULL one. */
+    noitL(noit_error, "Config journaling watchdog needs a function.\n");
+    return;
+  }
+  rj = calloc(1, sizeof(*rj));
+  if(!rj) {
+    noitL(noit_error, "Cannot allocate config journaling watchdog.\n");
+    return;
+  }
+  rj->journal_config = f;
+  rj->jc_closure = c;
+  gettimeofday(&__now, NULL);
+  /* Invoke the callback once by hand; it schedules every later run. */
+  noit_conf_watch_config_and_journal(NULL, EVENTER_TIMER, rj, &__now);
+}
+
static noit_hash_table _compiled_fallback = NOIT_HASH_EMPTY;
static struct {
const char *key;
@@ -94,6 +150,7 @@
master_config = new_config;
xpath_ctxt = xmlXPathNewContext(master_config);
if(path != master_config_file) realpath(path, master_config_file);
+ noit_conf_mark_changed();
return 0;
}
return -1;
@@ -399,6 +456,80 @@
return 0;
}
+/* Growable buffer that accumulates the serialized XML config and is then
+ * transformed in place (compressed, then base64 encoded) for logging.
+ */
+struct config_line_vstr {
+ /* Heap buffer holding the data in its current encoding; NULL until the
+ * first write.
+ */
+ char *buff;
+ /* Length of the raw (uncompressed) XML, recorded just before compression. */
+ int raw_len;
+ /* Number of valid bytes currently in buff. */
+ int len;
+ /* Allocated size of buff in bytes. */
+ int allocd;
+ /* Which stage buff is in: raw XML, compressed, or compressed+base64. */
+ enum { CONFIG_RAW = 0, CONFIG_COMPRESSED, CONFIG_B64 } encoded;
+};
+/* Output-buffer write callback: append len bytes from buffer to the
+ * struct config_line_vstr passed as vstr, growing its heap buffer by
+ * doubling as needed.
+ *
+ * Returns len on success and -1 if len is negative or memory cannot be
+ * obtained.  On failure any data already accumulated is left untouched
+ * so the owner can still free it.
+ */
+static int
+noit_config_log_write_xml(void *vstr, const char *buffer, int len) {
+  struct config_line_vstr *clv = vstr;
+  assert(clv->encoded == CONFIG_RAW);
+  /* A negative length would become a huge size_t in memcpy below. */
+  if(len < 0) return -1;
+  if(!clv->buff) {
+    char *initial = malloc(8192);
+    if(!initial) return -1;
+    /* Only record the size once the allocation is known to be good. */
+    clv->buff = initial;
+    clv->allocd = 8192;
+  }
+  while(len + clv->len > clv->allocd) {
+    char *newbuff;
+    int newsize = clv->allocd;
+    newsize <<= 1;
+    /* Keep the old pointer until realloc succeeds so nothing leaks. */
+    newbuff = realloc(clv->buff, newsize);
+    if(!newbuff) {
+      return -1;
+    }
+    clv->allocd = newsize;
+    clv->buff = newbuff;
+  }
+  memcpy(clv->buff + clv->len, buffer, len);
+  clv->len += len;
+  return len;
+}
+/* Output-buffer close callback: turn the raw XML accumulated in the
+ * struct config_line_vstr passed as vstr into base64(compressed(xml)).
+ *
+ * On success the buffer is replaced in place, raw_len holds the original
+ * XML length, encoded is CONFIG_B64 and 0 is returned.  If nothing was
+ * ever written (buff is NULL) the state is simply marked CONFIG_B64.
+ * On failure -1 is returned and encoded is left at whatever stage was
+ * reached, with buff still owned by the caller.
+ */
+static int
+noit_config_log_close_xml(void *vstr) {
+  struct config_line_vstr *clv = vstr;
+  uLong initial_dlen, dlen;
+  char *compbuff, *b64buff;
+
+  if(clv->buff == NULL) {
+    clv->encoded = CONFIG_B64;
+    return 0;
+  }
+  clv->raw_len = clv->len;
+  assert(clv->encoded == CONFIG_RAW);
+  /* Compress */
+  initial_dlen = dlen = compressBound(clv->len);
+  compbuff = malloc(initial_dlen);
+  if(!compbuff) {
+    noitL(noit_error, "Error allocating buffer to compress config.\n");
+    return -1;
+  }
+  if(Z_OK != compress2((Bytef *)compbuff, &dlen,
+                       (Bytef *)clv->buff, clv->len, 9)) {
+    noitL(noit_error, "Error compressing config for transmission.\n");
+    free(compbuff);
+    return -1;
+  }
+  free(clv->buff);
+  clv->buff = compbuff;
+  clv->allocd = initial_dlen;
+  clv->len = dlen;
+  clv->encoded = CONFIG_COMPRESSED;
+  /* Encode: base64 emits 4 output bytes per (up to) 3 input bytes. */
+  initial_dlen = ((clv->len + 2) / 3) * 4;
+  b64buff = malloc(initial_dlen);
+  if(!b64buff) {
+    /* Previously passed unchecked to the encoder. */
+    noitL(noit_error, "Error allocating buffer to encode config.\n");
+    return -1;
+  }
+  dlen = noit_b64_encode((unsigned char *)clv->buff, clv->len,
+                         b64buff, initial_dlen);
+  if(dlen == 0) {
+    free(b64buff);
+    return -1;
+  }
+  free(clv->buff);
+  clv->buff = b64buff;
+  clv->allocd = initial_dlen;
+  clv->len = dlen;
+  clv->encoded = CONFIG_B64;
+  return 0;
+}
+
int
noit_conf_reload(noit_console_closure_t ncct,
int argc, char **argv,
@@ -460,7 +591,41 @@
nc_printf(ncct, "%d bytes written.\n", len);
return 0;
}
+/* Journal the current master config to the "config" log stream.
+ *
+ * The XML tree is serialized through the write/close callbacks above,
+ * which leave it compressed and base64 encoded, and is then emitted as
+ * one tab-delimited line:
+ *   'n' TIMESTAMP strlen(xmlconfig) base64(gzip(xmlconfig))
+ *
+ * Returns 0 when the config was written or had not changed since the
+ * last successful write, and -1 on any failure.
+ * NOTE(review): SETUP_LOG is defined elsewhere; presumably it resolves
+ * the "config" stream into config_log and runs its second argument on
+ * failure -- confirm.
+ */
+int
+noit_conf_write_log() {
+  static u_int32_t last_write_gen = 0;
+  static noit_log_stream_t config_log = NULL;
+  struct timeval __now;
+  xmlOutputBufferPtr out;
+  xmlCharEncodingHandlerPtr enc;
+  struct config_line_vstr *clv;
+  SETUP_LOG(config, return -1);
+
+  /* We know we haven't changed */
+  if(last_write_gen == __config_gen) return 0;
+
+  gettimeofday(&__now, NULL);
+  clv = calloc(1, sizeof(*clv));
+  if(!clv) {
+    noitL(noit_error, "Error logging configuration\n");
+    return -1;
+  }
+  enc = xmlGetCharEncodingHandler(XML_CHAR_ENCODING_UTF8);
+  out = xmlOutputBufferCreateIO(noit_config_log_write_xml,
+                                noit_config_log_close_xml,
+                                clv, enc);
+  /* Saving also closes the output buffer, which is what runs the close
+   * callback and leaves clv in the CONFIG_B64 state on success.
+   */
+  xmlSaveFormatFileTo(out, master_config, "utf8", 1);
+  if(clv->encoded != CONFIG_B64) {
+    noitL(noit_error, "Error logging configuration\n");
+    free(clv->buff);
+    free(clv);
+    return -1;
+  }
+  /* Cast so the arguments match %lu whatever time_t/suseconds_t are, and
+   * never hand a NULL pointer to %s when no data was produced.
+   */
+  noitL(config_log, "n\t%lu.%03lu\t%d\t%.*s\n",
+        (unsigned long)__now.tv_sec,
+        (unsigned long)(__now.tv_usec / 1000UL), clv->raw_len,
+        clv->len, clv->buff ? clv->buff : "");
+  free(clv->buff);
+  free(clv);
+  last_write_gen = __config_gen;
+  return 0;
+}
+
void
noit_conf_log_init(const char *toplevel) {
int i, cnt = 0, o, ocnt = 0;
Modified: trunk/src/noit_conf.h
===================================================================
--- trunk/src/noit_conf.h 2008-07-01 14:14:22 UTC (rev 315)
+++ trunk/src/noit_conf.h 2008-07-01 14:15:44 UTC (rev 316)
@@ -22,6 +22,17 @@
char prompt[50];
} noit_conf_t_userdata_t;
+/* seconds == 0 disable config journaling watchdog */
+API_EXPORT(void) noit_conf_coalesce_changes(u_int32_t seconds);
+/* Start the watchdog */
+API_EXPORT(void) noit_conf_watch_and_journal_watchdog(int (*f)(void *), void *c);
+
+/* marks the config as changed.. if you manipulate the XML tree in any way
+ * you must call this function to "let the system know." This is used
+ * to notice changes which are in turn flushed out.
+ */
+API_EXPORT(void) noit_conf_mark_changed();
+
API_EXPORT(void) noit_conf_init(const char *toplevel);
API_EXPORT(int) noit_conf_load(const char *path);
API_EXPORT(int) noit_conf_save(const char *path);
@@ -72,6 +83,9 @@
int argc, char **argv,
noit_console_state_t *state, void *closure);
+API_EXPORT(int)
+ noit_conf_write_log();
+
API_EXPORT(void) noit_conf_log_init(const char *toplevel);
#endif
Modified: trunk/src/noit_conf_checks.c
===================================================================
--- trunk/src/noit_conf_checks.c 2008-07-01 14:14:22 UTC (rev 315)
+++ trunk/src/noit_conf_checks.c 2008-07-01 14:15:44 UTC (rev 316)
@@ -175,6 +175,7 @@
xmlUnsetProp(node, (xmlChar *)attrinfo->name);
if(val)
xmlSetProp(node, (xmlChar *)attrinfo->name, (xmlChar *)val);
+ noit_conf_mark_changed();
}
return error;
}
@@ -212,8 +213,10 @@
/* Something went wrong, remove the node */
xmlUnlinkNode(newnode);
}
- else
+ else {
+ noit_conf_mark_changed();
rv = 0;
+ }
}
out:
if(pobj) xmlXPathFreeObject(pobj);
@@ -482,6 +485,7 @@
noit_poller_deschedule(checkid);
xmlUnlinkNode(node);
}
+ noit_conf_mark_changed();
}
}
if(argc > 1) {
@@ -584,6 +588,7 @@
if(delete) {
node = (noit_conf_section_t)xmlXPathNodeSetItem(pobj->nodesetval, 0);
xmlUnlinkNode(node);
+ noit_conf_mark_changed();
return 0;
}
if(pobj) xmlXPathFreeObject(pobj);
@@ -598,6 +603,7 @@
}
node = (noit_conf_section_t)xmlXPathNodeSetItem(pobj->nodesetval, 0);
if((newnode = xmlNewChild(node, NULL, (xmlChar *)argv[0], NULL)) != NULL) {
+ noit_conf_mark_changed();
if(info->path) free(info->path);
info->path = strdup((char *)xmlGetNodePath(newnode) + strlen("/noit"));
}
@@ -1008,8 +1014,8 @@
assert(confignode);
/* Now we create a child */
xmlNewChild(confignode, NULL, (xmlChar *)name, (xmlChar *)value);
-
}
+ noit_conf_mark_changed();
rv = 0;
out:
if(pobj) xmlXPathFreeObject(pobj);
@@ -1080,6 +1086,7 @@
xmlUnsetProp(node, (xmlChar *)attrinfo->name);
if(value)
xmlSetProp(node, (xmlChar *)attrinfo->name, (xmlChar *)value);
+ noit_conf_mark_changed();
rv = 0;
out:
if(pobj) xmlXPathFreeObject(pobj);
Modified: trunk/src/noitd.c
===================================================================
--- trunk/src/noitd.c 2008-07-01 14:14:22 UTC (rev 315)
+++ trunk/src/noitd.c 2008-07-01 14:15:44 UTC (rev 316)
@@ -109,6 +109,11 @@
noit_poller_init();
noit_listener_init(APPNAME);
+ /* Write our log out, and setup a watchdog to write it out on change. */
+ noit_conf_write_log();
+ noit_conf_coalesce_changes(10); /* 10 seconds of no changes before we write */
+ noit_conf_watch_and_journal_watchdog(noit_conf_write_log, NULL);
+
eventer_loop();
return 0;
}
Modified: trunk/src/stratcon.conf
===================================================================
--- trunk/src/stratcon.conf 2008-07-01 14:14:22 UTC (rev 315)
+++ trunk/src/stratcon.conf 2008-07-01 14:15:44 UTC (rev 316)
@@ -60,6 +60,12 @@
VALUES ('epoch'::timestamptz + ($1 || ' seconds')::interval,
stratcon.generate_sid_from_id($2), $3, $4)
]]></metric_text>
+ <config><![CDATA[
+ SELECT stratcon.update_config
+ ($1, $2,
+ 'epoch'::timestamptz + ($3 || ' seconds')::interval,
+ $4 )
+ ]]></config>
</statements>
</database>
Modified: trunk/src/stratcon_datastore.c
===================================================================
--- trunk/src/stratcon_datastore.c 2008-07-01 14:14:22 UTC (rev 315)
+++ trunk/src/stratcon_datastore.c 2008-07-01 14:15:44 UTC (rev 316)
@@ -6,6 +6,7 @@
#include "noit_defines.h"
#include "eventer/eventer.h"
#include "utils/noit_log.h"
+#include "utils/noit_b64.h"
#include "stratcon_datastore.h"
#include "noit_conf.h"
#include "noit_check.h"
@@ -14,6 +15,7 @@
#include <sys/un.h>
#include <arpa/inet.h>
#include <libpq-fe.h>
+#include <zlib.h>
static char *check_insert = NULL;
static const char *check_insert_conf = "/stratcon/database/statements/check";
@@ -23,6 +25,8 @@
static const char *metric_insert_numeric_conf = "/stratcon/database/statements/metric_numeric";
static char *metric_insert_text = NULL;
static const char *metric_insert_text_conf = "/stratcon/database/statements/metric_text";
+static char *config_insert = NULL;
+static const char *config_insert_conf = "/stratcon/database/statements/config";
#define GET_QUERY(a) do { \
if(a == NULL) \
@@ -141,6 +145,8 @@
execute_outcome_t
stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) {
int type, len;
+ char *final_buff;
+ uLong final_len, actual_final_len;;
char *token;
type = d->data[0];
@@ -187,6 +193,42 @@
PROCESS_NEXT_FIELD(token,len); /* Skip the leader, we know what we are */
switch(type) {
/* See noit_check_log.c for log description */
+ case 'n':
+ DECLARE_PARAM_STR(raddr, strlen(raddr));
+ DECLARE_PARAM_STR("noitd",5); /* node_type */
+ PROCESS_NEXT_FIELD(token,len);
+ DECLARE_PARAM_STR(token,len); /* timestamp */
+
+ /* This is the expected uncompressed len */
+ PROCESS_NEXT_FIELD(token,len);
+ final_len = atoi(token);
+ final_buff = malloc(final_len);
+ if(!final_buff) goto bad_row;
+
+ /* The last token is b64 encoded and compressed.
+ * we need to decode it, declare it and then free it.
+ */
+ PROCESS_LAST_FIELD(token, len);
+ /* We can in-place decode this */
+ len = noit_b64_decode((char *)token, len,
+ (unsigned char *)token, len);
+ if(len <= 0) {
+ noitL(noit_error, "noitd config base64 decoding error.\n");
+ goto bad_row;
+ }
+ actual_final_len = final_len;
+ if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len,
+ (unsigned char *)token, len)) {
+ noitL(noit_error, "noitd config decompression failure.\n");
+ goto bad_row;
+ }
+ if(final_len != actual_final_len) {
+ noitL(noit_error, "noitd config decompression error.\n");
+ goto bad_row;
+ }
+ DECLARE_PARAM_STR(final_buff, final_len);
+ free(final_buff);
+ break;
case 'C':
DECLARE_PARAM_STR(raddr, strlen(raddr));
PROCESS_NEXT_FIELD(token,len);
@@ -239,9 +281,10 @@
(const char * const *)d->paramValues, \
d->paramLengths, d->paramFormats, 0); \
rv = PQresultStatus(res); \
- if(rv != PGRES_COMMAND_OK) { \
- noitL(noit_error, "stratcon datasource bad row: %s\n", \
- PQresultErrorMessage(res)); \
+ if(rv != PGRES_COMMAND_OK && \
+ rv != PGRES_TUPLES_OK) { \
+ noitL(noit_error, "stratcon datasource bad row (%d): %s\n", \
+ rv, PQresultErrorMessage(res)); \
PQclear(res); \
goto bad_row; \
} \
@@ -250,6 +293,10 @@
/* Now execute the query */
switch(type) {
+ case 'n':
+ GET_QUERY(config_insert);
+ PG_EXEC(config_insert);
+ break;
case 'C':
GET_QUERY(check_insert);
PG_EXEC(check_insert);
More information about the Reconnoiter-devel
mailing list