Changeset a9077178423e39a94a9b624e44cd4b37899d6fd3
- Timestamp:
- 05/12/09 12:48:35 (9 years ago)
- git-parent:
- Files:
-
- configure.in (modified) (3 diffs)
- src/Makefile.in (modified) (7 diffs)
- src/eventer/eventer_SSL_fd_opset.c (modified) (7 diffs)
- src/eventer/eventer_impl.c (modified) (4 diffs)
- src/eventer/eventer_jobq.c (modified) (2 diffs)
- src/java/Makefile.in (added)
- src/java/com/omniti/reconnoiter/AMQBrokerSingleton.java (added)
- src/java/com/omniti/reconnoiter/AMQListener.java (added)
- src/java/com/omniti/reconnoiter/AMQOutput.java (added)
- src/java/com/omniti/reconnoiter/IEPEngine.java (added)
- src/java/com/omniti/reconnoiter/StratconMessage.java (added)
- src/java/com/omniti/reconnoiter/event/NoitEvent.java (added)
- src/java/com/omniti/reconnoiter/event/StatusEvent.java (added)
- src/java/com/omniti/reconnoiter/event/StratconQuery.java (added)
- src/java/com/omniti/reconnoiter/event/StratconQueryStop.java (added)
- src/java/lib/activemq-all-5.2.0.jar (added)
- src/java/lib/antlr-runtime-3.1.1.jar (added)
- src/java/lib/esper-3.0.0.jar (added)
- src/java/lib/log4j-1.2.15.jar (added)
- src/java/lib/spring-beans-2.5.5.jar (added)
- src/java/lib/spring-context-2.5.5.jar (added)
- src/java/run-iep.sh.in (added)
- src/noit_jlog_listener.c (modified) (7 diffs)
- src/noit_jlog_listener.h (modified) (1 diff)
- src/noit_listener.c (modified) (2 diffs)
- src/noit_listener.h (modified) (1 diff)
- src/stomp/Makefile.in (added)
- src/stomp/stomp.c (added)
- src/stomp/stomp.h (added)
- src/stratcon.conf.in (modified) (1 diff)
- src/stratcon_datastore.c (modified) (4 diffs)
- src/stratcon_iep.c (modified) (5 diffs)
- src/stratcon_jlog_streamer.c (modified) (8 diffs)
- src/stratcon_jlog_streamer.h (modified) (2 diffs)
- src/utils/noit_log.c (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
configure.in
r0b1d2f2 ra907717 24 24 AC_PATH_PROGS(XSLTPROC, xsltproc) 25 25 AC_PATH_PROGS(XMLLINT, xmllint) 26 AC_PATH_PROGS(JAVA, java) 27 AC_PATH_PROGS(JAVAC, javac) 28 AC_PATH_PROGS(JAR, jar) 26 29 AC_PATH_PROG(NROFF, nroff) 27 30 AC_SUBST(PERL) … … 98 101 AC_SUBST(ATOMIC_OBJS) 99 102 AC_DEFINE_UNQUOTED(MODULEEXT, "$MODULEEXT", [module extension]) 103 AC_MSG_CHECKING([enable build/install of the Java IEP bits]) 104 105 if test -z "$ac_cv_path_JAVA" ; then 106 AC_MSG_RESULT([no]) 107 else 108 JAVAPARTS=java 109 AC_MSG_RESULT([yes]) 110 fi 111 AC_SUBST(JAVAPARTS) 100 112 101 113 NOIT_SVNVERSION=`svnversion` … … 571 583 src/noitedit/Makefile 572 584 src/lua/Makefile 585 src/stomp/Makefile 586 src/java/Makefile 587 src/java/run-iep.sh 573 588 sql/Makefile 574 589 test/Makefile src/Makefile.in
ra18c447 ra907717 32 32 NOWHOLE_ARCHIVE=@NOWHOLE_ARCHIVE@ 33 33 34 SUBS=lua utils eventer udns modules noitedit man 34 SUBS=lua utils eventer udns modules noitedit man stomp @JAVAPARTS@ 35 35 36 36 NOIT_OBJS=noitd.o noit_listener.o \ … … 54 54 make-subdirs: serf/.libs/libserf-0.o jlog/libjlog.a 55 55 for dir in $(SUBS) ; do \ 56 (cd $$dir && make) ; \ 56 echo "- building $$dir bits" ; \ 57 (cd $$dir && make -s) ; \ 57 58 done 58 59 … … 93 94 -Ljlog -ljlog \ 94 95 -Lnoitedit -lnoitedit \ 96 -Lstomp -lstomp \ 95 97 $(NOWHOLE_ARCHIVE) \ 96 $(LIBS) $(PGLIBS) 98 $(LIBS) $(PGLIBS) @SERFLIBS@ 97 99 @echo "- linking $@" 98 100 99 101 stratcon_datastore.o: stratcon_datastore.c 100 102 @$(CC) $(CPPFLAGS) $(PGCFLAGS) -c $< 103 @echo "- compiling $<" 104 105 stratcon_iep.o: stratcon_iep.c 106 @$(CC) $(CPPFLAGS) $(CFLAGS) @SERFCFLAGS@ -c $< 101 107 @echo "- compiling $<" 102 108 … … 115 121 @echo "- re2c noit_tokenizer.re" 116 122 117 test-noit.conf: noit.conf.in 123 test-noit.conf: noit.conf.in Makefile 118 124 sed -e "s^%sysconfdir%^`pwd`^g;" \ 119 125 -e "s^%modulesdir%^`pwd`/modules^g;" \ … … 123 129 test-noit.conf 124 130 125 noit.conf: noit.conf.in 131 noit.conf: noit.conf.in Makefile 126 132 sed -e "s^%sysconfdir%^$(sysconfdir)^g;" \ 127 133 -e "s^%modulesdir%^$(MODULES_DIR)^g;" \ … … 131 137 noit.conf 132 138 133 test-stratcon.conf: stratcon.conf.in 139 test-stratcon.conf: stratcon.conf.in Makefile 134 140 sed -e "s^%sysconfdir%^`pwd`^g;" \ 135 141 -e "s^%modulesdir%^`pwd`/modules^g;" \ 136 142 -e "s^%modulesluadir%^`pwd`/modules-lua^g;" \ 143 -e "s^%iepbindir%^`pwd`/java^g;" \ 144 -e "s^%iepdbdir%^`pwd`/java^g;" \ 137 145 -e "s^%PKIPREFIX%^../test/test-^g;" < \ 138 146 stratcon.conf.in > \ 139 147 test-stratcon.conf 140 148 141 stratcon.conf: stratcon.conf.in 149 stratcon.conf: stratcon.conf.in Makefile 142 150 sed -e "s^%sysconfdir%^$(sysconfdir)^g;" \ 143 151 -e "s^%modulesdir%^$(MODULES_DIR)^g;" \ 144 152 -e "s^%modulesluadir%^$(MODULES_DIR)^g;" \ 153 -e "s^%iepbindir%^$(bindir)^g;" \ 154 -e "s^%iepdbdir%^$(prefix)/var/db^g;" \ 145 155 -e "s^%PKIPREFIX%^$${PKIPREFIX}^g;" < \ 146 156 stratcon.conf.in > \ … … 158 168 (cd man && make install DESTDIR=$(DESTDIR)) 159 169 (cd modules && make install DESTDIR=$(DESTDIR)) 170 test -n "@JAVAPARTS@" && (cd @JAVAPARTS@ && make install DESTDIR=$(DESTDIR)) 160 171 (cd modules-lua && make install DESTDIR=$(DESTDIR)) 161 172 src/eventer/eventer_SSL_fd_opset.c
r6210da7 ra907717 39 39 unsigned long err; 40 40 char buf[120]; /* ERR_error_string(3): buf must be at least 120 bytes */ 41 noitL( noit_error, "%s:%d: errno: [%d] %s\n", f, l, errno, strerror(errno));41 noitL(eventer_err, "%s:%d: errno: [%d] %s\n", f, l, errno, strerror(errno)); 42 42 while((err = ERR_get_error()) != 0) { 43 43 ERR_error_string(err, buf); 44 noitL( noit_error, "%s:%d: write error[%08lx] %s\n", f, l, err, buf);44 noitL(eventer_err, "%s:%d: write error[%08lx] %s\n", f, l, err, buf); 45 45 } 46 46 } … … 68 68 if(!out || 69 69 !PEM_write_bio_RSAPrivateKey(out, tmpkey, NULL, NULL, 0, 0, NULL)) { 70 noitL( noit_error, "Could not save temporary RSA key to %s\n", tmpkeyfile);70 noitL(eventer_err, "Could not save temporary RSA key to %s\n", tmpkeyfile); 71 71 } else { 72 72 tmpkeyfile_time = time(NULL); … … 118 118 if(!strcmp(opt_no_ca, "true")) ok = 1; 119 119 else { 120 noitL( noit_error, "SSL client cert invalid: %s\n",120 noitL(eventer_err, "SSL client cert invalid: %s\n", 121 121 X509_verify_cert_error_string(v_res)); 122 122 ok = 0; … … 128 128 if(!strcmp(ignore_dates, "true")) ok = 1; 129 129 else { 130 noitL( noit_error, "SSL client cert is %s valid.\n",130 noitL(eventer_err, "SSL client cert is %s valid.\n", 131 131 (v_res < 0) ? "not yet" : "no longer"); 132 132 ok = 0; … … 356 356 break; 357 357 default: 358 noitL( noit_error, "SSL rw error: %d\n", sslerror);358 noitL(eventer_err, "SSL rw error: %d\n", sslerror); 359 359 eventer_ssl_error(); 360 360 errno = EIO; … … 374 374 eventer_SSL_read(int fd, void *buffer, size_t len, int *mask, void *closure) { 375 375 int rv; 376 noitL(noit_debug, "SSL_read(%d) wants %ld bytes\n", fd, (long int)len);377 376 rv = eventer_SSL_rw(SSL_OP_READ, fd, buffer, len, mask, closure); 378 noitL(noit_debug, "SSL_read(%d) wanted %ld bytes, got return value %d\n", fd, (long int)len, rv);379 377 return rv; 380 378 } … … 383 381 void *closure) { 384 382 int rv; 385 noitL(noit_debug, "SSL_write(%d) wants %ld bytes\n", fd, (long int)len);386 383 rv = eventer_SSL_rw(SSL_OP_WRITE, fd, (void *)buffer, len, mask, closure); 387 noitL(noit_debug, "SSL_write(%d) wanted %ld bytes, got return value %d\n", fd, (long int)len, rv);388 384 return rv; 389 385 } src/eventer/eventer_impl.c
r84d6f13 ra907717 11 11 12 12 static struct timeval *eventer_impl_epoch = NULL; 13 static int EVENTER_DEBUGGING = 0; 13 14 14 15 #ifdef HAVE_KQUEUE … … 57 58 return 0; 58 59 } 60 else if(!strcasecmp(key, "debugging")) { 61 if(strcmp(value, "0")) { 62 EVENTER_DEBUGGING = 1; 63 noitL(noit_error, "Enabling debugging from property\n"); 64 } 65 return 0; 66 } 59 67 return -1; 60 68 } … … 73 81 int i; 74 82 eventer_t e; 83 char *evdeb; 75 84 85 evdeb = getenv("EVENTER_DEBUGGING"); 86 if(evdeb) { 87 if(strcmp(evdeb, "0")) { 88 /* Set to anything but "0" turns debugging on */ 89 EVENTER_DEBUGGING = 1; 90 noitL(noit_error, "Disabling eventer debugging from environment\n"); 91 } 92 else { 93 EVENTER_DEBUGGING = 1; 94 noitL(noit_error, "Enabling eventer debugging from environment\n"); 95 } 96 } 76 97 eventer_impl_epoch = malloc(sizeof(struct timeval)); 77 98 gettimeofday(eventer_impl_epoch, NULL); … … 104 125 job->fd_event = e; 105 126 gettimeofday(&job->create_time, NULL); 106 if(e->whence.tv_sec) { 127 /* If we're debugging the eventer, these cross thread timeouts will 128 * make it impossible for us to slowly trace an asynch job. */ 129 if(!EVENTER_DEBUGGING && e->whence.tv_sec) { 107 130 job->timeout_event = eventer_alloc(); 108 131 memcpy(&job->timeout_event->whence, &e->whence, sizeof(e->whence)); src/eventer/eventer_jobq.c
rec98850 ra907717 226 226 pthread_mutex_lock(&job->lock); 227 227 if(job->timeout_triggered) { 228 struct timeval diff, diff2; 228 229 /* This happens if the timeout occurred before we even had the change 229 230 * to pull the job off the queue. We must be in bad shape here. … … 231 232 noitL(eventer_deb, "%p jobq[%s] -> timeout before start [%p]\n", pthread_self(), jobq->queue_name, job); 232 233 gettimeofday(&job->finish_time, NULL); /* We're done */ 234 sub_timeval(job->finish_time, job->fd_event->whence, &diff); 235 sub_timeval(job->finish_time, job->create_time, &diff2); 236 noitL(eventer_deb, "%p jobq[%s] -> timeout before start [%p] -%0.6f (%0.6f)\n", 237 pthread_self(), jobq->queue_name, job, 238 (float)diff.tv_sec + (float)diff.tv_usec/1000000.0, 239 (float)diff2.tv_sec + (float)diff2.tv_usec/1000000.0); 233 240 pthread_mutex_unlock(&job->lock); 234 241 job->fd_event->callback(job->fd_event, EVENTER_ASYNCH_CLEANUP, src/noit_jlog_listener.c
rdb9d1c2 ra907717 15 15 #include <sys/ioctl.h> 16 16 #define MAX_ROWS_AT_ONCE 1000 17 #define DEFAULT_SECONDS_BETWEEN_BATCHES 1 17 #define DEFAULT_SECONDS_BETWEEN_BATCHES 10 18 19 static noit_atomic32_t tmpfeedcounter = 0; 18 20 19 21 void … … 23 25 NOIT_JLOG_DATA_FEED, 24 26 noit_jlog_handler); 27 noit_control_dispatch_delegate(noit_control_dispatch, 28 NOIT_JLOG_DATA_TEMP_FEED, 29 noit_jlog_handler); 25 30 } 26 31 27 32 typedef struct { 28 33 jlog_ctx *jlog; 34 char *subscriber; 29 35 jlog_id chkpt; 30 36 jlog_id start; … … 43 49 void 44 50 noit_jlog_closure_free(noit_jlog_closure_t *jcl) { 45 if(jcl->jlog) jlog_ctx_close(jcl->jlog); 51 if(jcl->jlog) { 52 if(jcl->subscriber) { 53 if(jcl->subscriber[0] == '~') 54 jlog_ctx_remove_subscriber(jcl->jlog, jcl->subscriber); 55 free(jcl->subscriber); 56 } 57 jlog_ctx_close(jcl->jlog); 58 } 46 59 free(jcl); 47 60 } … … 112 125 while(1) { 113 126 jlog_id client_chkpt; 114 int sleeptime = DEFAULT_SECONDS_BETWEEN_BATCHES; 127 int sleeptime = (ac->cmd == NOIT_JLOG_DATA_TEMP_FEED) ? 128 1 : DEFAULT_SECONDS_BETWEEN_BATCHES; 115 129 jlog_get_checkpoint(jcl->jlog, ac->remote_cn, &jcl->chkpt); 116 130 jcl->count = jlog_ctx_read_interval(jcl->jlog, &jcl->start, &jcl->finish); … … 184 198 noit_log_stream_t ls; 185 199 const char *logname; 186 char path[PATH_MAX], *sub;200 char path[PATH_MAX], subscriber[32], *sub; 187 201 jcl = ac->service_ctx = noit_jlog_closure_alloc(); 188 202 if(!noit_hash_retr_str(ac->config, … … 204 218 goto socket_error; 205 219 } 206 if(!ac->remote_cn) { 207 noitL(noit_error, "jlog transit started to unidentified party.\n"); 208 goto socket_error; 209 } 220 if(ac->cmd == NOIT_JLOG_DATA_FEED) { 221 if(!ac->remote_cn) { 222 noitL(noit_error, "jlog transit started to unidentified party.\n"); 223 goto socket_error; 224 } 225 strlcpy(subscriber, ac->remote_cn, sizeof(subscriber)); 226 } 227 else { 228 snprintf(subscriber, sizeof(subscriber), 229 "~%07d", noit_atomic_inc32(&tmpfeedcounter)); 230 } 231 jcl->subscriber = strdup(subscriber); 210 232 211 233 strlcpy(path, ls->path, sizeof(path)); … … 221 243 222 244 jcl->jlog = jlog_new(path); 223 if(jlog_ctx_open_reader(jcl->jlog, ac->remote_cn) == -1) { 224 noitL(noit_error, "jlog reader[%s] error: %s\n", ac->remote_cn, 245 if(ac->cmd == NOIT_JLOG_DATA_TEMP_FEED) 246 if(jlog_ctx_add_subscriber(jcl->jlog, jcl->subscriber, JLOG_END) == -1) 247 noitL(noit_error, "jlog reader[%s] error: %s\n", jcl->subscriber, 248 jlog_ctx_err_string(jcl->jlog)); 249 if(jlog_ctx_open_reader(jcl->jlog, jcl->subscriber) == -1) { 250 noitL(noit_error, "jlog reader[%s] error: %s\n", jcl->subscriber, 225 251 jlog_ctx_err_string(jcl->jlog)); 226 252 goto socket_error; src/noit_jlog_listener.h
r6bb9ef8 ra907717 11 11 12 12 #define NOIT_JLOG_DATA_FEED 0xda7afeed 13 #define NOIT_JLOG_DATA_TEMP_FEED 0x7e66feed 13 14 14 15 API_EXPORT(void) src/noit_listener.c
rdb9d1c2 ra907717 368 368 } 369 369 370 cmd = ntohl(cmd);370 ac->cmd = ntohl(cmd); 371 371 /* Lookup cmd and dispatch */ 372 372 if(noit_hash_retrieve(&listener_commands, … … 376 376 delegation_table = (noit_hash_table *)vdelegation_table; 377 377 if(noit_hash_retrieve(delegation_table, 378 (char *)& cmd, sizeof(cmd), &vfunc)) {378 (char *)&ac->cmd, sizeof(ac->cmd), &vfunc)) { 379 379 e->callback = *((eventer_func_t *)vfunc); 380 380 return e->callback(e, mask, closure, now); src/noit_listener.h
rdb9d1c2 ra907717 29 29 void *service_ctx; 30 30 eventer_func_t dispatch; 31 u_int32_t cmd; 31 32 } acceptor_closure_t; 32 33 src/stratcon.conf.in
re2900c8 ra907717 29 29 <noit address="127.0.0.1" port="34332" /> 30 30 </noits> 31 32 <iep disable="false"> <!-- false the default --> 33 <start directory="%iepdbdir%" 34 command="%iepbindir%/run-iep.sh" /> 35 <queries> 36 <query id="ce6bf8d2-3dd7-11de-a45c-a7df160cba9e" topic="status"> 37 select * from NoitStatus 38 </query> 39 </queries> 40 </iep> 31 41 32 42 <database> src/stratcon_datastore.c
rb3a7d8c ra907717 430 430 noit_hash_table *t; 431 431 432 dsn[0] = '\0'; 433 t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig"); 434 while(noit_hash_next_str(t, &iter, &k, &klen, &v)) { 435 if(dsn[0]) strlcat(dsn, " ", sizeof(dsn)); 436 strlcat(dsn, k, sizeof(dsn)); 437 strlcat(dsn, "=", sizeof(dsn)); 438 strlcat(dsn, v, sizeof(dsn)); 439 } 440 432 441 if(cq->dbh) { 433 442 if(PQstatus(cq->dbh) == CONNECTION_OK) return 0; … … 439 448 } 440 449 441 dsn[0] = '\0';442 t = noit_conf_get_hash(NULL, "/stratcon/database/dbconfig");443 while(noit_hash_next_str(t, &iter, &k, &klen, &v)) {444 if(dsn[0]) strlcat(dsn, " ", sizeof(dsn));445 strlcat(dsn, k, sizeof(dsn));446 strlcat(dsn, "=", sizeof(dsn));447 strlcat(dsn, v, sizeof(dsn));448 }449 450 cq->dbh = PQconnectdb(dsn); 450 451 if(!cq->dbh) return -1; … … 524 525 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure, 525 526 struct timeval *now) { 527 int i; 526 528 conn_q *cq = closure; 527 529 ds_job_detail *current, *last_sp; … … 532 534 full_monty: 533 535 /* Make sure we have a connection */ 536 i = 1; 534 537 while(stratcon_database_connect(cq)) { 535 538 noitL(noit_error, "Error connecting to database\n"); 536 sleep(1); 539 sleep(i); 540 i *= 2; 541 i = MIN(i, 16); 537 542 } 538 543 src/stratcon_iep.c
rb3a7d8c ra907717 8 8 #include "utils/noit_log.h" 9 9 #include "utils/noit_b64.h" 10 #include "noit_jlog_listener.h" 11 #include "stratcon_jlog_streamer.h" 10 12 #include "stratcon_datastore.h" 11 13 #include "noit_conf.h" 12 14 #include "noit_check.h" 13 15 14 #define SWEEP_DELAY { 0L, 100000L } /* 100ms */ 16 #include <unistd.h> 17 #include <sys/fcntl.h> 18 #include <assert.h> 19 #include <libxml/parser.h> 20 #include <libxml/tree.h> 21 #include <libxml/xmlsave.h> 22 #ifdef OPENWIRE 23 #include "amqcs.h" 24 #else 25 #include "stomp/stomp.h" 26 #endif 27 15 28 eventer_jobq_t iep_jobq; 16 29 17 struct noit_line_list { 18 char *line; 19 struct noit_line_list *next; 30 struct iep_thread_driver { 31 #ifdef OPENWIRE 32 amqcs_connect_options connect_options; 33 amqcs_connection *connection; 34 #else 35 stomp_connection *connection; 36 #endif 37 apr_pool_t *pool; 20 38 }; 21 struct iep_batch { 22 /* This lock only needs to be used for inserting... (in the pivot) 23 * Once the batch is "done" it is submitted to a thread and has 24 * no more concurrent access. 25 */ 26 pthread_mutex_t lock; 27 int batch_size; 28 struct noit_line_list *head; 29 struct noit_line_list *tail; 39 pthread_key_t iep_connection; 40 41 struct iep_job_closure { 42 char *line; /* This is a copy and gets trashed during processing */ 43 xmlDocPtr doc; 44 char *doc_str; 45 apr_pool_t *pool; 30 46 }; 31 /* We safely insert into the pivot... then lock and flip the batch 32 * into a self-contained iep_batch which is the closure to the asynch 33 * function that inserts it into OpenESB. 34 */ 35 static struct iep_batch pivot_batch; 47 48 static void 49 stratcon_iep_datastore_onlooker(stratcon_datastore_op_t op, 50 struct sockaddr *remote, void *operand); 51 static void 52 start_iep_daemon(); 36 53 37 54 static int 38 stratcon_iep_batch_add_line(const char *data) { 39 int previous_size; 40 struct noit_line_list *nnode; 41 nnode = malloc(sizeof(*nnode)); 42 nnode->line = strdup(data); 43 nnode->next = NULL; 44 pthread_mutex_lock(&pivot_batch.lock); 45 if(!pivot_batch.tail) pivot_batch.head = pivot_batch.tail = nnode; 46 else { 47 pivot_batch.tail->next = nnode; 48 pivot_batch.tail = nnode; 49 } 50 previous_size = pivot_batch.batch_size; 51 pivot_batch.batch_size++; 52 pthread_mutex_unlock(&pivot_batch.lock); 53 return previous_size; 54 } 55 56 static struct iep_batch * 57 stratcon_iep_batch_copytrunc() { 58 struct iep_batch *nbatch; 59 nbatch = calloc(1, sizeof(*nbatch)); 60 /* Lock */ 61 pthread_mutex_lock(&pivot_batch.lock); 62 /* Copy */ 63 nbatch->batch_size = pivot_batch.batch_size; 64 nbatch->head = pivot_batch.head; 65 nbatch->tail = pivot_batch.tail; 66 /* Trunc */ 67 pivot_batch.batch_size = 0; 68 pivot_batch.head = pivot_batch.tail = NULL; 69 /* Lock */ 70 pthread_mutex_unlock(&pivot_batch.lock); 71 return nbatch; 55 bust_to_parts(char *in, char **p, int len) { 56 int cnt = 0; 57 char *s = in; 58 while(cnt < len) { 59 p[cnt++] = s; 60 while(*s && *s != '\t') s++; 61 if(!*s) break; 62 *s++ = '\0'; 63 } 64 while(*s) s++; /* Move to end */ 65 if(s > in && *(s-1) == '\n') *(s-1) = '\0'; /* chomp */ 66 return cnt; 67 } 68 69 #define ADDCHILD(a,b) \ 70 xmlNewTextChild(root, NULL, (xmlChar *)(a), (xmlChar *)(b)) 71 #define NEWDOC(xmldoc,n,stanza) do { \ 72 xmlNodePtr root; \ 73 xmldoc = xmlNewDoc((xmlChar *)"1.0"); \ 74 root = xmlNewDocNode(xmldoc, NULL, (xmlChar *)(n), NULL); \ 75 xmlDocSetRootElement(xmldoc, root); \ 76 stanza \ 77 } while(0) 78 79 80 static xmlDocPtr 81 stratcon_iep_doc_from_status(char *data) { 82 xmlDocPtr doc; 83 char *parts[7]; 84 if(bust_to_parts(data, parts, 7) != 7) return NULL; 85 /* 'S' TIMESTAMP UUID STATE AVAILABILITY DURATION STATUS_MESSAGE */ 86 NEWDOC(doc, "NoitStatus", 87 { 88 ADDCHILD("id", parts[2]); 89 ADDCHILD("state", parts[3]); 90 ADDCHILD("availability", parts[4]); 91 ADDCHILD("duration", parts[5]); 92 ADDCHILD("status", parts[6]); 93 }); 94 return doc; 95 } 96 97 static xmlDocPtr 98 stratcon_iep_doc_from_metric(char *data) { 99 xmlDocPtr doc; 100 char *parts[6]; 101 const char *rootname = "NoitMetricNumeric"; 102 const char *valuename = "value"; 103 if(bust_to_parts(data, parts, 6) != 6) return NULL; 104 /* 'M' TIMESTAMP UUID NAME TYPE VALUE */ 105 106 if(*parts[4] == METRIC_STRING) { 107 rootname = "NoitMetricText"; 108 valuename = "message"; 109 } 110 NEWDOC(doc, rootname, 111 { 112 ADDCHILD("id", parts[2]); 113 ADDCHILD("name", parts[3]); 114 ADDCHILD(valuename, parts[5]); 115 }); 116 return doc; 117 } 118 119 static xmlDocPtr 120 stratcon_iep_doc_from_query(char *data) { 121 xmlDocPtr doc; 122 char *parts[4]; 123 if(bust_to_parts(data, parts, 4) != 4) return NULL; 124 /* 'Q' ID NAME QUERY */ 125 126 NEWDOC(doc, "StratconQuery", 127 { 128 ADDCHILD("id", parts[1]); 129 ADDCHILD("name", parts[2]); 130 ADDCHILD("expression", parts[3]); 131 }); 132 return doc; 133 } 134 135 static xmlDocPtr 136 stratcon_iep_doc_from_querystop(char *data) { 137 xmlDocPtr doc; 138 char *parts[2]; 139 if(bust_to_parts(data, parts, 2) != 2) return NULL; 140 /* 'Q' ID */ 141 142 NEWDOC(doc, "StratconQueryStop", 143 { 144 xmlNodeSetContent(root, (xmlChar *)parts[1]); 145 }); 146 return doc; 147 } 148 149 static xmlDocPtr 150 stratcon_iep_doc_from_line(char *data) { 151 if(data) { 152 switch(*data) { 153 case 'S': return stratcon_iep_doc_from_status(data); 154 case 'M': return stratcon_iep_doc_from_metric(data); 155 case 'Q': return stratcon_iep_doc_from_query(data); 156 case 'q': return stratcon_iep_doc_from_querystop(data); 157 } 158 } 159 return NULL; 160 } 161 162 static float 163 stratcon_iep_age_from_line(char *data, struct timeval now) { 164 float n, t; 165 if(data && (*data == 'S' || *data == 'M')) { 166 if(data[1] != '\t') return 0; 167 t = strtof(data + 2, NULL); 168 n = (float)now.tv_sec + (float)now.tv_usec / 1000000.0; 169 return n - t; 170 } 171 return 0; 172 } 173 174 void stratcon_iep_submit_queries() { 175 int i, cnt = 0; 176 noit_conf_section_t *query_configs; 177 char path[256]; 178 179 snprintf(path, sizeof(path), "/stratcon/iep/queries//query"); 180 query_configs = noit_conf_get_sections(NULL, path, &cnt); 181 noitL(noit_debug, "Found %d %s stanzas\n", cnt, path); 182 for(i=0; i<cnt; i++) { 183 char id[UUID_STR_LEN]; 184 char topic[256]; 185 char *query; 186 char *line; 187 int line_len; 188 189 if(!noit_conf_get_stringbuf(query_configs[i], 190 "self::node()/@id", 191 id, sizeof(id))) { 192 noitL(noit_error, "No uuid specified in query\n"); 193 continue; 194 } 195 if(!noit_conf_get_stringbuf(query_configs[i], 196 "ancestor-or-self::node()/@topic", 197 topic, sizeof(topic))) { 198 noitL(noit_error, "No topic specified in query\n"); 199 continue; 200 } 201 if(!noit_conf_get_string(query_configs[i], "self::node()", 202 &query)) { 203 noitL(noit_error, "No contents specified in query\n"); 204 continue; 205 } 206 line_len = 4 /* 3 tabs + \0 */ + 207 1 /* 'Q' */ + 1 /* '\n' */ + 208 strlen(id) + strlen(topic) + strlen(query); 209 line = malloc(line_len); 210 snprintf(line, line_len, "Q\t%s\t%s\t%s\n", id, topic, query); 211 free(query); 212 query = line; 213 while(query[0] && query[1]) { 214 if(*query == '\n') *query = ' '; 215 query++; 216 } 217 stratcon_iep_datastore_onlooker(DS_OP_INSERT, NULL, line); 218 free(line); 219 } 220 } 221 222 static char * 223 stratcon__xml_doc_to_str(xmlDocPtr doc) { 224 char *rv; 225 xmlSaveCtxtPtr savectx; 226 xmlBufferPtr xmlbuffer; 227 xmlbuffer = xmlBufferCreate(); 228 savectx = xmlSaveToBuffer(xmlbuffer, "utf8", 1); 229 xmlSaveDoc(savectx, doc); 230 xmlSaveClose(savectx); 231 rv = strdup((const char *)xmlBufferContent(xmlbuffer)); 232 xmlBufferFree(xmlbuffer); 233 return rv; 234 } 235 236 static 237 struct iep_thread_driver *stratcon_iep_get_connection() { 238 apr_status_t rc; 239 struct iep_thread_driver *driver; 240 driver = pthread_getspecific(iep_connection); 241 if(!driver) { 242 driver = calloc(1, sizeof(*driver)); 243 pthread_setspecific(iep_connection, driver); 244 } 245 246 if(!driver->pool) { 247 if(apr_pool_create(&driver->pool, NULL) != APR_SUCCESS) return NULL; 248 } 249 250 if(!driver->connection) { 251 int port; 252 char hostname[128]; 253 if(!noit_conf_get_int(NULL, "/stratcon/iep/port", &port)) 254 port = 61613; 255 if(!noit_conf_get_stringbuf(NULL, "/stratcon/iep/hostname", 256 hostname, sizeof(hostname))) 257 strlcpy(hostname, "127.0.0.1", sizeof(hostname)); 258 #ifdef OPENWIRE 259 memset(&driver->connect_options, 0, sizeof(driver->connect_options)); 260 strlcpy(driver->connect_options.hostname, hostname, 261 sizeof(driver->connect_options.hostname)); 262 driver->connect_options.port = port; 263 if(amqcs_connect(&driver->connection, &driver->connect_options, 264 driver->pool) != APR_SUCCESS) { 265 noitL(noit_error, "MQ connection failed\n"); 266 return NULL; 267 } 268 #else 269 if(stomp_connect(&driver->connection, hostname, port, 270 driver->pool)!= APR_SUCCESS) { 271 noitL(noit_error, "MQ connection failed\n"); 272 return NULL; 273 } 274 275 { 276 stomp_frame frame; 277 frame.command = "CONNECT"; 278 frame.headers = apr_hash_make(driver->pool); 279 /* 280 We don't use login/pass 281 apr_hash_set(frame.headers, "login", APR_HASH_KEY_STRING, ""); 282 apr_hash_set(frame.headers, "passcode", APR_HASH_KEY_STRING, ""); 283 */ 284 frame.body = NULL; 285 frame.body_length = -1; 286 rc = stomp_write(driver->connection, &frame, driver->pool); 287 if(rc != APR_SUCCESS) { 288 noitL(noit_error, "MQ STOMP CONNECT failed, %d\n", rc); 289 stomp_disconnect(&driver->connection); 290 return NULL; 291 } 292 } 293 { 294 stomp_frame *frame; 295 rc = stomp_read(driver->connection, &frame, driver->pool); 296 if (rc != APR_SUCCESS) { 297 noitL(noit_error, "MQ STOMP CONNECT bad response, %d\n", rc); 298 stomp_disconnect(&driver->connection); 299 return NULL; 300 } 301 noitL(noit_error, "Response: %s, %s\n", frame->command, frame->body); 302 } 303 #endif 304 stratcon_iep_submit_queries(); 305 } 306 307 return driver; 72 308 } 73 309 … … 75 311 stratcon_iep_submitter(eventer_t e, int mask, void *closure, 76 312 struct timeval *now) { 77 struct iep_batch *batch = closure; 313 float age; 314 struct iep_job_closure *job = closure; 78 315 /* We only play when it is an asynch event */ 79 316 if(!(mask & EVENTER_ASYNCH_WORK)) return 0; … … 81 318 if(mask & EVENTER_ASYNCH_CLEANUP) { 82 319 /* free all the memory associated with the batch */ 83 while(batch->head) { 84 struct noit_line_list *l; 85 l = batch->head; 86 batch->head = l->next; 87 free(l->line); 88 free(l); 89 } 90 free(batch); 320 if(job) { 321 if(job->line) free(job->line); 322 if(job->doc_str) free(job->doc_str); 323 if(job->doc) xmlFreeDoc(job->doc); 324 if(job->pool) apr_pool_destroy(job->pool); 325 free(job); 326 } 91 327 return 0; 92 328 } 93 329 94 /* pull from batch and submit */ 95 noitL(noit_error, "Firing stratcon_iep_submitter on a batch of %d events\n", 96 batch->batch_size); 97 98 return 0; 99 } 100 101 static int 102 stratcon_iep_batch_sweep(eventer_t e, int mask, void *closure, 103 struct timeval *now) { 104 struct iep_batch *nbatch; 105 struct timeval iep_timeout = { 5L, 0L }; 106 eventer_t newe; 107 108 nbatch = stratcon_iep_batch_copytrunc(); 109 if(nbatch->batch_size == 0) { 110 /* misfire */ 111 free(nbatch); 330 if((age = stratcon_iep_age_from_line(job->line, *now)) > 60) { 331 noitL(noit_debug, "Skipping old event %f second old.\n", age); 112 332 return 0; 113 333 } 114 115 newe = eventer_alloc(); 116 newe->mask = EVENTER_ASYNCH; 117 add_timeval(*now, iep_timeout, &newe->whence); 118 newe->callback = stratcon_iep_submitter; 119 newe->closure = nbatch; 120 121 eventer_add_asynch(&iep_jobq, newe); 334 job->doc = stratcon_iep_doc_from_line(job->line); 335 if(job->doc) { 336 job->doc_str = stratcon__xml_doc_to_str(job->doc); 337 if(job->doc_str) { 338 /* Submit */ 339 struct iep_thread_driver *driver; 340 driver = stratcon_iep_get_connection(); 341 if(driver && driver->pool && driver->connection) { 342 apr_status_t rc; 343 #ifdef OPENWIRE 344 ow_ActiveMQQueue *dest; 345 ow_ActiveMQTextMessage *message; 346 347 apr_pool_create(&job->pool, driver->pool); 348 message = ow_ActiveMQTextMessage_create(job->pool); 349 message->content = 350 ow_byte_array_create_with_data(job->pool,strlen(job->doc_str), 351 job->doc_str); 352 dest = ow_ActiveMQQueue_create(job->pool); 353 dest->physicalName = ow_string_create_from_cstring(job->pool,"TEST.QUEUE"); 354 rc = amqcs_send(driver->connection, 355 (ow_ActiveMQDestination*)dest, 356 (ow_ActiveMQMessage*)message, 357 1,4,0,job->pool); 358 if(rc != APR_SUCCESS) { 359 noitL(noit_error, "MQ send failed, disconnecting\n"); 360 if(driver->connection) amqcs_disconnect(&driver->connection); 361 driver->connection = NULL; 362 } 363 #else 364 stomp_frame out; 365 366 apr_pool_create(&job->pool, driver->pool); 367 368 out.command = "SEND"; 369 out.headers = apr_hash_make(job->pool); 370 apr_hash_set(out.headers, "destination", APR_HASH_KEY_STRING, "/queue/noit.firehose"); 371 apr_hash_set(out.headers, "ack", APR_HASH_KEY_STRING, "auto"); 372 373 out.body_length = -1; 374 out.body = job->doc_str; 375 rc = stomp_write(driver->connection, &out, job->pool); 376 if(rc != APR_SUCCESS) { 377 noitL(noit_error, "STOMP send failed, disconnecting\n"); 378 if(driver->connection) stomp_disconnect(&driver->connection); 379 driver->connection = NULL; 380 } 381 #endif 382 } 383 else { 384 noitL(noit_error, "Not submitting event, no MQ\n"); 385 } 386 } 387 } 388 else { 389 noitL(noit_error, "no iep handler for: '%s'\n", job->line); 390 } 122 391 return 0; 123 392 } … … 126 395 stratcon_iep_datastore_onlooker(stratcon_datastore_op_t op, 127 396 struct sockaddr *remote, void *operand) { 397 struct iep_job_closure *jc; 398 eventer_t newe; 399 struct timeval __now, iep_timeout = { 20L, 0L }; 128 400 /* We only care about inserts */ 401 402 if(op == DS_OP_CHKPT) { 403 eventer_add((eventer_t) operand); 404 return; 405 } 129 406 if(op != DS_OP_INSERT) return; 130 407 131 408 /* process operand and push onto queue */ 132 if(stratcon_iep_batch_add_line((char *)operand) == 0) { 133 /* If this is the first in the queue, then we need to schedule a 134 * sweeper to submit the queue. 135 */ 136 eventer_t e; 137 struct timeval __now, sweep_delay = SWEEP_DELAY; 138 139 gettimeofday(&__now, NULL); 140 141 e = eventer_alloc(); 142 e->callback = stratcon_iep_batch_sweep; 143 add_timeval(__now, sweep_delay, &e->whence); 144 e->closure = NULL; /* we can only do one thing */ 145 e->mask = EVENTER_TIMER; 146 eventer_add(e); 147 } 409 gettimeofday(&__now, NULL); 410 newe = eventer_alloc(); 411 newe->mask = EVENTER_ASYNCH; 412 add_timeval(__now, iep_timeout, &newe->whence); 413 newe->callback = stratcon_iep_submitter; 414 jc = calloc(1, sizeof(*jc)); 415 jc->line = strdup(operand); 416 newe->closure = jc; 417 418 eventer_add_asynch(&iep_jobq, newe); 419 } 420 421 static void connection_destroy(void *vd) { 422 struct iep_thread_driver *driver = vd; 423 #ifdef OPENWIRE 424 if(driver->connection) amqcs_disconnect(&driver->connection); 425 #else 426 if(driver->connection) stomp_disconnect(&driver->connection); 427 #endif 428 if(driver->pool) apr_pool_destroy(driver->pool); 429 free(driver); 430 } 431 432 jlog_streamer_ctx_t * 433 stratcon_jlog_streamer_iep_ctx_alloc(void) { 434 jlog_streamer_ctx_t *ctx; 435 ctx = stratcon_jlog_streamer_ctx_alloc(); 436 ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_TEMP_FEED); 437 ctx->push = stratcon_iep_datastore_onlooker; 438 return ctx; 439 } 440 441 struct iep_daemon_info { 442 pid_t child; 443 int stdin_pipe[2]; 444 int stderr_pipe[2]; 445 char *directory; 446 char *command; 447 }; 448 449 static void 450 iep_daemon_info_free(struct iep_daemon_info *info) { 451 if(!info) return; 452 if(info->directory) free(info->directory); 453 if(info->command) free(info->command); 454 if(info->stdin_pipe[0] >= 0) close(info->stdin_pipe[0]); 455 if(info->stdin_pipe[1] >= 0) close(info->stdin_pipe[1]); 456 if(info->stderr_pipe[0] >= 0) close(info->stderr_pipe[0]); 457 if(info->stderr_pipe[1] >= 0) close(info->stderr_pipe[1]); 458 free(info); 459 } 460 461 static int 462 stratcon_iep_err_handler(eventer_t e, int mask, void *closure, 463 struct timeval *now) { 464 int len, newmask; 465 char buff[4096]; 466 struct iep_daemon_info *info = (struct iep_daemon_info *)closure; 467 468 if(mask & EVENTER_EXCEPTION) { 469 int rv; 470 read_error: 471 kill(SIGKILL, info->child); 472 if(waitpid(info->child, &rv, 0) != info->child) { 473 noitL(noit_error, "Failed to reap IEP daemon\n"); 474 exit(-1); 475 } 476 noitL(noit_error, "IEP daemon is done, starting a new one\n"); 477 start_iep_daemon(); 478 eventer_remove_fd(e->fd); 479 e->opset->close(e->fd, &newmask, e); 480 return 0; 481 } 482 while(1) { 483 len = e->opset->read(e->fd, buff, sizeof(buff)-1, &newmask, e); 484 if(len == -1 && (errno == EAGAIN || errno == EINTR)) 485 return newmask | EVENTER_EXCEPTION; 486 if(len <= 0) goto read_error; 487 assert(len < sizeof(buff)); 488 buff[len] = '\0'; 489 noitL(noit_error, "IEP: %s", buff); 490 } 491 } 492 493 static void 494 start_iep_daemon() { 495 eventer_t newe; 496 struct iep_daemon_info *info; 497 498 info = calloc(1, sizeof(*info)); 499 info->stdin_pipe[0] = info->stdin_pipe[1] = -1; 500 info->stderr_pipe[0] = info->stderr_pipe[1] = -1; 501 502 if(!noit_conf_get_string(NULL, "/stratcon/iep/start/@directory", 503 &info->directory)) 504 info->directory = strdup("."); 505 if(!noit_conf_get_string(NULL, "/stratcon/iep/start/@command", 506 &info->command)) { 507 noitL(noit_error, "No IEP start command provided. You're on your own.\n"); 508 goto bail; 509 } 510 if(pipe(info->stdin_pipe) != 0 || 511 pipe(info->stderr_pipe) != 0) { 512 noitL(noit_error, "pipe: %s\n", strerror(errno)); 513 goto bail; 514 } 515 info->child = fork(); 516 if(info->child == -1) { 517 noitL(noit_error, "fork: %s\n", strerror(errno)); 518 goto bail; 519 } 520 if(info->child == 0) { 521 char *argv[2] = { "run-iep", NULL }; 522 int stdout_fileno; 523 524 if(chdir(info->directory) != 0) { 525 noitL(noit_error, "Starting IEP daemon, chdir failed: %s\n", 526 strerror(errno)); 527 exit(-1); 528 } 529 530 close(info->stdin_pipe[1]); 531 close(info->stderr_pipe[0]); 532 dup2(info->stdin_pipe[0], 0); 533 dup2(info->stderr_pipe[1], 2); 534 stdout_fileno = open("/dev/null", O_WRONLY); 535 dup2(stdout_fileno, 1); 536 537 exit(execv(info->command, argv)); 538 } 539 /* in the parent */ 540 socklen_t on = 1; 541 542 close(info->stdin_pipe[0]); 543 info->stdin_pipe[0] = -1; 544 close(info->stderr_pipe[1]); 545 info->stderr_pipe[1] = -1; 546 if(ioctl(info->stderr_pipe[0], FIONBIO, &on)) { 547 goto bail; 548 } 549 550 newe = eventer_alloc(); 551 newe->fd = info->stderr_pipe[0]; 552 newe->mask = EVENTER_READ | EVENTER_EXCEPTION; 553 newe->callback = stratcon_iep_err_handler; 554 newe->closure = info; 555 eventer_add(newe); 556 info = NULL; 557 558 return; 559 560 bail: 561 if(info) { 562 iep_daemon_info_free(info); 563 } 564 noitL(noit_error, "Failed to start IEP daemon\n"); 565 exit(-1); 566 return; 148 567 } 149 568 150 569 void 151 570 stratcon_iep_init() { 152 eventer_name_callback("stratcon_iep_batch_sweep", stratcon_iep_batch_sweep); 571 noit_boolean disabled = noit_false; 572 apr_initialize(); 573 atexit(apr_terminate); 574 575 if(noit_conf_get_boolean(NULL, "/stratcon/iep/@disabled", &disabled) && 576 disabled == noit_true) { 577 noitL(noit_error, "IEP system is disabled!\n"); 578 return; 579 } 580 581 start_iep_daemon(); 582 153 583 eventer_name_callback("stratcon_iep_submitter", stratcon_iep_submitter); 584 pthread_key_create(&iep_connection, connection_destroy); 154 585 155 586 /* start up a thread pool of one */ … … 159 590 eventer_jobq_increase_concurrency(&iep_jobq); 160 591 161 /* Setup our pivot batch*/162 memset(&pivot_batch, 0, sizeof(pivot_batch));163 pthread_mutex_init(&pivot_batch.lock, NULL);164 165 /* Add our onlooker */166 stratcon_datastore_register_onlooker(stratcon_iep_datastore_onlooker);167 } 168 592 /* setup our live jlog stream */ 593 stratcon_streamer_connection(NULL, NULL, 594 stratcon_jlog_recv_handler, 595 (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc, 596 NULL, 597 jlog_streamer_ctx_free); 598 } 599 src/stratcon_jlog_streamer.c
re7ae97b ra907717 9 9 #include "utils/noit_hash.h" 10 10 #include "utils/noit_log.h" 11 #include "jlog/jlog.h"12 11 #include "noit_jlog_listener.h" 13 12 #include "stratcon_datastore.h" … … 28 27 noit_hash_table noits = NOIT_HASH_EMPTY; 29 28 30 typedef struct jlog_streamer_ctx_t {31 int bytes_expected;32 int bytes_read;33 char *buffer; /* These guys are for doing partial reads */34 35 enum {36 WANT_INITIATE = 0,37 WANT_COUNT = 1,38 WANT_HEADER = 2,39 WANT_BODY = 3,40 WANT_CHKPT = 4,41 } state;42 int count; /* Number of jlog messages we need to read */43 struct {44 jlog_id chkpt;45 u_int32_t tv_sec;46 u_int32_t tv_usec;47 u_int32_t message_len;48 } header;49 } jlog_streamer_ctx_t;50 51 29 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx); 52 30 53 31 jlog_streamer_ctx_t * 54 jlog_streamer_ctx_alloc(void) { 32 stratcon_jlog_streamer_datastore_ctx_alloc(void) { 33 jlog_streamer_ctx_t *ctx; 34 ctx = stratcon_jlog_streamer_ctx_alloc(); 35 ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED); 36 ctx->push = stratcon_datastore_push; 37 return ctx; 38 } 39 jlog_streamer_ctx_t * 40 stratcon_jlog_streamer_ctx_alloc(void) { 55 41 jlog_streamer_ctx_t *ctx; 56 42 ctx = calloc(1, sizeof(*ctx)); … … 180 166 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure, 181 167 struct timeval *now) { 182 static u_int32_t jlog_feed_cmd = 0;183 168 noit_connection_ctx_t *nctx = closure; 184 169 jlog_streamer_ctx_t *ctx = nctx->consumer_ctx; 185 170 int len; 186 171 jlog_id n_chkpt; 187 188 if(!jlog_feed_cmd) jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);189 172 190 173 if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) { … … 207 190 switch(ctx->state) { 208 191 case WANT_INITIATE: 209 len = e->opset->write(e->fd, &jlog_feed_cmd, sizeof(&jlog_feed_cmd), 192 len = e->opset->write(e->fd, &ctx->jlog_feed_cmd, 193 sizeof(&ctx->jlog_feed_cmd), 210 194 &mask, e); 211 195 if(len < 0) { … … 213 197 goto socket_error; 214 198 } 215 if(len != sizeof( jlog_feed_cmd)) {199 if(len != sizeof(ctx->jlog_feed_cmd)) { 216 200 noitL(noit_error, "short write on initiating stream.\n"); 217 201 goto socket_error; … … 246 230 case WANT_BODY: 247 231 FULLREAD(e, ctx, (unsigned long)ctx->header.message_len); 248 stratcon_datastore_push(DS_OP_INSERT, &nctx->r.remote, ctx->buffer);232 ctx->push(DS_OP_INSERT, &nctx->r.remote, ctx->buffer); 249 233 /* Don't free the buffer, it's used by the datastore process. */ 250 234 ctx->buffer = NULL; … … 257 241 completion_e->mask = EVENTER_WRITE | EVENTER_EXCEPTION; 258 242 ctx->state = WANT_CHKPT; 259 stratcon_datastore_push(DS_OP_CHKPT, &nctx->r.remote, completion_e);243 ctx->push(DS_OP_CHKPT, &nctx->r.remote, completion_e); 260 244 noitL(noit_debug, "Pushing batch asynch...\n"); 261 245 return 0; … … 513 497 stratcon_streamer_connection(toplevel, NULL, 514 498 stratcon_jlog_recv_handler, 515 (void *(*)())jlog_streamer_ctx_alloc, NULL, 499 (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc, 500 NULL, 516 501 jlog_streamer_ctx_free); 517 502 } src/stratcon_jlog_streamer.h
r21b0c6c ra907717 8 8 9 9 #include "noit_conf.h" 10 #include "jlog/jlog.h" 10 11 #include <netinet/in.h> 11 12 #include <sys/un.h> 12 13 #include <arpa/inet.h> 14 #include "stratcon_datastore.h" 13 15 14 16 typedef struct noit_connection_ctx_t { … … 32 34 } noit_connection_ctx_t; 33 35 36 typedef struct jlog_streamer_ctx_t { 37 u_int32_t jlog_feed_cmd; 38 int bytes_expected; 39 int bytes_read; 40 char *buffer; /* These guys are for doing partial reads */ 41 42 enum { 43 WANT_INITIATE = 0, 44 WANT_COUNT = 1, 45 WANT_HEADER = 2, 46 WANT_BODY = 3, 47 WANT_CHKPT = 4, 48 } state; 49 int count; /* Number of jlog messages we need to read */ 50 struct { 51 jlog_id chkpt; 52 u_int32_t tv_sec; 53 u_int32_t tv_usec; 54 u_int32_t message_len; 55 } header; 56 57 void (*push)(stratcon_datastore_op_t, struct sockaddr *, void *); 58 } jlog_streamer_ctx_t; 59 34 60 API_EXPORT(void) 35 61 stratcon_jlog_streamer_init(const char *toplevel); 36 62 API_EXPORT(void) 37 63 stratcon_jlog_streamer_reload(const char *toplevel); 64 API_EXPORT(int) 65 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure, 66 struct timeval *now); 67 API_EXPORT(jlog_streamer_ctx_t *) 68 stratcon_jlog_streamer_ctx_alloc(void); 69 API_EXPORT(void) 70 jlog_streamer_ctx_free(void *cl); 38 71 API_EXPORT(void) 39 72 stratcon_streamer_connection(const char *toplevel, const char *destination, src/utils/noit_log.c
rae34340 ra907717 80 80 }; 81 81 82 static int 83 jlog_logio_reopen(noit_log_stream_t ls) { 84 char **subs; 85 int i; 86 /* reopening only has the effect of removing temporary subscriptions */ 87 /* (they start with ~ in our hair-brained model */ 88 89 if(jlog_ctx_list_subscribers(ls->op_ctx, &subs) == -1) { 90 noitL(noit_error, "Cannot list subscribers: %s\n", 91 jlog_ctx_err_string(ls->op_ctx)); 92 return 0; 93 } 94 95 for(i=0;subs[i];i++) 96 if(subs[i][0] == '~') 97 if(jlog_ctx_remove_subscriber(ls->op_ctx, subs[i]) == -1) 98 noitL(noit_error, "Cannot remove subscriber '%s': %s\n", 99 subs[i], jlog_ctx_err_string(ls->op_ctx)); 100 101 jlog_ctx_list_subscribers_dispose(ls->op_ctx, subs); 102 return 0; 103 } 82 104 static int 83 105 jlog_logio_open(noit_log_stream_t ls) { … … 126 148 } 127 149 ls->op_ctx = log; 128 return 0; 129 } 130 static int 131 jlog_logio_reopen(noit_log_stream_t ls) { 150 /* We do this to clean things up */ 151 jlog_logio_reopen(ls); 132 152 return 0; 133 153 }