[Reconnoiter-devel] [reconnoiter commit] r819 - trunk/src

svn-commit at lists.omniti.com svn-commit at lists.omniti.com
Wed Sep 9 00:36:13 EDT 2009


Author: jesus
Date: 2009-09-09 00:36:13 -0400 (Wed, 09 Sep 2009)
New Revision: 819

Modified:
   trunk/src/stratcon_jlog_streamer.c
   trunk/src/stratcon_jlog_streamer.h
Log:
first stab at noit connection metrics, refs #170

Modified: trunk/src/stratcon_jlog_streamer.c
===================================================================
--- trunk/src/stratcon_jlog_streamer.c	2009-09-09 04:12:49 UTC (rev 818)
+++ trunk/src/stratcon_jlog_streamer.c	2009-09-09 04:36:13 UTC (rev 819)
@@ -51,10 +51,84 @@
 #include <sys/un.h>
 #include <arpa/inet.h>
 
+pthread_mutex_t noits_lock;
 noit_hash_table noits = NOIT_HASH_EMPTY;
 
 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
 
+static int
+remote_str_sort(const void *a, const void *b) { /* qsort cmp on remote_str */
+  const char *lhs = (*(noit_connection_ctx_t * const *)a)->remote_str;
+  const char *rhs = (*(noit_connection_ctx_t * const *)b)->remote_str;
+  return strcmp(lhs ? lhs : "", rhs ? rhs : ""); /* unset name sorts as "" */
+}
+/* Console helper: print a brief status report for one noit connection. */
+static void
+nc_print_noit_conn_brief(noit_console_closure_t ncct,
+                          noit_connection_ctx_t *ctx) {
+  struct timeval now, diff, session_duration;
+  char cmdbuf[4096]; /* function scope: lasttime may point here when printed */
+  const char *lasttime = "never";
+  gettimeofday(&now, NULL);
+  if(ctx->last_connect.tv_sec != 0) {
+    time_t r = ctx->last_connect.tv_sec;
+    struct tm tbuf, *tm;
+    tm = gmtime_r(&r, &tbuf);
+    strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
+    lasttime = cmdbuf;
+  }
+  nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n",
+            ctx->remote_str ? ctx->remote_str : "(unknown)",
+            ctx->timeout_event ? "disconnected" : "connected", lasttime);
+  if(ctx->timeout_event) {
+    sub_timeval(ctx->timeout_event->whence, now, &diff); /* time remaining */
+    nc_printf(ncct, "\tNext attempt in %llu.%06us\n",
+              (unsigned long long)diff.tv_sec, (unsigned int)diff.tv_usec);
+  }
+  else {
+    nc_printf(ncct, "\tRemote CN: '%s'\n",
+              ctx->remote_cn ? ctx->remote_cn : "???");
+    if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
+      jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
+      struct timeval last;
+      double session_duration_seconds;
+      const char *feedtype = "unknown";
+      const char *state = "unknown";
+      switch(ntohl(jctx->jlog_feed_cmd)) {
+        case NOIT_JLOG_DATA_FEED: feedtype = "durable/storage"; break;
+        case NOIT_JLOG_DATA_TEMP_FEED: feedtype = "transient/iep"; break;
+      }
+      switch(jctx->state) {
+        case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
+        case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
+        case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
+        case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
+        case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
+        case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
+      }
+      last.tv_sec = jctx->header.tv_sec;
+      last.tv_usec = jctx->header.tv_usec;
+      sub_timeval(now, last, &diff);
+      sub_timeval(now, ctx->last_connect, &session_duration);
+      session_duration_seconds = session_duration.tv_sec +
+                                 (double)session_duration.tv_usec/1000000.0;
+      nc_printf(ncct, "\tJLog event streamer [%s]\n\tState: %s\n"
+                      "\tNext checkpoint: [%08x:%08x]\n"
+                      "\tLast event: %llu.%06us ago\n"
+                      "\tEvents this session: %llu (%0.2f/s)\n"
+                      "\tOctets this session: %llu (%0.2f/s)\n",
+                feedtype, state,
+                jctx->header.chkpt.log, jctx->header.chkpt.marker,
+                (unsigned long long)diff.tv_sec, (unsigned int)diff.tv_usec,
+                (unsigned long long)jctx->total_events,
+                (double)jctx->total_events/session_duration_seconds,
+                (unsigned long long)jctx->total_bytes_read,
+                (double)jctx->total_bytes_read/session_duration_seconds);
+    }
+    else nc_printf(ncct, "\tUnknown type.\n");
+  }
+}
+
 jlog_streamer_ctx_t *
 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
   jlog_streamer_ctx_t *ctx;
