Changeset 7afb4e334fa8390d0543fc8d916d5c6b861511a9
- Timestamp:
- 11/14/09 22:44:17 (4 years ago)
- git-parent:
- Files:
-
- src/Makefile.in (modified) (2 diffs)
- src/java/com/omniti/reconnoiter/StratconConfig.java (modified) (2 diffs)
- src/java/com/omniti/reconnoiter/broker/RabbitBroker.java (modified) (1 diff)
- src/modules/Makefile.in (modified) (2 diffs)
- src/modules/libstomp.c (added)
- src/modules/libstomp.h (added)
- src/modules/stomp_driver.c (added)
- src/noit_module.c (modified) (4 diffs)
- src/stomp/Makefile.in (deleted)
- src/stomp/stomp.c (deleted)
- src/stomp/stomp.h (deleted)
- src/stratcon.conf.in (modified) (2 diffs)
- src/stratcon_iep.c (modified) (11 diffs)
- src/stratcon_iep.h (modified) (1 diff)
- src/stratcond.c (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
src/Makefile.in
r3f0391d r7afb4e3 48 48 noit_console_complete.o noit_xml.o \ 49 49 noit_conf.o noit_http.o noit_rest.o noit_tokenizer.o \ 50 noit_capabilities_listener.o \50 noit_capabilities_listener.o noit_module.o \ 51 51 stratcon_realtime_http.o \ 52 52 stratcon_jlog_streamer.o stratcon_datastore.o \ … … 98 98 -Lstomp -lstomp \ 99 99 $(NOWHOLE_ARCHIVE) \ 100 $(LIBS) $(PGLIBS) @APRLIBS@100 $(LIBS) $(PGLIBS) 101 101 @echo "- linking $@" 102 102 103 103 stratcon_datastore.o: stratcon_datastore.c 104 104 @$(CC) $(CPPFLAGS) $(PGCFLAGS) -c $< 105 @echo "- compiling $<"106 107 stratcon_iep.o: stratcon_iep.c108 @$(CC) $(CPPFLAGS) $(CFLAGS) @APRCFLAGS@ -c $<109 105 @echo "- compiling $<" 110 106 src/java/com/omniti/reconnoiter/StratconConfig.java
r2d69141 r7afb4e3 107 107 } 108 108 109 public String get StompParameter(String param, String or) {110 String result = get StompParameter(param);109 public String getMQParameter(String param, String or) { 110 String result = getMQParameter(param); 111 111 if (result == null) 112 112 return or; … … 114 114 } 115 115 116 public String get StompParameter(String param) {117 return getIepParameter(" stomp", param);116 public String getMQParameter(String param) { 117 return getIepParameter("mq", param); 118 118 } 119 119 src/java/com/omniti/reconnoiter/broker/RabbitBroker.java
r0335d9d r7afb4e3 38 38 39 39 // This is a fanout exchange 40 this.exchangeName = config.get StompParameter("exchange", "noit.firehose");40 this.exchangeName = config.getMQParameter("exchange", "noit.firehose"); 41 41 // This queue is bound to the fanout exchange 42 this.queueName = config.get StompParameter("queue", "noit.firehose");42 this.queueName = config.getMQParameter("queue", "noit.firehose"); 43 43 // No need for a routing key on a FO exchange 44 44 this.routingKey = ""; src/modules/Makefile.in
rc13d3a0 r7afb4e3 27 27 lua.@MODULEEXT@ dns.@MODULEEXT@ selfcheck.@MODULEEXT@ \ 28 28 external.@MODULEEXT@ collectd.@MODULEEXT@ \ 29 stomp_driver.@MODULEEXT@ \ 29 30 @BUILD_MODULES@ 30 31 … … 55 56 @$(CC) $(CPPFLAGS) $(SHCFLAGS) $(PGCFLAGS) -c $< -o $@ 56 57 @echo "- compiling $<" 58 59 stomp_driver.lo: stomp_driver.c 60 @$(CC) $(CPPFLAGS) $(SHCFLAGS) @APRCFLAGS@ -c $< -o $@ 61 @echo "- compiling $<" 62 63 libstomp.lo: libstomp.c 64 @$(CC) $(CPPFLAGS) $(SHCFLAGS) @APRCFLAGS@ -c $< -o $@ 65 @echo "- compiling $<" 66 67 stomp_driver.@MODULEEXT@: stomp_driver.lo libstomp.lo 68 @$(MODULELD) $(LDFLAGS) -o $@ stomp_driver.lo libstomp.lo @APRLIBS@ 69 @echo "- linking $@" 57 70 58 71 mysql.@MODULEEXT@: mysql.lo src/noit_module.c
r5b7de23 r7afb4e3 135 135 noit_image_t *obj; 136 136 137 if(!noit_conf_get_string(NULL, "/ noit/modules/@directory", &base))137 if(!noit_conf_get_string(NULL, "//modules/@directory", &base)) 138 138 base = strdup(""); 139 139 … … 380 380 381 381 /* Load our generic modules */ 382 sections = noit_conf_get_sections(NULL, "/ noit/modules//generic", &cnt);382 sections = noit_conf_get_sections(NULL, "//modules//generic", &cnt); 383 383 for(i=0; i<cnt; i++) { 384 384 char g_name[256]; … … 417 417 if(sections) free(sections); 418 418 /* Load our module loaders */ 419 sections = noit_conf_get_sections(NULL, "/ noit/modules//loader", &cnt);419 sections = noit_conf_get_sections(NULL, "//modules//loader", &cnt); 420 420 for(i=0; i<cnt; i++) { 421 421 char loader_name[256]; … … 452 452 if(sections) free(sections); 453 453 454 /* Load the modules */454 /* Load the modules (these *are* specific to the /noit/ root) */ 455 455 sections = noit_conf_get_sections(NULL, "/noit/modules//module", &cnt); 456 456 if(!sections) return; src/stratcon.conf.in
rd2dd72f r7afb4e3 13 13 </console_output> 14 14 </logs> 15 16 <modules directory="%modulesdir%"> 17 <generic image="stomp_driver" name="stomp_driver"/> 18 </modules> 15 19 16 20 <noits> … … 32 36 </noits> 33 37 34 <iep disabled=" true"> <!-- false the default -->38 <iep disabled="false"> 35 39 <start directory="%iepdbdir%" 36 40 command="%iepbindir%/run-iep.sh" /> 41 <mq type="stomp"> 42 <!-- (for RabbitMQ, leave type="stomp" in the line above, but set the following) 43 <username>guest</username> 44 <password>guest</password> 45 <exchange>noit.firehose</exchange> 46 --> 47 </mq> 48 <!-- For RabbitMQ 49 <broker adapter="rabbitmq" /> 50 --> 37 51 <queries> 38 52 <statement id="6cc613a4-7f9c-11de-973f-db7e8ccb2e5c" provides="CheckDetails-ddl"> src/stratcon_iep.c
rcb991cc r7afb4e3 50 50 #include <sys/filio.h> 51 51 #endif 52 #include <signal.h> 53 #include <errno.h> 52 54 #include <assert.h> 53 #include "stomp/stomp.h"54 55 55 56 eventer_jobq_t iep_jobq; … … 57 58 static noit_spinlock_t iep_conn_cnt = 0; 58 59 59 struct iep_thread_driver { 60 stomp_connection *connection; 61 apr_pool_t *pool; 62 char* exchange; 63 }; 64 pthread_key_t iep_connection; 60 static pthread_key_t iep_connection; 61 static noit_hash_table mq_drivers = NOIT_HASH_EMPTY; 62 static mq_driver_t *mq_driver = NULL; 65 63 66 64 struct iep_job_closure { … … 68 66 char *remote; 69 67 char *doc_str; 70 apr_pool_t *pool;71 68 }; 72 69 73 70 static void 74 71 start_iep_daemon(); 75 76 72 77 73 static float … … 313 309 static 314 310 struct iep_thread_driver *stratcon_iep_get_connection() { 315 apr_status_t rc;311 int rc; 316 312 struct iep_thread_driver *driver; 317 313 driver = pthread_getspecific(iep_connection); 318 314 if(!driver) { 319 driver = calloc(1, sizeof(*driver));315 driver = mq_driver->allocate(); 320 316 pthread_setspecific(iep_connection, driver); 321 317 } 322 318 323 if(!driver->pool) { 324 if(apr_pool_create(&driver->pool, NULL) != APR_SUCCESS) return NULL; 325 } 326 327 if(!driver->connection) { 328 int port; 329 char hostname[128]; 330 if(!noit_conf_get_int(NULL, "/stratcon/iep/stomp/port", &port)) 331 port = 61613; 332 if(!noit_conf_get_stringbuf(NULL, "/stratcon/iep/stomp/hostname", 333 hostname, sizeof(hostname))) 334 strlcpy(hostname, "127.0.0.1", sizeof(hostname)); 335 if(stomp_connect(&driver->connection, hostname, port, 336 driver->pool)!= APR_SUCCESS) { 337 noitL(noit_error, "MQ connection failed\n"); 338 stomp_disconnect(&driver->connection); 339 return NULL; 340 } 341 342 { 343 stomp_frame frame; 344 char username[128]; 345 char password[128]; 346 char* exchange = malloc(128); 347 frame.command = "CONNECT"; 348 frame.headers = apr_hash_make(driver->pool); 349 // This is for RabbitMQ Support 350 if((noit_conf_get_stringbuf(NULL, "/stratcon/iep/stomp/username", 351 username, sizeof(username))) && 352 (noit_conf_get_stringbuf(NULL, "/stratcon/iep/stomp/password", 353 password, sizeof(password)))) 354 { 355 apr_hash_set(frame.headers, "login", APR_HASH_KEY_STRING, username); 356 apr_hash_set(frame.headers, "passcode", APR_HASH_KEY_STRING, password); 357 } 358 359 360 // This is for RabbitMQ support 361 driver->exchange = NULL; 362 if(noit_conf_get_stringbuf(NULL, "/stratcon/iep/stomp/exchange", 363 exchange, 128)) 364 { 365 if (!driver->exchange) 366 driver->exchange = exchange; 367 else 368 free(exchange); 369 apr_hash_set(frame.headers, "exchange", APR_HASH_KEY_STRING, driver->exchange); 370 } 371 372 frame.body = NULL; 373 frame.body_length = -1; 374 rc = stomp_write(driver->connection, &frame, driver->pool); 375 if(rc != APR_SUCCESS) { 376 noitL(noit_error, "MQ STOMP CONNECT failed, %d\n", rc); 377 stomp_disconnect(&driver->connection); 378 return NULL; 379 } 380 } 319 rc = mq_driver->connect(driver); 320 if(rc < 0) return NULL; 321 if(rc == 0) { 322 /* Initial connect */ 323 /* TODO: this should be requested by Esper, not blindly pushed */ 381 324 stratcon_iep_submit_statements(); 382 325 stratcon_datastore_iep_check_preload(); … … 423 366 if(job->remote) free(job->remote); 424 367 if(job->doc_str) free(job->doc_str); 425 if(job->pool) apr_pool_destroy(job->pool);426 368 free(job); 427 369 } … … 438 380 } 439 381 /* Submit */ 440 if(driver && driver->pool && driver->connection) { 441 apr_status_t rc; 382 if(driver) { 442 383 int line_len = strlen(job->line); 443 384 int remote_len = strlen(job->remote); 444 stomp_frame out;445 385 446 386 job->doc_str = (char*)calloc(line_len + 1 /* \t */ + … … 451 391 strncat(job->doc_str, job->line + 2, line_len - 2); 452 392 453 apr_pool_create(&job->pool, driver->pool); 454 455 out.command = "SEND"; 456 out.headers = apr_hash_make(job->pool); 457 if (driver->exchange) 458 apr_hash_set(out.headers, "exchange", APR_HASH_KEY_STRING, driver->exchange); 459 460 apr_hash_set(out.headers, "destination", APR_HASH_KEY_STRING, "/queue/noit.firehose"); 461 apr_hash_set(out.headers, "ack", APR_HASH_KEY_STRING, "auto"); 462 463 out.body_length = -1; 464 out.body = job->doc_str; 465 rc = stomp_write(driver->connection, &out, job->pool); 466 if(rc != APR_SUCCESS) { 467 noitL(noit_error, "STOMP send failed, disconnecting\n"); 468 if(driver->connection) stomp_disconnect(&driver->connection); 469 driver->connection = NULL; 393 /* Don't need to catch error here, next submit will catch it */ 394 if(mq_driver->submit(driver, job->doc_str, line_len + remote_len + 1) != 0) { 395 noitL(noit_debug, "failed to MQ submit.\n"); 470 396 } 471 397 } … … 529 455 static void connection_destroy(void *vd) { 530 456 struct iep_thread_driver *driver = vd; 531 if(driver->connection) stomp_disconnect(&driver->connection); 532 if(driver->exchange) free(driver->exchange); 533 if(driver->pool) apr_pool_destroy(driver->pool); 534 free(driver); 457 mq_driver->disconnect(driver); 458 mq_driver->deallocate(driver); 535 459 } 536 460 … … 662 586 info = NULL; 663 587 664 /* This will induce a stomp connection which will initialize esper */665 588 setup_iep_connection_later(1); 666 589 … … 677 600 678 601 void 602 stratcon_iep_mq_driver_register(const char *name, mq_driver_t *d) { 603 noit_hash_replace(&mq_drivers, strdup(name), strlen(name), d, free, NULL); 604 } 605 606 void 679 607 stratcon_iep_init() { 680 608 noit_boolean disabled = noit_false; 681 apr_initialize();682 atexit(apr_terminate);609 char mq_type[128] = "stomp"; 610 void *vdriver; 683 611 684 612 if(noit_conf_get_boolean(NULL, "/stratcon/iep/@disabled", &disabled) && … … 687 615 return; 688 616 } 617 618 if(!noit_conf_get_stringbuf(NULL, "/stratcon/iep/mq/@type", 619 mq_type, sizeof(mq_type))) { 620 noitL(noit_error, "You must specify an <mq type=\"...\"> that is valid.\n"); 621 exit(-2); 622 } 623 if(!noit_hash_retrieve(&mq_drivers, mq_type, strlen(mq_type), &vdriver) || 624 vdriver == NULL) { 625 noitL(noit_error, "Cannot find MQ driver type: %s\n", mq_type); 626 noitL(noit_error, "Did you forget to load a module?\n"); 627 exit(-2); 628 } 629 mq_driver = (mq_driver_t *)vdriver; 689 630 690 631 noit_iep = noit_log_stream_find("error/iep"); src/stratcon_iep.h
r5360a1e r7afb4e3 54 54 stratcon_jlog_streamer_iep_ctx_alloc(void); 55 55 56 typedef struct iep_thread_driver iep_thread_driver_t; 57 58 typedef struct mq_driver { 59 iep_thread_driver_t *(*allocate)(); 60 61 int (*connect)(iep_thread_driver_t *driver); 62 /* connect returns: 63 -1 for failure, 64 0 for successful new connection, 65 1 for already connected 66 */ 67 68 int (*submit)(iep_thread_driver_t *driver, const char *payload, size_t payloadlen); 69 /* submit returns: 0 on success, -1 on failure */ 70 71 int (*disconnect)(iep_thread_driver_t *driver); 72 /* disconnect returns: 73 -1 for already disconnected 74 0 successful discnonect 75 */ 76 77 void (*deallocate)(iep_thread_driver_t *driver); 78 } mq_driver_t; 79 80 API_EXPORT(void) 81 stratcon_iep_mq_driver_register(const char *, mq_driver_t *); 82 56 83 #endif src/stratcond.c
rbdb891c r7afb4e3 195 195 noit_listener_init(APPNAME); 196 196 197 noit_module_init(); 198 197 199 /* Drop privileges */ 198 200 if(chrootpath && noit_security_chroot(chrootpath)) {
