Show
Ignore:
Timestamp:
11/14/09 22:44:17 (9 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1258238657 +0000
git-parent:

[260e2d6dfc8995efa1cba9558c2d99aa5f3421c8]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1258238657 +0000
Message:

fixes #219

This is "significant" as it requires adding a module section to
stratcon.conf and not using the <stomp> stanza, but instead using
<mq type="stomp">. I've tested it with ActiveMQ and RabbitMQ and
both work fine.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/stratcon_iep.c

    rcb991cc r7afb4e3  
    5050#include <sys/filio.h> 
    5151#endif 
     52#include <signal.h> 
     53#include <errno.h> 
    5254#include <assert.h> 
    53 #include "stomp/stomp.h" 
    5455 
    5556eventer_jobq_t iep_jobq; 
     
    5758static noit_spinlock_t iep_conn_cnt = 0; 
    5859 
    59 struct iep_thread_driver { 
    60   stomp_connection *connection; 
    61   apr_pool_t *pool; 
    62   char* exchange; 
    63 }; 
    64 pthread_key_t iep_connection; 
     60static pthread_key_t iep_connection; 
     61static noit_hash_table mq_drivers = NOIT_HASH_EMPTY; 
     62static mq_driver_t *mq_driver = NULL; 
    6563 
    6664struct iep_job_closure { 
     
    6866  char *remote; 
    6967  char *doc_str; 
    70   apr_pool_t *pool; 
    7168}; 
    7269 
    7370static void 
    7471start_iep_daemon(); 
    75  
    7672 
    7773static float 
     
    313309static 
    314310struct iep_thread_driver *stratcon_iep_get_connection() { 
    315   apr_status_t rc; 
     311  int rc; 
    316312  struct iep_thread_driver *driver; 
    317313  driver = pthread_getspecific(iep_connection); 
    318314  if(!driver) { 
    319     driver = calloc(1, sizeof(*driver)); 
     315    driver = mq_driver->allocate(); 
    320316    pthread_setspecific(iep_connection, driver); 
    321317  } 
    322318 
    323   if(!driver->pool) { 
    324     if(apr_pool_create(&driver->pool, NULL) != APR_SUCCESS) return NULL; 
    325   } 
    326  
    327   if(!driver->connection) { 
    328     int port; 
    329     char hostname[128]; 
    330     if(!noit_conf_get_int(NULL, "/stratcon/iep/stomp/port", &port)) 
    331       port = 61613; 
    332     if(!noit_conf_get_stringbuf(NULL, "/stratcon/iep/stomp/hostname", 
    333                                 hostname, sizeof(hostname))) 
    334       strlcpy(hostname, "127.0.0.1", sizeof(hostname)); 
    335     if(stomp_connect(&driver->connection, hostname, port, 
    336                      driver->pool)!= APR_SUCCESS) { 
    337       noitL(noit_error, "MQ connection failed\n"); 
    338       stomp_disconnect(&driver->connection); 
    339       return NULL; 
    340     } 
    341  
    342     { 
    343       stomp_frame frame; 
    344       char username[128]; 
    345       char password[128]; 
    346       char* exchange = malloc(128); 
    347       frame.command = "CONNECT"; 
    348       frame.headers = apr_hash_make(driver->pool); 
    349       // This is for RabbitMQ Support 
    350       if((noit_conf_get_stringbuf(NULL, "/stratcon/iep/stomp/username", 
    351                                   username, sizeof(username))) && 
    352          (noit_conf_get_stringbuf(NULL, "/stratcon/iep/stomp/password", 
    353                                   password, sizeof(password)))) 
    354       { 
    355         apr_hash_set(frame.headers, "login", APR_HASH_KEY_STRING, username); 
    356         apr_hash_set(frame.headers, "passcode", APR_HASH_KEY_STRING, password); 
    357       } 
    358  
    359  
    360       // This is for RabbitMQ support 
    361       driver->exchange = NULL; 
    362       if(noit_conf_get_stringbuf(NULL, "/stratcon/iep/stomp/exchange", 
    363                                   exchange, 128)) 
    364       { 
    365         if (!driver->exchange) 
    366           driver->exchange = exchange; 
    367         else 
    368           free(exchange); 
    369         apr_hash_set(frame.headers, "exchange", APR_HASH_KEY_STRING, driver->exchange); 
    370       } 
    371  
    372       frame.body = NULL; 
    373       frame.body_length = -1; 
    374       rc = stomp_write(driver->connection, &frame, driver->pool); 
    375       if(rc != APR_SUCCESS) { 
    376         noitL(noit_error, "MQ STOMP CONNECT failed, %d\n", rc); 
    377         stomp_disconnect(&driver->connection); 
    378         return NULL; 
    379       } 
    380     }   
     319  rc = mq_driver->connect(driver); 
     320  if(rc < 0) return NULL; 
     321  if(rc == 0) { 
     322    /* Initial connect */ 
     323    /* TODO: this should be requested by Esper, not blindly pushed */ 
    381324    stratcon_iep_submit_statements(); 
    382325    stratcon_datastore_iep_check_preload(); 
     
    423366      if(job->remote) free(job->remote); 
    424367      if(job->doc_str) free(job->doc_str); 
    425       if(job->pool) apr_pool_destroy(job->pool); 
    426368      free(job); 
    427369    } 
     
    438380  } 
    439381  /* Submit */ 
    440   if(driver && driver->pool && driver->connection) { 
    441     apr_status_t rc; 
     382  if(driver) { 
    442383    int line_len = strlen(job->line); 
    443384    int remote_len = strlen(job->remote); 
    444     stomp_frame out; 
    445385 
    446386    job->doc_str = (char*)calloc(line_len + 1 /* \t */ + 
     
    451391    strncat(job->doc_str, job->line + 2, line_len - 2); 
    452392 
    453     apr_pool_create(&job->pool, driver->pool); 
    454  
    455     out.command = "SEND"; 
    456     out.headers = apr_hash_make(job->pool); 
    457     if (driver->exchange) 
    458       apr_hash_set(out.headers, "exchange", APR_HASH_KEY_STRING, driver->exchange); 
    459  
    460     apr_hash_set(out.headers, "destination", APR_HASH_KEY_STRING, "/queue/noit.firehose"); 
    461     apr_hash_set(out.headers, "ack", APR_HASH_KEY_STRING, "auto"); 
    462    
    463     out.body_length = -1; 
    464     out.body = job->doc_str; 
    465     rc = stomp_write(driver->connection, &out, job->pool); 
    466     if(rc != APR_SUCCESS) { 
    467       noitL(noit_error, "STOMP send failed, disconnecting\n"); 
    468       if(driver->connection) stomp_disconnect(&driver->connection); 
    469       driver->connection = NULL; 
     393    /* Don't need to catch error here, next submit will catch it */ 
     394    if(mq_driver->submit(driver, job->doc_str, line_len + remote_len + 1) != 0) { 
     395      noitL(noit_debug, "failed to MQ submit.\n"); 
    470396    } 
    471397  } 
     
    529455static void connection_destroy(void *vd) { 
    530456  struct iep_thread_driver *driver = vd; 
    531   if(driver->connection) stomp_disconnect(&driver->connection); 
    532   if(driver->exchange) free(driver->exchange); 
    533   if(driver->pool) apr_pool_destroy(driver->pool); 
    534   free(driver); 
     457  mq_driver->disconnect(driver); 
     458  mq_driver->deallocate(driver); 
    535459} 
    536460 
     
    662586  info = NULL; 
    663587 
    664   /* This will induce a stomp connection which will initialize esper */ 
    665588  setup_iep_connection_later(1); 
    666589 
     
    677600 
    678601void 
     602stratcon_iep_mq_driver_register(const char *name, mq_driver_t *d) { 
     603  noit_hash_replace(&mq_drivers, strdup(name), strlen(name), d, free, NULL); 
     604} 
     605 
     606void 
    679607stratcon_iep_init() { 
    680608  noit_boolean disabled = noit_false; 
    681   apr_initialize()
    682   atexit(apr_terminate);    
     609  char mq_type[128] = "stomp"
     610  void *vdriver; 
    683611 
    684612  if(noit_conf_get_boolean(NULL, "/stratcon/iep/@disabled", &disabled) && 
     
    687615    return; 
    688616  } 
     617 
     618  if(!noit_conf_get_stringbuf(NULL, "/stratcon/iep/mq/@type", 
     619                              mq_type, sizeof(mq_type))) { 
     620    noitL(noit_error, "You must specify an <mq type=\"...\"> that is valid.\n"); 
     621    exit(-2); 
     622  } 
     623  if(!noit_hash_retrieve(&mq_drivers, mq_type, strlen(mq_type), &vdriver) || 
     624     vdriver == NULL) { 
     625    noitL(noit_error, "Cannot find MQ driver type: %s\n", mq_type); 
     626    noitL(noit_error, "Did you forget to load a module?\n"); 
     627    exit(-2); 
     628  } 
     629  mq_driver = (mq_driver_t *)vdriver; 
    689630 
    690631  noit_iep = noit_log_stream_find("error/iep");