@@ -71,8 +145,14 @@
 }
 noit_connection_ctx_t *
 noit_connection_ctx_alloc(void) {
-  noit_connection_ctx_t *ctx;
+  noit_connection_ctx_t *ctx, **pctx;
   ctx = calloc(1, sizeof(*ctx));
+  ctx->refcnt = 1;
+  pctx = malloc(sizeof(*pctx));
+  *pctx = ctx;
+  pthread_mutex_lock(&noits_lock);
+  noit_hash_store(&noits, (const char *)pctx, sizeof(*pctx), ctx);
+  pthread_mutex_unlock(&noits_lock);
   return ctx;
 }
 int
@@ -89,6 +169,10 @@
   struct timeval __now, interval;
   const char *v;
   u_int32_t min_interval = 1000, max_interval = 8000;
+  if(ctx->remote_cn) {
+    free(ctx->remote_cn);
+    ctx->remote_cn = NULL;
+  }
   if(noit_hash_retr_str(ctx->config,
                         "reconnect_initial_interval",
                         strlen("reconnect_initial_interval"),
@@ -125,7 +209,7 @@
   add_timeval(*now, interval, &ctx->timeout_event->whence);
   eventer_add(ctx->timeout_event);
 }
-void
+static void
 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
   if(ctx->remote_cn) free(ctx->remote_cn);
   if(ctx->remote_str) free(ctx->remote_str);
@@ -137,6 +221,19 @@
   free(ctx);
 }
 void
+noit_connection_ctx_deref(noit_connection_ctx_t *ctx) { /* drop one ref */
+  if(noit_atomic_dec32(&ctx->refcnt) == 0) /* assumes new count is returned */
+    noit_connection_ctx_free(ctx);
+}
+void
+noit_connection_ctx_dealloc(noit_connection_ctx_t *ctx) {
+  /* Entries are keyed by the bytes of the ctx pointer itself. */
+  pthread_mutex_lock(&noits_lock);
+  noit_hash_delete(&noits, (const char *)&ctx, sizeof(ctx),
+                   free, (void (*)(void *))noit_connection_ctx_deref);
+  pthread_mutex_unlock(&noits_lock);
+}
+void
 jlog_streamer_ctx_free(void *cl) {
   jlog_streamer_ctx_t *ctx = cl;
   if(ctx->buffer) free(ctx->buffer);
@@ -158,6 +255,7 @@
      * will actually get a -1 here.
      * if(len == 0) return ctx->bytes_read;
      */
+    ctx->total_bytes_read += len;
     ctx->bytes_read += len;
   }
   assert(ctx->bytes_read == ctx->bytes_expected);
@@ -265,6 +363,7 @@
         /* Don't free the buffer, it's used by the datastore process. */
         ctx->buffer = NULL;
         ctx->count--;
+        ctx->total_events++;
         if(ctx->count == 0) {
           eventer_t completion_e;
           eventer_remove_fd(e->fd);
@@ -279,6 +378,8 @@
           ctx->state = JLOG_STREAMER_WANT_HEADER;
         break;
 
+      case JLOG_STREAMER_IS_ASYNC:
+        ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
       case JLOG_STREAMER_WANT_CHKPT:
         noitL(noit_debug, "Pushing checkpoint: [%u/%u]\n",
               ctx->header.chkpt.log, ctx->header.chkpt.marker);
@@ -396,6 +497,7 @@
   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
   if(!sslctx) goto connect_error;
 
+  memcpy(&nctx->last_connect, now, sizeof(*now));
   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
                              nctx->sslconfig);
   EVENTER_ATTACH_SSL(e, sslctx);
@@ -408,6 +510,10 @@
   eventer_t e;
   int rv, fd = -1;
 
+  if(nctx->wants_permanent_shutdown) {
+    noit_connection_ctx_dealloc(nctx);
+    return;
+  }
   /* Open a socket */
   fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0);
   if(fd < 0) goto reschedule;
@@ -571,8 +677,50 @@
                                jlog_streamer_ctx_free);
 }
 
