Changeset 7afb4e334fa8390d0543fc8d916d5c6b861511a9

Show
Ignore:
Timestamp:
11/14/09 22:44:17 (5 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1258238657 +0000
git-parent:

[260e2d6dfc8995efa1cba9558c2d99aa5f3421c8]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1258238657 +0000
Message:

fixes #219

This is "significant" as it requires adding a module section to
stratcon.conf and not using the <stomp> stanza, but instead using
<mq type="stomp">. I've tested it with ActiveMQ and RabbitMQ and
both work fine.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/Makefile.in

    r3f0391d r7afb4e3  
    4848        noit_console_complete.o noit_xml.o \ 
    4949        noit_conf.o noit_http.o noit_rest.o noit_tokenizer.o \ 
    50         noit_capabilities_listener.o
     50        noit_capabilities_listener.o noit_module.o
    5151        stratcon_realtime_http.o \ 
    5252        stratcon_jlog_streamer.o stratcon_datastore.o \ 
     
    9898                -Lstomp -lstomp \ 
    9999                $(NOWHOLE_ARCHIVE) \ 
    100                 $(LIBS) $(PGLIBS) @APRLIBS@ 
     100                $(LIBS) $(PGLIBS) 
    101101        @echo "- linking $@" 
    102102 
    103103stratcon_datastore.o:   stratcon_datastore.c 
    104104        @$(CC) $(CPPFLAGS) $(PGCFLAGS) -c $< 
    105         @echo "- compiling $<" 
    106  
    107 stratcon_iep.o: stratcon_iep.c 
    108         @$(CC) $(CPPFLAGS) $(CFLAGS) @APRCFLAGS@ -c $< 
    109105        @echo "- compiling $<" 
    110106 
  • src/java/com/omniti/reconnoiter/StratconConfig.java

    r2d69141 r7afb4e3  
    107107  } 
    108108   
    109   public String getStompParameter(String param, String or) { 
    110     String result = getStompParameter(param); 
     109  public String getMQParameter(String param, String or) { 
     110    String result = getMQParameter(param); 
    111111    if (result == null) 
    112112      return or; 
     
    114114  } 
    115115   
    116   public String getStompParameter(String param) { 
    117     return getIepParameter("stomp", param); 
     116  public String getMQParameter(String param) { 
     117    return getIepParameter("mq", param); 
    118118  } 
    119119   
  • src/java/com/omniti/reconnoiter/broker/RabbitBroker.java

    r0335d9d r7afb4e3  
    3838     
    3939    // This is a fanout exchange 
    40     this.exchangeName = config.getStompParameter("exchange", "noit.firehose"); 
     40    this.exchangeName = config.getMQParameter("exchange", "noit.firehose"); 
    4141    // This queue is bound to the fanout exchange 
    42     this.queueName = config.getStompParameter("queue", "noit.firehose"); 
     42    this.queueName = config.getMQParameter("queue", "noit.firehose"); 
    4343    // No need for a routing key on a FO exchange 
    4444    this.routingKey = ""; 
  • src/modules/Makefile.in

    rc13d3a0 r7afb4e3  
    2727        lua.@MODULEEXT@ dns.@MODULEEXT@ selfcheck.@MODULEEXT@ \ 
    2828        external.@MODULEEXT@ collectd.@MODULEEXT@ \ 
     29        stomp_driver.@MODULEEXT@ \ 
    2930        @BUILD_MODULES@ 
    3031 
     
    5556        @$(CC) $(CPPFLAGS) $(SHCFLAGS) $(PGCFLAGS) -c $< -o $@ 
    5657        @echo "- compiling $<" 
     58 
     59stomp_driver.lo:        stomp_driver.c 
     60        @$(CC) $(CPPFLAGS) $(SHCFLAGS) @APRCFLAGS@ -c $< -o $@ 
     61        @echo "- compiling $<" 
     62 
     63libstomp.lo:    libstomp.c 
     64        @$(CC) $(CPPFLAGS) $(SHCFLAGS) @APRCFLAGS@ -c $< -o $@ 
     65        @echo "- compiling $<" 
     66 
     67stomp_driver.@MODULEEXT@:       stomp_driver.lo libstomp.lo 
     68        @$(MODULELD) $(LDFLAGS) -o $@ stomp_driver.lo libstomp.lo @APRLIBS@ 
     69        @echo "- linking $@" 
    5770 
    5871mysql.@MODULEEXT@:      mysql.lo 
  • src/noit_module.c

    r5b7de23 r7afb4e3  
    135135  noit_image_t *obj; 
    136136 
    137   if(!noit_conf_get_string(NULL, "/noit/modules/@directory", &base)) 
     137  if(!noit_conf_get_string(NULL, "//modules/@directory", &base)) 
    138138    base = strdup(""); 
    139139 
     
    380380 
    381381  /* Load our generic modules */ 
    382   sections = noit_conf_get_sections(NULL, "/noit/modules//generic", &cnt); 
     382  sections = noit_conf_get_sections(NULL, "//modules//generic", &cnt); 
    383383  for(i=0; i<cnt; i++) { 
    384384    char g_name[256]; 
     
    417417  if(sections) free(sections); 
    418418  /* Load our module loaders */ 
    419   sections = noit_conf_get_sections(NULL, "/noit/modules//loader", &cnt); 
     419  sections = noit_conf_get_sections(NULL, "//modules//loader", &cnt); 
    420420  for(i=0; i<cnt; i++) { 
    421421    char loader_name[256]; 
     
    452452  if(sections) free(sections); 
    453453 
    454   /* Load the modules */ 
     454  /* Load the modules (these *are* specific to the /noit/ root) */ 
    455455  sections = noit_conf_get_sections(NULL, "/noit/modules//module", &cnt); 
    456456  if(!sections) return; 
  • src/stratcon.conf.in

    rd2dd72f r7afb4e3  
    1313    </console_output> 
    1414  </logs> 
     15 
     16  <modules directory="%modulesdir%"> 
     17    <generic image="stomp_driver" name="stomp_driver"/> 
     18  </modules> 
    1519 
    1620  <noits> 
     
    3236  </noits> 
    3337 
    34   <iep disabled="true"> <!-- false the default --
     38  <iep disabled="false"
    3539    <start directory="%iepdbdir%" 
    3640           command="%iepbindir%/run-iep.sh" /> 
     41    <mq type="stomp"> 
     42      <!-- (for RabbitMQ, leave type="stomp" in the line above, but set the following) 
     43      <username>guest</username> 
     44      <password>guest</password> 
     45      <exchange>noit.firehose</exchange> 
     46      --> 
     47    </mq> 
     48    <!-- For RabbitMQ  
     49    <broker adapter="rabbitmq" /> 
     50    --> 
    3751    <queries> 
    3852      <statement id="6cc613a4-7f9c-11de-973f-db7e8ccb2e5c" provides="CheckDetails-ddl"> 
  • src/stratcon_iep.c

    rcb991cc r7afb4e3  
    5050#include <sys/filio.h> 
    5151#endif 
     52#include <signal.h> 
     53#include <errno.h> 
    5254#include <assert.h> 
    53 #include "stomp/stomp.h" 
    5455 
    5556eventer_jobq_t iep_jobq; 
     
    5758static noit_spinlock_t iep_conn_cnt = 0; 
    5859 
    59 struct iep_thread_driver { 
    60   stomp_connection *connection; 
    61   apr_pool_t *pool; 
    62   char* exchange; 
    63 }; 
    64 pthread_key_t iep_connection; 
     60static pthread_key_t iep_connection; 
     61static noit_hash_table mq_drivers = NOIT_HASH_EMPTY; 
     62static mq_driver_t *mq_driver = NULL; 
    6563 
    6664struct iep_job_closure { 
     
    6866  char *remote; 
    6967  char *doc_str; 
    70   apr_pool_t *pool; 
    7168}; 
    7269 
    7370static void 
    7471start_iep_daemon(); 
    75  
    7672 
    7773static float 
     
    313309static 
    314310struct iep_thread_driver *stratcon_iep_get_connection() { 
    315   apr_status_t rc; 
     311  int rc; 
    316312  struct iep_thread_driver *driver; 
    317313  driver = pthread_getspecific(iep_connection); 
    318314  if(!driver) { 
    319     driver = calloc(1, sizeof(*driver)); 
     315    driver = mq_driver->allocate(); 
    320316    pthread_setspecific(iep_connection, driver); 
    321317  } 
    322318 
    323   if(!driver->pool) { 
    324     if(apr_pool_create(&driver->pool, NULL) != APR_SUCCESS) return NULL; 
    325   } 
    326  
    327   if(!driver->connection) { 
    328     int port; 
    329     char hostname[128]; 
    330     if(!noit_conf_get_int(NULL, "/stratcon/iep/stomp/port", &port)) 
    331       port = 61613; 
    332     if(!noit_conf_get_stringbuf(NULL, "/stratcon/iep/stomp/hostname", 
    333                                 hostname, sizeof(hostname))) 
    334       strlcpy(hostname, "127.0.0.1", sizeof(hostname)); 
    335     if(stomp_connect(&driver->connection, hostname, port, 
    336                      driver->pool)!= APR_SUCCESS) { 
    337       noitL(noit_error, "MQ connection failed\n"); 
    338       stomp_disconnect(&driver->connection); 
    339       return NULL; 
    340     } 
    341  
    342     { 
    343       stomp_frame frame; 
    344       char username[128]; 
    345       char password[128]; 
    346       char* exchange = malloc(128); 
    347       frame.command = "CONNECT"; 
    348       frame.headers = apr_hash_make(driver->pool); 
    349       // This is for RabbitMQ Support 
    350       if((noit_conf_get_stringbuf(NULL, "/stratcon/iep/stomp/username", 
    351                                   username, sizeof(username))) && 
    352          (noit_conf_get_stringbuf(NULL, "/stratcon/iep/stomp/password", 
    353                                   password, sizeof(password)))) 
    354       { 
    355         apr_hash_set(frame.headers, "login", APR_HASH_KEY_STRING, username); 
    356         apr_hash_set(frame.headers, "passcode", APR_HASH_KEY_STRING, password); 
    357       } 
    358  
    359  
    360       // This is for RabbitMQ support 
    361       driver->exchange = NULL; 
    362       if(noit_conf_get_stringbuf(NULL, "/stratcon/iep/stomp/exchange", 
    363                                   exchange, 128)) 
    364       { 
    365         if (!driver->exchange) 
    366           driver->exchange = exchange; 
    367         else 
    368           free(exchange); 
    369         apr_hash_set(frame.headers, "exchange", APR_HASH_KEY_STRING, driver->exchange); 
    370       } 
    371  
    372       frame.body = NULL; 
    373       frame.body_length = -1; 
    374       rc = stomp_write(driver->connection, &frame, driver->pool); 
    375       if(rc != APR_SUCCESS) { 
    376         noitL(noit_error, "MQ STOMP CONNECT failed, %d\n", rc); 
    377         stomp_disconnect(&driver->connection); 
    378         return NULL; 
    379       } 
    380     }   
     319  rc = mq_driver->connect(driver); 
     320  if(rc < 0) return NULL; 
     321  if(rc == 0) { 
     322    /* Initial connect */ 
     323    /* TODO: this should be requested by Esper, not blindly pushed */ 
    381324    stratcon_iep_submit_statements(); 
    382325    stratcon_datastore_iep_check_preload(); 
     
    423366      if(job->remote) free(job->remote); 
    424367      if(job->doc_str) free(job->doc_str); 
    425       if(job->pool) apr_pool_destroy(job->pool); 
    426368      free(job); 
    427369    } 
     
    438380  } 
    439381  /* Submit */ 
    440   if(driver && driver->pool && driver->connection) { 
    441     apr_status_t rc; 
     382  if(driver) { 
    442383    int line_len = strlen(job->line); 
    443384    int remote_len = strlen(job->remote); 
    444     stomp_frame out; 
    445385 
    446386    job->doc_str = (char*)calloc(line_len + 1 /* \t */ + 
     
    451391    strncat(job->doc_str, job->line + 2, line_len - 2); 
    452392 
    453     apr_pool_create(&job->pool, driver->pool); 
    454  
    455     out.command = "SEND"; 
    456     out.headers = apr_hash_make(job->pool); 
    457     if (driver->exchange) 
    458       apr_hash_set(out.headers, "exchange", APR_HASH_KEY_STRING, driver->exchange); 
    459  
    460     apr_hash_set(out.headers, "destination", APR_HASH_KEY_STRING, "/queue/noit.firehose"); 
    461     apr_hash_set(out.headers, "ack", APR_HASH_KEY_STRING, "auto"); 
    462    
    463     out.body_length = -1; 
    464     out.body = job->doc_str; 
    465     rc = stomp_write(driver->connection, &out, job->pool); 
    466     if(rc != APR_SUCCESS) { 
    467       noitL(noit_error, "STOMP send failed, disconnecting\n"); 
    468       if(driver->connection) stomp_disconnect(&driver->connection); 
    469       driver->connection = NULL; 
     393    /* Don't need to catch error here, next submit will catch it */ 
     394    if(mq_driver->submit(driver, job->doc_str, line_len + remote_len + 1) != 0) { 
     395      noitL(noit_debug, "failed to MQ submit.\n"); 
    470396    } 
    471397  } 
     
    529455static void connection_destroy(void *vd) { 
    530456  struct iep_thread_driver *driver = vd; 
    531   if(driver->connection) stomp_disconnect(&driver->connection); 
    532   if(driver->exchange) free(driver->exchange); 
    533   if(driver->pool) apr_pool_destroy(driver->pool); 
    534   free(driver); 
     457  mq_driver->disconnect(driver); 
     458  mq_driver->deallocate(driver); 
    535459} 
    536460 
     
    662586  info = NULL; 
    663587 
    664   /* This will induce a stomp connection which will initialize esper */ 
    665588  setup_iep_connection_later(1); 
    666589 
     
    677600 
    678601void 
     602stratcon_iep_mq_driver_register(const char *name, mq_driver_t *d) { 
     603  noit_hash_replace(&mq_drivers, strdup(name), strlen(name), d, free, NULL); 
     604} 
     605 
     606void 
    679607stratcon_iep_init() { 
    680608  noit_boolean disabled = noit_false; 
    681   apr_initialize()
    682   atexit(apr_terminate);    
     609  char mq_type[128] = "stomp"
     610  void *vdriver; 
    683611 
    684612  if(noit_conf_get_boolean(NULL, "/stratcon/iep/@disabled", &disabled) && 
     
    687615    return; 
    688616  } 
     617 
     618  if(!noit_conf_get_stringbuf(NULL, "/stratcon/iep/mq/@type", 
     619                              mq_type, sizeof(mq_type))) { 
     620    noitL(noit_error, "You must specify an <mq type=\"...\"> that is valid.\n"); 
     621    exit(-2); 
     622  } 
     623  if(!noit_hash_retrieve(&mq_drivers, mq_type, strlen(mq_type), &vdriver) || 
     624     vdriver == NULL) { 
     625    noitL(noit_error, "Cannot find MQ driver type: %s\n", mq_type); 
     626    noitL(noit_error, "Did you forget to load a module?\n"); 
     627    exit(-2); 
     628  } 
     629  mq_driver = (mq_driver_t *)vdriver; 
    689630 
    690631  noit_iep = noit_log_stream_find("error/iep"); 
  • src/stratcon_iep.h

    r5360a1e r7afb4e3  
    5454  stratcon_jlog_streamer_iep_ctx_alloc(void); 
    5555 
     56typedef struct iep_thread_driver iep_thread_driver_t; 
     57 
     58typedef struct mq_driver { 
     59  iep_thread_driver_t *(*allocate)(); 
     60 
     61  int (*connect)(iep_thread_driver_t *driver); 
     62  /* connect returns: 
     63      -1 for failure, 
     64       0 for successful new connection, 
     65       1 for already connected 
     66   */ 
     67 
     68  int (*submit)(iep_thread_driver_t *driver, const char *payload, size_t payloadlen); 
     69  /* submit returns: 0 on success, -1 on failure */ 
     70 
     71  int (*disconnect)(iep_thread_driver_t *driver); 
     72  /* disconnect returns: 
     73      -1 for already disconnected 
     74       0 successful discnonect 
     75   */ 
     76 
     77  void (*deallocate)(iep_thread_driver_t *driver); 
     78} mq_driver_t; 
     79 
     80API_EXPORT(void) 
     81  stratcon_iep_mq_driver_register(const char *, mq_driver_t *); 
     82 
    5683#endif 
  • src/stratcond.c

    rbdb891c r7afb4e3  
    195195  noit_listener_init(APPNAME); 
    196196 
     197  noit_module_init(); 
     198 
    197199  /* Drop privileges */ 
    198200  if(chrootpath && noit_security_chroot(chrootpath)) {