Changeset 6bb9ef8a16723394cc4e5c511f06fe2173d78a11
- Timestamp:
- 08/13/08 21:36:52 (5 years ago)
- git-parent:
- Files:
-
- src/noit.conf.in (modified) (1 diff)
- src/noit_jlog_listener.c (modified) (2 diffs)
- src/noit_jlog_listener.h (modified) (1 diff)
- src/noit_listener.c (modified) (3 diffs)
- src/noit_listener.h (modified) (2 diffs)
- src/stratcon_jlog_streamer.c (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
src/noit.conf.in
re1b3535 r6bb9ef8 74 74 <listener address="*" port="32323" ssl="on"/> 75 75 </consoles> 76 <listener type=" log_transit" address="*" port="34332" ssl="on">76 <listener type="control_dispatch" address="*" port="34332" ssl="on"> 77 77 <config> 78 <log >feed</log>78 <log_transit_feed_name>feed</log_transit_feed_name> 79 79 </config> 80 80 </listener> src/noit_jlog_listener.c
r324755e r6bb9ef8 20 20 noit_jlog_listener_init() { 21 21 eventer_name_callback("log_transit", noit_jlog_handler); 22 noit_control_dispatch_delegate(noit_control_dispatch, 23 NOIT_JLOG_DATA_FEED, 24 noit_jlog_handler); 22 25 } 23 26 … … 183 186 char path[PATH_MAX], *sub; 184 187 jcl = ac->service_ctx = noit_jlog_closure_alloc(); 185 if(!noit_hash_retrieve(ac->config, "log", strlen("log"), 188 if(!noit_hash_retrieve(ac->config, 189 "log_transit_feed_name", 190 strlen("log_transit_feed_name"), 186 191 (void **)&logname)) { 187 noitL(noit_error, "No 'log ' specified in log_transit.\n");192 noitL(noit_error, "No 'log_transit_feed_name' specified in log_transit.\n"); 188 193 goto socket_error; 189 194 } src/noit_jlog_listener.h
r06f58e6 r6bb9ef8 10 10 #include "eventer/eventer.h" 11 11 12 #define NOIT_JLOG_DATA_FEED 0xda7afeed 13 12 14 API_EXPORT(void) 13 15 noit_jlog_listener_init(void); src/noit_listener.c
r4c963d7 r6bb9ef8 20 20 #include "noit_listener.h" 21 21 #include "noit_conf.h" 22 23 static noit_hash_table listener_commands = NOIT_HASH_EMPTY; 22 24 23 25 void … … 260 262 calloc(1, sizeof(*listener_closure->dispatch_closure)); 261 263 listener_closure->dispatch_closure->config = config; 264 listener_closure->dispatch_closure->dispatch = handler; 262 265 listener_closure->dispatch_closure->service_ctx = service_ctx; 263 266 … … 337 340 } 338 341 } 342 int 343 noit_control_dispatch(eventer_t e, int mask, void *closure, 344 struct timeval *now) { 345 u_int32_t cmd; 346 int len; 347 noit_hash_table *delegation_table; 348 acceptor_closure_t *ac = closure; 349 350 len = e->opset->read(e->fd, &cmd, sizeof(cmd), &mask, e); 351 352 if(len == -1 && errno == EAGAIN) 353 return EVENTER_READ | EVENTER_EXCEPTION; 354 355 if(mask & EVENTER_EXCEPTION || len != sizeof(cmd)) { 356 int newmask; 357 socket_error: 358 /* Exceptions cause us to simply snip the connection */ 359 eventer_remove_fd(e->fd); 360 e->opset->close(e->fd, &newmask, e); 361 if(ac) acceptor_closure_free(ac); 362 return 0; 363 } 364 365 cmd = ntohl(cmd); 366 /* Lookup cmd and dispatch */ 367 if(noit_hash_retrieve(&listener_commands, 368 (char *)&ac->dispatch, sizeof(ac->dispatch), 369 (void **)&delegation_table)) { 370 eventer_func_t *eventer_func; 371 if(noit_hash_retrieve(delegation_table, 372 (char *)&cmd, sizeof(cmd), 373 (void **)&eventer_func)) { 374 e->callback = *eventer_func; 375 return e->callback(e, mask, closure, now); 376 } 377 else { 378 const char *event_name; 379 noitL(noit_error, "listener (%s %p) has no command: 0x%8x\n", 380 (event_name = eventer_name_for_callback(ac->dispatch))?event_name:"???", 381 delegation_table, cmd); 382 } 383 } 384 else { 385 const char *event_name; 386 noitL(noit_error, "No delegation table for listener (%s %p)\n", 387 (event_name = eventer_name_for_callback(ac->dispatch))?event_name:"???", 388 delegation_table); 389 } 390 goto socket_error; 391 } 392 void 393 noit_control_dispatch_delegate(eventer_func_t listener_dispatch, 394 u_int32_t cmd, 395 eventer_func_t delegate_dispatch) { 396 u_int32_t *cmd_copy; 397 eventer_func_t *handler_copy; 398 noit_hash_table *delegation_table; 399 if(!noit_hash_retrieve(&listener_commands, 400 (char *)&listener_dispatch, sizeof(listener_dispatch), 401 (void **)delegation_table)) { 402 delegation_table = calloc(1, sizeof(*delegation_table)); 403 handler_copy = malloc(sizeof(*handler_copy)); 404 *handler_copy = listener_dispatch; 405 noit_hash_store(&listener_commands, 406 (char *)handler_copy, sizeof(*handler_copy), 407 delegation_table); 408 } 409 cmd_copy = malloc(sizeof(*cmd_copy)); 410 *cmd_copy = cmd; 411 handler_copy = malloc(sizeof(*handler_copy)); 412 *handler_copy = delegate_dispatch; 413 noit_hash_replace(delegation_table, 414 (char *)cmd_copy, sizeof(*cmd_copy), 415 handler_copy, 416 free, free); 417 } 418 339 419 void 340 420 noit_listener_init(const char *toplevel) { 341 421 eventer_name_callback("noit_listener_acceptor", noit_listener_acceptor); 342 422 eventer_name_callback("noit_listener_accept_ssl", noit_listener_accept_ssl); 423 eventer_name_callback("control_dispatch", noit_control_dispatch); 343 424 noit_listener_reconfig(toplevel); 344 425 } src/noit_listener.h
rc4546c7 r6bb9ef8 28 28 noit_hash_table *config; 29 29 void *service_ctx; 30 eventer_func_t dispatch; 30 31 } acceptor_closure_t; 31 32 … … 49 50 acceptor_closure_free(acceptor_closure_t *ac); 50 51 52 API_EXPORT(void) 53 noit_control_dispatch_delegate(eventer_func_t listener_dispatch, 54 u_int32_t cmd, 55 eventer_func_t delegate_dispatch); 56 57 int noit_control_dispatch(eventer_t, int, void *, struct timeval *); 58 51 59 #endif src/stratcon_jlog_streamer.c
rfb5f8f9 r6bb9ef8 45 45 46 46 enum { 47 WANT_COUNT = 0, 48 WANT_HEADER = 1, 49 WANT_BODY = 2, 50 WANT_CHKPT = 3, 47 WANT_INITIATE = 0, 48 WANT_COUNT = 1, 49 WANT_HEADER = 2, 50 WANT_BODY = 3, 51 WANT_CHKPT = 4, 51 52 } state; 52 53 int count; /* Number of jlog messages we need to read */ … … 180 181 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure, 181 182 struct timeval *now) { 183 static u_int32_t jlog_feed_cmd = 0; 182 184 jlog_streamer_ctx_t *ctx = closure; 183 185 int len; 184 186 jlog_id n_chkpt; 185 187 188 if(!jlog_feed_cmd) jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED); 189 186 190 if(mask & EVENTER_EXCEPTION || ctx->wants_shutdown) { 187 191 socket_error: 188 ctx->state = WANT_ COUNT;192 ctx->state = WANT_INITIATE; 189 193 ctx->count = 0; 190 194 ctx->bytes_read = 0; … … 200 204 while(1) { 201 205 switch(ctx->state) { 206 case WANT_INITIATE: 207 len = e->opset->write(e->fd, &jlog_feed_cmd, sizeof(&jlog_feed_cmd), 208 &mask, e); 209 if(len < 0) { 210 if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; 211 goto socket_error; 212 } 213 if(len != sizeof(jlog_feed_cmd)) { 214 noitL(noit_error, "short write on initiating stream.\n"); 215 goto socket_error; 216 } 217 ctx->state = WANT_COUNT; 218 break; 219 202 220 case WANT_COUNT: 203 221 FULLREAD(e, ctx, sizeof(u_int32_t));