+/* Console "show noits": snapshot the noits table under noits_lock, taking a
+ * reference on each entry, then print the connections sorted by remote_str. */
+static int
+stratcon_console_show_noits(noit_console_closure_t ncct, int argc, char **argv,
+                            noit_console_state_t *dstate, void *closure) {
+  noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
+  const char *key_id;
+  int klen, n = 0, i;
+  void *vconn;
+  noit_connection_ctx_t **ctx;
+
+  pthread_mutex_lock(&noits_lock);
+  ctx = malloc(sizeof(*ctx) * noits.size);
+  while(ctx && noit_hash_next(&noits, &iter, &key_id, &klen, &vconn)) {
+    ctx[n] = (noit_connection_ctx_t *)vconn;
+    noit_atomic_inc32(&ctx[n]->refcnt);
+    n++;
+  }
+  pthread_mutex_unlock(&noits_lock);
+  if(n > 1) qsort(ctx, n, sizeof(*ctx), remote_str_sort);
+  for(i=0; i<n; i++) {
+    nc_print_noit_conn_brief(ncct, ctx[i]);
+    noit_connection_ctx_deref(ctx[i]);
+  }
+  free(ctx); /* release the snapshot array (previously leaked per call) */
+  return 0;
+}
+
+/* Add a "noits" subcommand beneath the "show" command found in the initial
+ * console state.  The "show" command (with a non-NULL dstate) must already
+ * be registered; this is enforced with an assert. */
+static void
+register_console_streamer_commands() {
+  noit_console_state_t *initial = noit_console_state_initial();
+  cmd_info_t *show = noit_console_state_get_cmd(initial, "show");
+
+  assert(show && show->dstate);
+  noit_console_state_add_cmd(show->dstate,
+    NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
+}
+
 void
 stratcon_jlog_streamer_init(const char *toplevel) {
+  pthread_mutex_init(&noits_lock, NULL);
   eventer_name_callback("noit_connection_reinitiate",
                         noit_connection_reinitiate);
   eventer_name_callback("stratcon_jlog_recv_handler",
@@ -581,5 +729,6 @@
                         noit_connection_ssl_upgrade);
   eventer_name_callback("noit_connection_complete_connect",
                         noit_connection_complete_connect);
+  register_console_streamer_commands();
   stratcon_jlog_streamer_reload(toplevel);
 }

Modified: trunk/src/stratcon_jlog_streamer.h
===================================================================
--- trunk/src/stratcon_jlog_streamer.h	2009-09-09 04:12:49 UTC (rev 818)
+++ trunk/src/stratcon_jlog_streamer.h	2009-09-09 04:36:13 UTC (rev 819)
@@ -34,6 +34,7 @@
 #define _STRATCON_LOG_STREAMER_H
 
 #include "noit_conf.h"
+#include "utils/noit_atomic.h"
 #include "jlog/jlog.h"
 #include <netinet/in.h>
 #include <sys/un.h>
@@ -41,6 +42,7 @@
 #include "stratcon_datastore.h"
 
 typedef struct noit_connection_ctx_t {
+  noit_atomic32_t refcnt;
   union {
     struct sockaddr remote;
     struct sockaddr_un remote_un;
@@ -52,8 +54,10 @@
   char *remote_cn;
   u_int32_t current_backoff;
   int wants_shutdown;
+  int wants_permanent_shutdown;
   noit_hash_table *config;
   noit_hash_table *sslconfig;
+  struct timeval last_connect;
   eventer_t timeout_event;
 
   eventer_func_t consumer_callback;
@@ -72,7 +76,8 @@
     JLOG_STREAMER_WANT_COUNT = 1,
     JLOG_STREAMER_WANT_HEADER = 2,
     JLOG_STREAMER_WANT_BODY = 3,
-    JLOG_STREAMER_WANT_CHKPT = 4,
+    JLOG_STREAMER_IS_ASYNC = 4,
+    JLOG_STREAMER_WANT_CHKPT = 5,
   } state;
   int count;            /* Number of jlog messages we need to read */
   struct {
@@ -82,6 +87,9 @@
     u_int32_t message_len;
   } header;
 
+  u_int64_t total_events;
+  u_int64_t total_bytes_read;
+
   void (*push)(stratcon_datastore_op_t, struct sockaddr *, void *);
 } jlog_streamer_ctx_t;
 



More information about the Reconnoiter-devel mailing list