Changeset 70451c561661f38403852e3585f41976f6a42843

Show
Ignore:
Timestamp:
07/01/08 14:15:44 (10 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1214921744 +0000
git-parent:

[e0795d09a594dd276dd59643a85230388401fb06]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1214921744 +0000
Message:

implements configuration pushing into the DB. refs #26

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/noit.conf

    rbee4ff5 r70451c5  
    3737      <log name="status"/> 
    3838      <log name="metrics"/> 
     39      <log name="config"/> 
    3940    </feeds> 
    4041  </logs> 
  • src/noit_check_log.c

    r8d0941e r70451c5  
    1616/* Log format is tab delimited: 
    1717 * NOIT CONFIG (implemented in noit_conf.c): 
    18  *  'n' TIMESTAMP base64(gzip(xmlconfig)) 
     18 *  'n' TIMESTAMP strlen(xmlconfig) base64(gzip(xmlconfig)) 
    1919 * 
    2020 * CHECK: 
  • src/noit_conf.c

    r2ccf5eb r70451c5  
    1414#include <libxml/tree.h> 
    1515#include <libxml/xpath.h> 
     16#include <zlib.h> 
    1617 
    1718#include "noit_conf.h" 
     
    2021#include "utils/noit_hash.h" 
    2122#include "utils/noit_log.h" 
     23#include "utils/noit_b64.h" 
    2224 
    2325/* tmp hash impl, replace this with something nice */ 
     
    2628static char master_config_file[PATH_MAX] = ""; 
    2729static xmlXPathContextPtr xpath_ctxt = NULL; 
     30 
     31/* This is used to notice config changes and journal the config out 
     32 * using a user-specified function.  It supports allowing multiple config 
     33 * changed to coalesce so you don't write out 1000 changes in a few seconds. 
     34 */ 
     35static u_int32_t __config_gen = 0; 
     36static u_int32_t __config_coalesce = 0; 
     37static u_int32_t __config_coalesce_time = 0; 
     38void noit_conf_coalesce_changes(u_int32_t seconds) { 
     39  __config_coalesce_time = seconds; 
     40} 
     41void noit_conf_mark_changed() { 
     42  /* increment the change counter -- in case anyone cares */ 
     43  __config_gen++; 
     44  /* reset the coalesce counter.  It is decremented each second and 
     45   * the journal function fires on a transition from 1 => 0 
     46   */ 
     47  __config_coalesce = __config_coalesce_time; 
     48} 
     49struct recurrent_journaler { 
     50  int (*journal_config)(void *); 
     51  void *jc_closure; 
     52}; 
     53static int 
     54noit_conf_watch_config_and_journal(eventer_t e, int mask, void *closure, 
     55                                   struct timeval *now) { 
     56  struct recurrent_journaler *rj = closure; 
     57  eventer_t newe; 
     58 
     59  if(__config_coalesce == 1) 
     60    rj->journal_config(rj->jc_closure); 
     61  if(__config_coalesce > 0) 
     62    __config_coalesce--; 
     63 
     64  /* Schedule the same event to fire a second form now */ 
     65  newe = eventer_alloc(); 
     66  gettimeofday(&newe->whence, NULL); 
     67  newe->whence.tv_sec += 1; 
     68  newe->mask = EVENTER_TIMER; 
     69  newe->callback = noit_conf_watch_config_and_journal; 
     70  newe->closure = closure; 
     71  eventer_add(newe); 
     72  return 0; 
     73} 
     74void 
     75noit_conf_watch_and_journal_watchdog(int (*f)(void *), void *c) { 
     76  struct recurrent_journaler *rj; 
     77  struct timeval __now; 
     78  rj = calloc(1, sizeof(*rj)); 
     79  rj->journal_config = f; 
     80  rj->jc_closure = c; 
     81  gettimeofday(&__now, NULL); 
     82  noit_conf_watch_config_and_journal(NULL, EVENTER_TIMER, rj, &__now); 
     83} 
    2884 
    2985static noit_hash_table _compiled_fallback = NOIT_HASH_EMPTY; 
     
    95151    xpath_ctxt = xmlXPathNewContext(master_config); 
    96152    if(path != master_config_file) realpath(path, master_config_file); 
     153    noit_conf_mark_changed(); 
    97154    return 0; 
    98155  } 
     
    400457} 
    401458 
     459struct config_line_vstr { 
     460  char *buff; 
     461  int raw_len; 
     462  int len; 
     463  int allocd; 
     464  enum { CONFIG_RAW = 0, CONFIG_COMPRESSED, CONFIG_B64 } encoded; 
     465}; 
     466static int 
     467noit_config_log_write_xml(void *vstr, const char *buffer, int len) { 
     468  struct config_line_vstr *clv = vstr; 
     469  assert(clv->encoded == CONFIG_RAW); 
     470  if(!clv->buff) { 
     471    clv->allocd = 8192; 
     472    clv->buff = malloc(clv->allocd); 
     473  } 
     474  while(len + clv->len > clv->allocd) { 
     475    char *newbuff; 
     476    int newsize = clv->allocd; 
     477    newsize <<= 1; 
     478    newbuff = realloc(clv->buff, newsize); 
     479    if(!newbuff) { 
     480      return -1; 
     481    } 
     482    clv->allocd = newsize; 
     483    clv->buff = newbuff; 
     484  } 
     485  memcpy(clv->buff + clv->len, buffer, len); 
     486  clv->len += len; 
     487  return len; 
     488} 
     489static int 
     490noit_config_log_close_xml(void *vstr) { 
     491  struct config_line_vstr *clv = vstr; 
     492  uLong initial_dlen, dlen; 
     493  char *compbuff, *b64buff; 
     494 
     495  if(clv->buff == NULL) { 
     496    clv->encoded = CONFIG_B64; 
     497    return 0; 
     498  } 
     499  clv->raw_len = clv->len; 
     500  assert(clv->encoded == CONFIG_RAW); 
     501  /* Compress */ 
     502  initial_dlen = dlen = compressBound(clv->len); 
     503  compbuff = malloc(initial_dlen); 
     504  if(!compbuff) return -1; 
     505  if(Z_OK != compress2((Bytef *)compbuff, &dlen, 
     506                       (Bytef *)clv->buff, clv->len, 9)) { 
     507    noitL(noit_error, "Error compressing config for transmission.\n"); 
     508    free(compbuff); 
     509    return -1; 
     510  } 
     511  free(clv->buff); 
     512  clv->buff = compbuff; 
     513  clv->allocd = initial_dlen; 
     514  clv->len = dlen; 
     515  clv->encoded = CONFIG_COMPRESSED; 
     516  /* Encode */ 
     517  initial_dlen = ((clv->len + 2) / 3) * 4; 
     518  b64buff = malloc(initial_dlen); 
     519  dlen = noit_b64_encode((unsigned char *)clv->buff, clv->len, 
     520                         b64buff, initial_dlen); 
     521  if(dlen == 0) { 
     522    free(b64buff); 
     523    return -1; 
     524  } 
     525  free(clv->buff); 
     526  clv->buff = b64buff; 
     527  clv->allocd = initial_dlen; 
     528  clv->len = dlen; 
     529  clv->encoded = CONFIG_B64; 
     530  return 0; 
     531} 
     532 
    402533int 
    403534noit_conf_reload(noit_console_closure_t ncct, 
     
    459590  } 
    460591  nc_printf(ncct, "%d bytes written.\n", len); 
     592  return 0; 
     593} 
     594int 
     595noit_conf_write_log() { 
     596  static u_int32_t last_write_gen = 0; 
     597  static noit_log_stream_t config_log = NULL; 
     598  struct timeval __now; 
     599  xmlOutputBufferPtr out; 
     600  xmlCharEncodingHandlerPtr enc; 
     601  struct config_line_vstr *clv; 
     602  SETUP_LOG(config, return -1); 
     603 
     604  /* We know we haven't changed */ 
     605  if(last_write_gen == __config_gen) return 0; 
     606 
     607  gettimeofday(&__now, NULL); 
     608  clv = calloc(1, sizeof(*clv)); 
     609  enc = xmlGetCharEncodingHandler(XML_CHAR_ENCODING_UTF8); 
     610  out = xmlOutputBufferCreateIO(noit_config_log_write_xml, 
     611                                noit_config_log_close_xml, 
     612                                clv, enc); 
     613  xmlSaveFormatFileTo(out, master_config, "utf8", 1); 
     614  if(clv->encoded != CONFIG_B64) { 
     615    noitL(noit_error, "Error logging configuration\n"); 
     616    if(clv->buff) free(clv->buff); 
     617    free(clv); 
     618    return -1; 
     619  } 
     620  noitL(config_log, "n\t%lu.%03lu\t%d\t%.*s\n", 
     621        __now.tv_sec, __now.tv_usec / 1000UL, clv->raw_len, 
     622        clv->len, clv->buff); 
     623  free(clv->buff); 
     624  free(clv); 
     625  last_write_gen = __config_gen; 
    461626  return 0; 
    462627} 
  • src/noit_conf.h

    r0709063 r70451c5  
    2222  char prompt[50]; 
    2323} noit_conf_t_userdata_t; 
     24 
     25/* seconds == 0 disable config journaling watchdog */ 
     26API_EXPORT(void) noit_conf_coalesce_changes(u_int32_t seconds); 
     27/* Start the watchdog */ 
     28API_EXPORT(void) noit_conf_watch_and_journal_watchdog(int (*f)(void *), void *c); 
     29 
     30/* marks the config as changed.. if you manipulate the XML tree in any way 
     31 * you must call this function to "let the system know."  This is used 
     32 * to notice changes which are in turn flushed out. 
     33 */ 
     34API_EXPORT(void) noit_conf_mark_changed(); 
    2435 
    2536API_EXPORT(void) noit_conf_init(const char *toplevel); 
     
    7384                       noit_console_state_t *state, void *closure); 
    7485 
     86API_EXPORT(int) 
     87  noit_conf_write_log(); 
     88 
    7589API_EXPORT(void) noit_conf_log_init(const char *toplevel); 
    7690 
  • src/noit_conf_checks.c

    r5e653fe r70451c5  
    176176    if(val) 
    177177      xmlSetProp(node, (xmlChar *)attrinfo->name, (xmlChar *)val); 
     178    noit_conf_mark_changed(); 
    178179  } 
    179180  return error; 
     
    213214      xmlUnlinkNode(newnode); 
    214215    } 
    215     else 
     216    else { 
     217      noit_conf_mark_changed(); 
    216218      rv = 0; 
     219    } 
    217220  } 
    218221 out: 
     
    483486        xmlUnlinkNode(node); 
    484487      } 
     488      noit_conf_mark_changed(); 
    485489    } 
    486490  } 
     
    585589    node = (noit_conf_section_t)xmlXPathNodeSetItem(pobj->nodesetval, 0); 
    586590    xmlUnlinkNode(node); 
     591    noit_conf_mark_changed(); 
    587592    return 0; 
    588593  } 
     
    599604  node = (noit_conf_section_t)xmlXPathNodeSetItem(pobj->nodesetval, 0); 
    600605  if((newnode = xmlNewChild(node, NULL, (xmlChar *)argv[0], NULL)) != NULL) { 
     606    noit_conf_mark_changed(); 
    601607    if(info->path) free(info->path); 
    602608    info->path = strdup((char *)xmlGetNodePath(newnode) + strlen("/noit")); 
     
    10091015    /* Now we create a child */ 
    10101016    xmlNewChild(confignode, NULL, (xmlChar *)name, (xmlChar *)value); 
    1011      
    1012   } 
     1017  } 
     1018  noit_conf_mark_changed(); 
    10131019  rv = 0; 
    10141020 out: 
     
    10811087  if(value) 
    10821088    xmlSetProp(node, (xmlChar *)attrinfo->name, (xmlChar *)value); 
     1089  noit_conf_mark_changed(); 
    10831090  rv = 0; 
    10841091 out: 
  • src/noitd.c

    rf921b22 r70451c5  
    110110  noit_listener_init(APPNAME); 
    111111 
     112  /* Write our log out, and setup a watchdog to write it out on change. */ 
     113  noit_conf_write_log(); 
     114  noit_conf_coalesce_changes(10); /* 10 seconds of no changes before we write */ 
     115  noit_conf_watch_and_journal_watchdog(noit_conf_write_log, NULL); 
     116 
    112117  eventer_loop(); 
    113118  return 0; 
  • src/stratcon.conf

    r4fec84d r70451c5  
    6161                     stratcon.generate_sid_from_id($2), $3, $4) 
    6262      ]]></metric_text> 
     63      <config><![CDATA[ 
     64        SELECT stratcon.update_config 
     65               ($1, $2,  
     66                'epoch'::timestamptz + ($3 || ' seconds')::interval, 
     67                $4 ) 
     68      ]]></config> 
    6369    </statements> 
    6470  </database> 
  • src/stratcon_datastore.c

    r2bdd297 r70451c5  
    77#include "eventer/eventer.h" 
    88#include "utils/noit_log.h" 
     9#include "utils/noit_b64.h" 
    910#include "stratcon_datastore.h" 
    1011#include "noit_conf.h" 
     
    1516#include <arpa/inet.h> 
    1617#include <libpq-fe.h> 
     18#include <zlib.h> 
    1719 
    1820static char *check_insert = NULL; 
     
    2426static char *metric_insert_text = NULL; 
    2527static const char *metric_insert_text_conf = "/stratcon/database/statements/metric_text"; 
     28static char *config_insert = NULL; 
     29static const char *config_insert_conf = "/stratcon/database/statements/config"; 
    2630 
    2731#define GET_QUERY(a) do { \ 
     
    142146stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) { 
    143147  int type, len; 
     148  char *final_buff; 
     149  uLong final_len, actual_final_len;; 
    144150  char *token; 
    145151 
     
    188194    switch(type) { 
    189195      /* See noit_check_log.c for log description */ 
     196      case 'n': 
     197        DECLARE_PARAM_STR(raddr, strlen(raddr)); 
     198        DECLARE_PARAM_STR("noitd",5); /* node_type */ 
     199        PROCESS_NEXT_FIELD(token,len); 
     200        DECLARE_PARAM_STR(token,len); /* timestamp */ 
     201 
     202        /* This is the expected uncompressed len */ 
     203        PROCESS_NEXT_FIELD(token,len); 
     204        final_len = atoi(token); 
     205        final_buff = malloc(final_len); 
     206        if(!final_buff) goto bad_row; 
     207   
     208        /* The last token is b64 endoded and compressed. 
     209         * we need to decode it, declare it and then free it. 
     210         */ 
     211        PROCESS_LAST_FIELD(token, len); 
     212        /* We can in-place decode this */ 
     213        len = noit_b64_decode((char *)token, len, 
     214                              (unsigned char *)token, len); 
     215        if(len <= 0) { 
     216          noitL(noit_error, "noitd config base64 decoding error.\n"); 
     217          goto bad_row; 
     218        } 
     219        actual_final_len = final_len; 
     220        if(Z_OK != uncompress((Bytef *)final_buff, &actual_final_len, 
     221                              (unsigned char *)token, len)) { 
     222          noitL(noit_error, "noitd config decompression failure.\n"); 
     223          goto bad_row; 
     224        } 
     225        if(final_len != actual_final_len) { 
     226          noitL(noit_error, "noitd config decompression error.\n"); 
     227          goto bad_row; 
     228        } 
     229        DECLARE_PARAM_STR(final_buff, final_len); 
     230        free(final_buff); 
     231        break; 
    190232      case 'C': 
    191233        DECLARE_PARAM_STR(raddr, strlen(raddr)); 
     
    240282                     d->paramLengths, d->paramFormats, 0); \ 
    241283  rv = PQresultStatus(res); \ 
    242   if(rv != PGRES_COMMAND_OK) { \ 
    243     noitL(noit_error, "stratcon datasource bad row: %s\n", \ 
    244           PQresultErrorMessage(res)); \ 
     284  if(rv != PGRES_COMMAND_OK && \ 
     285     rv != PGRES_TUPLES_OK) { \ 
     286    noitL(noit_error, "stratcon datasource bad row (%d): %s\n", \ 
     287          rv, PQresultErrorMessage(res)); \ 
    245288    PQclear(res); \ 
    246289    goto bad_row; \ 
     
    251294  /* Now execute the query */ 
    252295  switch(type) { 
     296    case 'n': 
     297      GET_QUERY(config_insert); 
     298      PG_EXEC(config_insert); 
     299      break; 
    253300    case 'C': 
    254301      GET_QUERY(check_insert);