Changeset 48e115ad8158970b1f83732d0301d0289f42c7fb
- Timestamp:
- 09/09/09 04:36:13 (9 years ago)
- git-parent:
- Files:
-
- src/stratcon_jlog_streamer.c (modified) (12 diffs)
- src/stratcon_jlog_streamer.h (modified) (5 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
src/stratcon_jlog_streamer.c
r23cfce8 r48e115a 52 52 #include <arpa/inet.h> 53 53 54 pthread_mutex_t noits_lock; 54 55 noit_hash_table noits = NOIT_HASH_EMPTY; 55 56 56 57 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx); 58 59 static int 60 remote_str_sort(const void *a, const void *b) { 61 noit_connection_ctx_t * const *actx = a; 62 noit_connection_ctx_t * const *bctx = b; 63 return strcmp((*actx)->remote_str, (*bctx)->remote_str); 64 } 65 static void 66 nc_print_noit_conn_brief(noit_console_closure_t ncct, 67 noit_connection_ctx_t *ctx) { 68 struct timeval now, diff, session_duration; 69 gettimeofday(&now, NULL); 70 const char *lasttime = "never"; 71 if(ctx->last_connect.tv_sec != 0) { 72 char cmdbuf[4096]; 73 time_t r = ctx->last_connect.tv_sec; 74 struct tm tbuf, *tm; 75 tm = gmtime_r(&r, &tbuf); 76 strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm); 77 lasttime = cmdbuf; 78 } 79 nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str, 80 ctx->timeout_event ? "disconnected" : "connected", lasttime); 81 if(ctx->timeout_event) { 82 sub_timeval(now, ctx->timeout_event->whence, &diff); 83 nc_printf(ncct, "\tNext attempet in %llu.%06us\n", diff.tv_sec, diff.tv_usec); 84 } 85 else { 86 nc_printf(ncct, "\tRemote CN: '%s'\n", 87 ctx->remote_cn ? ctx->remote_cn : "???"); 88 if(ctx->consumer_callback == stratcon_jlog_recv_handler) { 89 jlog_streamer_ctx_t *jctx = ctx->consumer_ctx; 90 struct timeval last; 91 double session_duration_seconds; 92 const char *feedtype = "unknown"; 93 const char *state = "unknown"; 94 95 switch(ntohl(jctx->jlog_feed_cmd)) { 96 case NOIT_JLOG_DATA_FEED: feedtype = "durable/storage"; break; 97 case NOIT_JLOG_DATA_TEMP_FEED: feedtype = "transient/iep"; break; 98 } 99 switch(jctx->state) { 100 case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break; 101 case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break; 102 case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break; 103 case JLOG_STREAMER_WANT_BODY: state = "reading body"; break; 104 case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break; 105 case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break; 106 } 107 last.tv_sec = jctx->header.tv_sec; 108 last.tv_usec = jctx->header.tv_usec; 109 sub_timeval(now, last, &diff); 110 sub_timeval(now, ctx->last_connect, &session_duration); 111 session_duration_seconds = session_duration.tv_sec + 112 (double)session_duration.tv_usec/1000000.0; 113 nc_printf(ncct, "\tJLog event streamer [%s]\n\tState: %s\n" 114 "\tNext checkpoint: [%08x:%08x]\n" 115 "\tLast event: %llu.%06us ago\n" 116 "\tEvents this session: %llu (%0.2f/s)\n" 117 "\tOctets this session: %llu (%0.2f/s)\n", 118 feedtype, state, 119 jctx->header.chkpt.log, jctx->header.chkpt.marker, 120 diff.tv_sec, diff.tv_usec, 121 jctx->total_events, 122 (double)jctx->total_events/session_duration_seconds, 123 jctx->total_bytes_read, 124 (double)jctx->total_bytes_read/session_duration_seconds); 125 } 126 else { 127 nc_printf(ncct, "\tUnknown type.\n"); 128 } 129 } 130 } 57 131 58 132 jlog_streamer_ctx_t * … … 72 146 noit_connection_ctx_t * 73 147 noit_connection_ctx_alloc(void) { 74 noit_connection_ctx_t *ctx ;148 noit_connection_ctx_t *ctx, **pctx; 75 149 ctx = calloc(1, sizeof(*ctx)); 150 ctx->refcnt = 1; 151 pctx = malloc(sizeof(*pctx)); 152 *pctx = ctx; 153 pthread_mutex_lock(&noits_lock); 154 noit_hash_store(&noits, (const char *)pctx, sizeof(*pctx), ctx); 155 pthread_mutex_unlock(&noits_lock); 76 156 return ctx; 77 157 } … … 90 170 const char *v; 91 171 u_int32_t min_interval = 1000, max_interval = 8000; 172 if(ctx->remote_cn) { 173 free(ctx->remote_cn); 174 ctx->remote_cn = NULL; 175 } 92 176 if(noit_hash_retr_str(ctx->config, 93 177 "reconnect_initial_interval", … … 126 210 eventer_add(ctx->timeout_event); 127 211 } 128 void212 static void 129 213 noit_connection_ctx_free(noit_connection_ctx_t *ctx) { 130 214 if(ctx->remote_cn) free(ctx->remote_cn); … … 136 220 ctx->consumer_free(ctx->consumer_ctx); 137 221 free(ctx); 222 } 223 void 224 noit_connection_ctx_deref(noit_connection_ctx_t *ctx) { 225 if(noit_atomic_dec32(&ctx->refcnt) == 0) 226 noit_connection_ctx_free(ctx); 227 } 228 void 229 noit_connection_ctx_dealloc(noit_connection_ctx_t *ctx) { 230 noit_connection_ctx_t **pctx = &ctx; 231 pthread_mutex_lock(&noits_lock); 232 noit_hash_delete(&noits, (const char *)pctx, sizeof(*pctx), 233 free, (void (*)(void *))noit_connection_ctx_deref); 234 pthread_mutex_unlock(&noits_lock); 138 235 } 139 236 void … … 159 256 * if(len == 0) return ctx->bytes_read; 160 257 */ 258 ctx->total_bytes_read += len; 161 259 ctx->bytes_read += len; 162 260 } … … 266 364 ctx->buffer = NULL; 267 365 ctx->count--; 366 ctx->total_events++; 268 367 if(ctx->count == 0) { 269 368 eventer_t completion_e; … … 280 379 break; 281 380 381 case JLOG_STREAMER_IS_ASYNC: 382 ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */ 282 383 case JLOG_STREAMER_WANT_CHKPT: 283 384 noitL(noit_debug, "Pushing checkpoint: [%u/%u]\n", … … 397 498 if(!sslctx) goto connect_error; 398 499 500 memcpy(&nctx->last_connect, now, sizeof(*now)); 399 501 eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert, 400 502 nctx->sslconfig); … … 409 511 int rv, fd = -1; 410 512 513 if(nctx->wants_permanent_shutdown) { 514 noit_connection_ctx_dealloc(nctx); 515 return; 516 } 411 517 /* Open a socket */ 412 518 fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0); … … 572 678 } 573 679 680 static int 681 stratcon_console_show_noits(noit_console_closure_t ncct, 682 int argc, char **argv, 683 noit_console_state_t *dstate, 684 void *closure) { 685 noit_hash_iter iter = NOIT_HASH_ITER_ZERO; 686 uuid_t key_id; 687 int klen, n = 0, i; 688 void *vconn; 689 noit_connection_ctx_t **ctx; 690 691 pthread_mutex_lock(&noits_lock); 692 ctx = malloc(sizeof(*ctx) * noits.size); 693 while(noit_hash_next(&noits, &iter, (const char **)key_id, &klen, 694 &vconn)) { 695 ctx[n] = (noit_connection_ctx_t *)vconn; 696 noit_atomic_inc32(&ctx[n]->refcnt); 697 n++; 698 } 699 pthread_mutex_unlock(&noits_lock); 700 qsort(ctx, n, sizeof(*ctx), remote_str_sort); 701 for(i=0; i<n; i++) { 702 nc_print_noit_conn_brief(ncct, ctx[i]); 703 noit_connection_ctx_deref(ctx[i]); 704 } 705 return 0; 706 } 707 708 static void 709 register_console_streamer_commands() { 710 noit_console_state_t *tl; 711 cmd_info_t *showcmd; 712 713 tl = noit_console_state_initial(); 714 showcmd = noit_console_state_get_cmd(tl, "show"); 715 assert(showcmd && showcmd->dstate); 716 717 noit_console_state_add_cmd(showcmd->dstate, 718 NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL)); 719 } 720 574 721 void 575 722 stratcon_jlog_streamer_init(const char *toplevel) { 723 pthread_mutex_init(&noits_lock, NULL); 576 724 eventer_name_callback("noit_connection_reinitiate", 577 725 noit_connection_reinitiate); … … 582 730 eventer_name_callback("noit_connection_complete_connect", 583 731 noit_connection_complete_connect); 732 register_console_streamer_commands(); 584 733 stratcon_jlog_streamer_reload(toplevel); 585 734 } src/stratcon_jlog_streamer.h
r9488f45 r48e115a 35 35 36 36 #include "noit_conf.h" 37 #include "utils/noit_atomic.h" 37 38 #include "jlog/jlog.h" 38 39 #include <netinet/in.h> … … 42 43 43 44 typedef struct noit_connection_ctx_t { 45 noit_atomic32_t refcnt; 44 46 union { 45 47 struct sockaddr remote; … … 53 55 u_int32_t current_backoff; 54 56 int wants_shutdown; 57 int wants_permanent_shutdown; 55 58 noit_hash_table *config; 56 59 noit_hash_table *sslconfig; 60 struct timeval last_connect; 57 61 eventer_t timeout_event; 58 62 … … 73 77 JLOG_STREAMER_WANT_HEADER = 2, 74 78 JLOG_STREAMER_WANT_BODY = 3, 75 JLOG_STREAMER_WANT_CHKPT = 4, 79 JLOG_STREAMER_IS_ASYNC = 4, 80 JLOG_STREAMER_WANT_CHKPT = 5, 76 81 } state; 77 82 int count; /* Number of jlog messages we need to read */ … … 82 87 u_int32_t message_len; 83 88 } header; 89 90 u_int64_t total_events; 91 u_int64_t total_bytes_read; 84 92 85 93 void (*push)(stratcon_datastore_op_t, struct sockaddr *, void *);