Changeset 48e115ad8158970b1f83732d0301d0289f42c7fb

Show
Ignore:
Timestamp:
09/09/09 04:36:13 (5 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1252470973 +0000
git-parent:

[99a21c2049fcefe65568396291a4f1fef7f1a547]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1252470973 +0000
Message:

first stab at noit connection metrics, refs #170

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/stratcon_jlog_streamer.c

    r23cfce8 r48e115a  
    5252#include <arpa/inet.h> 
    5353 
     54pthread_mutex_t noits_lock; 
    5455noit_hash_table noits = NOIT_HASH_EMPTY; 
    5556 
    5657static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx); 
     58 
     59static int 
     60remote_str_sort(const void *a, const void *b) { 
     61  noit_connection_ctx_t * const *actx = a; 
     62  noit_connection_ctx_t * const *bctx = b; 
     63  return strcmp((*actx)->remote_str, (*bctx)->remote_str); 
     64} 
     65static void 
     66nc_print_noit_conn_brief(noit_console_closure_t ncct, 
     67                          noit_connection_ctx_t *ctx) { 
     68  struct timeval now, diff, session_duration; 
     69  gettimeofday(&now, NULL); 
     70  const char *lasttime = "never"; 
     71  if(ctx->last_connect.tv_sec != 0) { 
     72    char cmdbuf[4096]; 
     73    time_t r = ctx->last_connect.tv_sec; 
     74    struct tm tbuf, *tm; 
     75    tm = gmtime_r(&r, &tbuf); 
     76    strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm); 
     77    lasttime = cmdbuf; 
     78  } 
     79  nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str, 
     80            ctx->timeout_event ? "disconnected" : "connected", lasttime); 
     81  if(ctx->timeout_event) { 
     82    sub_timeval(now, ctx->timeout_event->whence, &diff); 
     83    nc_printf(ncct, "\tNext attempet in %llu.%06us\n", diff.tv_sec, diff.tv_usec); 
     84  } 
     85  else { 
     86    nc_printf(ncct, "\tRemote CN: '%s'\n", 
     87              ctx->remote_cn ? ctx->remote_cn : "???"); 
     88    if(ctx->consumer_callback == stratcon_jlog_recv_handler) { 
     89      jlog_streamer_ctx_t *jctx = ctx->consumer_ctx; 
     90      struct timeval last; 
     91      double session_duration_seconds; 
     92      const char *feedtype = "unknown"; 
     93      const char *state = "unknown"; 
     94 
     95      switch(ntohl(jctx->jlog_feed_cmd)) { 
     96        case NOIT_JLOG_DATA_FEED: feedtype = "durable/storage"; break; 
     97        case NOIT_JLOG_DATA_TEMP_FEED: feedtype = "transient/iep"; break; 
     98      } 
     99      switch(jctx->state) { 
     100        case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break; 
     101        case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break; 
     102        case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break; 
     103        case JLOG_STREAMER_WANT_BODY: state = "reading body"; break; 
     104        case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break; 
     105        case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break; 
     106      } 
     107      last.tv_sec = jctx->header.tv_sec; 
     108      last.tv_usec = jctx->header.tv_usec; 
     109      sub_timeval(now, last, &diff); 
     110      sub_timeval(now, ctx->last_connect, &session_duration); 
     111      session_duration_seconds = session_duration.tv_sec + 
     112                                 (double)session_duration.tv_usec/1000000.0; 
     113      nc_printf(ncct, "\tJLog event streamer [%s]\n\tState: %s\n" 
     114                      "\tNext checkpoint: [%08x:%08x]\n" 
     115                      "\tLast event: %llu.%06us ago\n" 
     116                      "\tEvents this session: %llu (%0.2f/s)\n" 
     117                      "\tOctets this session: %llu (%0.2f/s)\n", 
     118                feedtype, state, 
     119                jctx->header.chkpt.log, jctx->header.chkpt.marker, 
     120                diff.tv_sec, diff.tv_usec, 
     121                jctx->total_events, 
     122                (double)jctx->total_events/session_duration_seconds, 
     123                jctx->total_bytes_read, 
     124                (double)jctx->total_bytes_read/session_duration_seconds); 
     125    } 
     126    else { 
     127      nc_printf(ncct, "\tUnknown type.\n"); 
     128    } 
     129  } 
     130} 
    57131 
    58132jlog_streamer_ctx_t * 
     
    72146noit_connection_ctx_t * 
    73147noit_connection_ctx_alloc(void) { 
    74   noit_connection_ctx_t *ctx
     148  noit_connection_ctx_t *ctx, **pctx
    75149  ctx = calloc(1, sizeof(*ctx)); 
     150  ctx->refcnt = 1; 
     151  pctx = malloc(sizeof(*pctx)); 
     152  *pctx = ctx; 
     153  pthread_mutex_lock(&noits_lock); 
     154  noit_hash_store(&noits, (const char *)pctx, sizeof(*pctx), ctx); 
     155  pthread_mutex_unlock(&noits_lock); 
    76156  return ctx; 
    77157} 
     
    90170  const char *v; 
    91171  u_int32_t min_interval = 1000, max_interval = 8000; 
     172  if(ctx->remote_cn) { 
     173    free(ctx->remote_cn); 
     174    ctx->remote_cn = NULL; 
     175  } 
    92176  if(noit_hash_retr_str(ctx->config, 
    93177                        "reconnect_initial_interval", 
     
    126210  eventer_add(ctx->timeout_event); 
    127211} 
    128 void 
     212static void 
    129213noit_connection_ctx_free(noit_connection_ctx_t *ctx) { 
    130214  if(ctx->remote_cn) free(ctx->remote_cn); 
     
    136220  ctx->consumer_free(ctx->consumer_ctx); 
    137221  free(ctx); 
     222} 
     223void 
     224noit_connection_ctx_deref(noit_connection_ctx_t *ctx) { 
     225  if(noit_atomic_dec32(&ctx->refcnt) == 0) 
     226    noit_connection_ctx_free(ctx); 
     227} 
     228void 
     229noit_connection_ctx_dealloc(noit_connection_ctx_t *ctx) { 
     230  noit_connection_ctx_t **pctx = &ctx; 
     231  pthread_mutex_lock(&noits_lock); 
     232  noit_hash_delete(&noits, (const char *)pctx, sizeof(*pctx), 
     233                   free, (void (*)(void *))noit_connection_ctx_deref); 
     234  pthread_mutex_unlock(&noits_lock); 
    138235} 
    139236void 
     
    159256     * if(len == 0) return ctx->bytes_read; 
    160257     */ 
     258    ctx->total_bytes_read += len; 
    161259    ctx->bytes_read += len; 
    162260  } 
     
    266364        ctx->buffer = NULL; 
    267365        ctx->count--; 
     366        ctx->total_events++; 
    268367        if(ctx->count == 0) { 
    269368          eventer_t completion_e; 
     
    280379        break; 
    281380 
     381      case JLOG_STREAMER_IS_ASYNC: 
     382        ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */ 
    282383      case JLOG_STREAMER_WANT_CHKPT: 
    283384        noitL(noit_debug, "Pushing checkpoint: [%u/%u]\n", 
     
    397498  if(!sslctx) goto connect_error; 
    398499 
     500  memcpy(&nctx->last_connect, now, sizeof(*now)); 
    399501  eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert, 
    400502                             nctx->sslconfig); 
     
    409511  int rv, fd = -1; 
    410512 
     513  if(nctx->wants_permanent_shutdown) { 
     514    noit_connection_ctx_dealloc(nctx); 
     515    return; 
     516  } 
    411517  /* Open a socket */ 
    412518  fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0); 
     
    572678} 
    573679 
     680static int 
     681stratcon_console_show_noits(noit_console_closure_t ncct, 
     682                            int argc, char **argv, 
     683                            noit_console_state_t *dstate, 
     684                            void *closure) { 
     685  noit_hash_iter iter = NOIT_HASH_ITER_ZERO; 
     686  uuid_t key_id; 
     687  int klen, n = 0, i; 
     688  void *vconn; 
     689  noit_connection_ctx_t **ctx; 
     690 
     691  pthread_mutex_lock(&noits_lock); 
     692  ctx = malloc(sizeof(*ctx) * noits.size); 
     693  while(noit_hash_next(&noits, &iter, (const char **)key_id, &klen, 
     694                       &vconn)) { 
     695    ctx[n] = (noit_connection_ctx_t *)vconn; 
     696    noit_atomic_inc32(&ctx[n]->refcnt); 
     697    n++; 
     698  } 
     699  pthread_mutex_unlock(&noits_lock); 
     700  qsort(ctx, n, sizeof(*ctx), remote_str_sort); 
     701  for(i=0; i<n; i++) { 
     702    nc_print_noit_conn_brief(ncct, ctx[i]); 
     703    noit_connection_ctx_deref(ctx[i]); 
     704  } 
     705  return 0; 
     706} 
     707 
     708static void 
     709register_console_streamer_commands() { 
     710  noit_console_state_t *tl; 
     711  cmd_info_t *showcmd; 
     712 
     713  tl = noit_console_state_initial(); 
     714  showcmd = noit_console_state_get_cmd(tl, "show"); 
     715  assert(showcmd && showcmd->dstate); 
     716 
     717  noit_console_state_add_cmd(showcmd->dstate, 
     718    NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL)); 
     719} 
     720 
    574721void 
    575722stratcon_jlog_streamer_init(const char *toplevel) { 
     723  pthread_mutex_init(&noits_lock, NULL); 
    576724  eventer_name_callback("noit_connection_reinitiate", 
    577725                        noit_connection_reinitiate); 
     
    582730  eventer_name_callback("noit_connection_complete_connect", 
    583731                        noit_connection_complete_connect); 
     732  register_console_streamer_commands(); 
    584733  stratcon_jlog_streamer_reload(toplevel); 
    585734} 
  • src/stratcon_jlog_streamer.h

    r9488f45 r48e115a  
    3535 
    3636#include "noit_conf.h" 
     37#include "utils/noit_atomic.h" 
    3738#include "jlog/jlog.h" 
    3839#include <netinet/in.h> 
     
    4243 
    4344typedef struct noit_connection_ctx_t { 
     45  noit_atomic32_t refcnt; 
    4446  union { 
    4547    struct sockaddr remote; 
     
    5355  u_int32_t current_backoff; 
    5456  int wants_shutdown; 
     57  int wants_permanent_shutdown; 
    5558  noit_hash_table *config; 
    5659  noit_hash_table *sslconfig; 
     60  struct timeval last_connect; 
    5761  eventer_t timeout_event; 
    5862 
     
    7377    JLOG_STREAMER_WANT_HEADER = 2, 
    7478    JLOG_STREAMER_WANT_BODY = 3, 
    75     JLOG_STREAMER_WANT_CHKPT = 4, 
     79    JLOG_STREAMER_IS_ASYNC = 4, 
     80    JLOG_STREAMER_WANT_CHKPT = 5, 
    7681  } state; 
    7782  int count;            /* Number of jlog messages we need to read */ 
     
    8287    u_int32_t message_len; 
    8388  } header; 
     89 
     90  u_int64_t total_events; 
     91  u_int64_t total_bytes_read; 
    8492 
    8593  void (*push)(stratcon_datastore_op_t, struct sockaddr *, void *);