root/src/stratcon_jlog_streamer.c

Revision bf14ffcb68f78a1f42d549092e41a543deb1f387, 53.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 years ago)

quiet this down

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_conf.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "utils/noit_getip.h"
39 #include "noit_jlog_listener.h"
40 #include "noit_rest.h"
41 #include "stratcon_datastore.h"
42 #include "stratcon_jlog_streamer.h"
43 #include "stratcon_iep.h"
44
45 #include <unistd.h>
46 #include <assert.h>
47 #include <errno.h>
48 #include <sys/types.h>
49 #include <sys/socket.h>
50 #ifdef HAVE_SYS_FILIO_H
51 #include <sys/filio.h>
52 #endif
53 #include <netinet/in.h>
54 #include <sys/un.h>
55 #include <arpa/inet.h>
56
57 pthread_mutex_t noits_lock;
58 noit_hash_table noits = NOIT_HASH_EMPTY;
59 pthread_mutex_t noit_ip_by_cn_lock;
60 noit_hash_table noit_ip_by_cn = NOIT_HASH_EMPTY;
61 static uuid_t self_stratcon_id;
62 static char self_stratcon_hostname[256] = "\0";
63 static struct sockaddr_in self_stratcon_ip;
64
65 static struct timeval DEFAULT_NOIT_PERIOD_TV = { 5UL, 0UL };
66
67 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
68
69 static const char *feed_type_to_str(int jlog_feed_cmd) {
70   switch(jlog_feed_cmd) {
71     case NOIT_JLOG_DATA_FEED: return "durable/storage";
72     case NOIT_JLOG_DATA_TEMP_FEED: return "transient/iep";
73   }
74   return "unknown";
75 }
76
77 static int
78 remote_str_sort(const void *a, const void *b) {
79   int rv;
80   noit_connection_ctx_t * const *actx = a;
81   noit_connection_ctx_t * const *bctx = b;
82   jlog_streamer_ctx_t *ajctx = (*actx)->consumer_ctx;
83   jlog_streamer_ctx_t *bjctx = (*bctx)->consumer_ctx;
84   rv = strcmp((*actx)->remote_str, (*bctx)->remote_str);
85   if(rv) return rv;
86   return (ajctx->jlog_feed_cmd < bjctx->jlog_feed_cmd) ? -1 :
87            ((ajctx->jlog_feed_cmd == bjctx->jlog_feed_cmd) ? 0 : 1);
88 }
89 static void
90 nc_print_noit_conn_brief(noit_console_closure_t ncct,
91                           noit_connection_ctx_t *ctx) {
92   jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
93   struct timeval now, diff, session_duration;
94   const char *feedtype = "unknown";
95   const char *lasttime = "never";
96   if(ctx->last_connect.tv_sec != 0) {
97     char cmdbuf[4096];
98     time_t r = ctx->last_connect.tv_sec;
99     struct tm tbuf, *tm;
100     tm = gmtime_r(&r, &tbuf);
101     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
102     lasttime = cmdbuf;
103   }
104   nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str,
105             ctx->remote_cn ? "connected" :
106                              (ctx->retry_event ? "disconnected" :
107                                                    "connecting"), lasttime);
108   if(ctx->e) {
109     char buff[128];
110     const char *addrstr = NULL;
111     struct sockaddr_in6 addr6;
112     socklen_t len = sizeof(addr6);
113     if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
114       unsigned short port = 0;
115       if(addr6.sin6_family == AF_INET) {
116         addrstr = inet_ntop(addr6.sin6_family,
117                             &((struct sockaddr_in *)&addr6)->sin_addr,
118                             buff, sizeof(buff));
119         port = ntohs(((struct sockaddr_in *)&addr6)->sin_port);
120       }
121       else if(addr6.sin6_family == AF_INET6) {
122         addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
123                             buff, sizeof(buff));
124         port = ntohs(addr6.sin6_port);
125       }
126       if(addrstr != NULL)
127         nc_printf(ncct, "\tLocal address is %s:%u\n", buff, port);
128       else
129         nc_printf(ncct, "\tLocal address not interpretable\n");
130     }
131     else {
132       nc_printf(ncct, "\tLocal address error[%d]: %s\n",
133                 ctx->e->fd, strerror(errno));
134     }
135   }
136   feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
137   nc_printf(ncct, "\tJLog event streamer [%s]\n", feedtype);
138   gettimeofday(&now, NULL);
139   if(ctx->retry_event) {
140     sub_timeval(ctx->retry_event->whence, now, &diff);
141     nc_printf(ncct, "\tNext attempt in %lld.%06us\n",
142               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
143   }
144   else if(ctx->remote_cn) {
145     nc_printf(ncct, "\tRemote CN: '%s'\n",
146               ctx->remote_cn ? ctx->remote_cn : "???");
147     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
148       struct timeval last;
149       double session_duration_seconds;
150       const char *state = "unknown";
151
152       switch(jctx->state) {
153         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
154         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
155         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
156         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
157         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
158         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
159         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
160       }
161       last.tv_sec = jctx->header.tv_sec;
162       last.tv_usec = jctx->header.tv_usec;
163       sub_timeval(now, last, &diff);
164       sub_timeval(now, ctx->last_connect, &session_duration);
165       session_duration_seconds = session_duration.tv_sec +
166                                  (double)session_duration.tv_usec/1000000.0;
167       nc_printf(ncct, "\tState: %s\n"
168                       "\tNext checkpoint: [%08x:%08x]\n"
169                       "\tLast event: %lld.%06us ago\n"
170                       "\tEvents this session: %llu (%0.2f/s)\n"
171                       "\tOctets this session: %llu (%0.2f/s)\n",
172                 state,
173                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
174                 (long long)diff.tv_sec, (unsigned int)diff.tv_usec,
175                 jctx->total_events,
176                 (double)jctx->total_events/session_duration_seconds,
177                 jctx->total_bytes_read,
178                 (double)jctx->total_bytes_read/session_duration_seconds);
179     }
180     else {
181       nc_printf(ncct, "\tUnknown type.\n");
182     }
183   }
184 }
185
186 jlog_streamer_ctx_t *
187 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
188   jlog_streamer_ctx_t *ctx;
189   ctx = stratcon_jlog_streamer_ctx_alloc();
190   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
191   ctx->push = stratcon_datastore_push;
192   return ctx;
193 }
194 jlog_streamer_ctx_t *
195 stratcon_jlog_streamer_ctx_alloc(void) {
196   jlog_streamer_ctx_t *ctx;
197   ctx = calloc(1, sizeof(*ctx));
198   return ctx;
199 }
200 noit_connection_ctx_t *
201 noit_connection_ctx_alloc(void) {
202   noit_connection_ctx_t *ctx, **pctx;
203   ctx = calloc(1, sizeof(*ctx));
204   ctx->refcnt = 1;
205   pctx = malloc(sizeof(*pctx));
206   *pctx = ctx;
207   pthread_mutex_lock(&noits_lock);
208   noit_hash_store(&noits, (const char *)pctx, sizeof(*pctx), ctx);
209   pthread_mutex_unlock(&noits_lock);
210   return ctx;
211 }
212 int
213 noit_connection_reinitiate(eventer_t e, int mask, void *closure,
214                          struct timeval *now) {
215   noit_connection_ctx_t *ctx = closure;
216   ctx->retry_event = NULL;
217   noit_connection_initiate_connection(closure);
218   return 0;
219 }
220 void
221 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx,
222                                    struct timeval *now) {
223   struct timeval __now, interval;
224   const char *v;
225   u_int32_t min_interval = 1000, max_interval = 8000;
226
227   noit_connection_disable_timeout(ctx);
228   if(ctx->remote_cn) {
229     free(ctx->remote_cn);
230     ctx->remote_cn = NULL;
231   }
232   if(noit_hash_retr_str(ctx->config,
233                         "reconnect_initial_interval",
234                         strlen("reconnect_initial_interval"),
235                         &v)) {
236     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
237   }
238   if(noit_hash_retr_str(ctx->config,
239                         "reconnect_maximum_interval",
240                         strlen("reconnect_maximum_interval"),
241                         &v)) {
242     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
243   }
244   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
245   else {
246     ctx->current_backoff *= 2;
247     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
248     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
249   }
250   if(!now) {
251     gettimeofday(&__now, NULL);
252     now = &__now;
253   }
254   interval.tv_sec = ctx->current_backoff / 1000;
255   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
256   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
257         ctx->current_backoff);
258   if(ctx->retry_event)
259     eventer_remove(ctx->retry_event);
260   else
261     ctx->retry_event = eventer_alloc();
262   ctx->retry_event->callback = noit_connection_reinitiate;
263   ctx->retry_event->closure = ctx;
264   ctx->retry_event->mask = EVENTER_TIMER;
265   add_timeval(*now, interval, &ctx->retry_event->whence);
266   eventer_add(ctx->retry_event);
267 }
268 static void
269 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
270   if(ctx->remote_cn) free(ctx->remote_cn);
271   if(ctx->remote_str) free(ctx->remote_str);
272   if(ctx->retry_event) {
273     eventer_remove(ctx->retry_event);
274     eventer_free(ctx->retry_event);
275   }
276   if(ctx->timeout_event) {
277     eventer_remove(ctx->timeout_event);
278     eventer_free(ctx->timeout_event);
279   }
280   ctx->consumer_free(ctx->consumer_ctx);
281   free(ctx);
282 }
283 void
284 noit_connection_ctx_deref(noit_connection_ctx_t *ctx) {
285   if(noit_atomic_dec32(&ctx->refcnt) == 0)
286     noit_connection_ctx_free(ctx);
287 }
288 void
289 noit_connection_ctx_dealloc(noit_connection_ctx_t *ctx) {
290   noit_connection_ctx_t **pctx = &ctx;
291   pthread_mutex_lock(&noits_lock);
292   noit_hash_delete(&noits, (const char *)pctx, sizeof(*pctx),
293                    free, (void (*)(void *))noit_connection_ctx_deref);
294   pthread_mutex_unlock(&noits_lock);
295 }
296 void
297 jlog_streamer_ctx_free(void *cl) {
298   jlog_streamer_ctx_t *ctx = cl;
299   if(ctx->buffer) free(ctx->buffer);
300   free(ctx);
301 }
302
303 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
304 static int
305 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
306   int len, mask;
307   while(ctx->bytes_read < ctx->bytes_expected) {
308     len = Eread(ctx->buffer + ctx->bytes_read,
309                 ctx->bytes_expected - ctx->bytes_read);
310     if(len < 0) {
311       *newmask = mask;
312       return -1;
313     }
314     /* if we get 0 inside SSL, and there was a real error, we
315      * will actually get a -1 here.
316      * if(len == 0) return ctx->bytes_read;
317      */
318     ctx->total_bytes_read += len;
319     ctx->bytes_read += len;
320   }
321   assert(ctx->bytes_read == ctx->bytes_expected);
322   return ctx->bytes_read;
323 }
/* FULLREAD(e, ctx, size): read exactly `size` bytes for the current protocol
 * state into a freshly malloc'd, NUL-terminated ctx->buffer.
 * Only usable inside an eventer callback that has `nctx` (the connection
 * context) in scope and defines a `socket_error:` label:
 *  - makes the enclosing function return `mask | EVENTER_EXCEPTION` when the
 *    read would block (EAGAIN); partial progress is kept in ctx;
 *  - jumps to socket_error on an oversized size, allocation failure, read
 *    error, or short read.
 * `size` may be evaluated several times and must not have side effects.
 * The size + 1 allocation is checked for wrap-around: with a 32-bit size_t
 * a peer-supplied length of 0xffffffff would otherwise allocate 0 bytes and
 * the terminator store would land far out of bounds. */
#define FULLREAD(e,ctx,size) do { \
  int mask, len; \
  if(!ctx->bytes_expected) { \
    size_t fullread_alloc = (size_t)(size) + 1; \
    if(fullread_alloc == 0) { /* size + 1 wrapped around */ \
      noitL(noit_error, "[%s] refusing oversized read (%lu bytes).\n", \
            nctx->remote_str, (long unsigned int)(size)); \
      goto socket_error; \
    } \
    ctx->bytes_expected = (size); \
    free(ctx->buffer); \
    ctx->buffer = malloc(fullread_alloc); \
    if(ctx->buffer == NULL) { \
      noitL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)fullread_alloc); \
      goto socket_error; \
    } \
    ctx->buffer[(size)] = '\0'; \
  } \
  len = __read_on_ctx(e, ctx, &mask); \
  if(len < 0) { \
    if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
    noitL(noit_error, "[%s] SSL read error: %s\n", nctx->remote_str, strerror(errno)); \
    goto socket_error; \
  } \
  ctx->bytes_read = 0; \
  ctx->bytes_expected = 0; \
  if(len != (size)) { \
    noitL(noit_error, "[%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
          nctx->remote_str, ctx->state, len, (long unsigned int)(size)); \
    goto socket_error; \
  } \
} while(0)
350
351 int
352 noit_connection_session_timeout(eventer_t e, int mask, void *closure,
353                                 struct timeval *now) {
354   noit_connection_ctx_t *nctx = closure;
355   eventer_t fde = nctx->e;
356   nctx->timeout_event = NULL;
357   noitL(noit_error, "Timing out jlog session: %s\n",
358         nctx->remote_cn ? nctx->remote_cn : "(null)");
359   if(fde)
360     eventer_trigger(fde, EVENTER_EXCEPTION);
361   return 0;
362 }
/* Eventer callback that drives the stratcon side of a jlog stream on an
 * established connection.  closure is the noit_connection_ctx_t; its
 * consumer_ctx is the jlog_streamer_ctx_t that holds the protocol state.
 *
 * State machine, looping until I/O would block:
 *   WANT_INITIATE  write ctx->jlog_feed_cmd (already in network order).
 *   WANT_COUNT     read a u_int32_t record count (network order); a negative
 *                  count means the peer sends -count bytes of error text.
 *   WANT_ERROR     read and log that error text, then reset the connection.
 *   WANT_HEADER    read one record header (fields converted from network
 *                  order); a remaining count of 0 goes back to WANT_COUNT.
 *   WANT_BODY      read message_len bytes; non-empty bodies are passed to
 *                  ctx->push(DS_OP_INSERT, ...).  After the last record of a
 *                  batch that inserted something, the socket event is cloned,
 *                  the clone is handed to ctx->push(DS_OP_CHKPT, ...) and the
 *                  state becomes IS_ASYNC; otherwise go to WANT_CHKPT.
 *   IS_ASYNC       on re-entry (presumably once the datastore has finished
 *                  the DS_OP_CHKPT push and triggered the cloned event —
 *                  TODO confirm), continue as WANT_CHKPT.
 *   WANT_CHKPT     write the last header's checkpoint back to the peer.
 *
 * Returns the eventer mask to wait on when I/O would block (EAGAIN), or 0
 * when this event is finished with: after a teardown (exception, shutdown
 * request or any socket error, all of which reset the state, schedule a
 * reconnect and close the socket) or after the async checkpoint handoff. */
int
stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
                           struct timeval *now) {
  noit_connection_ctx_t *nctx = closure;
  jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
  /* Scratch space for decoding network-order fields out of ctx->buffer. */
  jlog_streamer_ctx_t dummy;
  int len;
  jlog_id n_chkpt;

  if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
    /* Zero-byte write used to surface the pending socket error in errno. */
    if(write(e->fd, e, 0) == -1)
      noitL(noit_error, "socket error: %s\n", strerror(errno));
 socket_error:
    /* Common teardown, also reached via goto from the states below and from
     * FULLREAD: reset the protocol state so the next connection starts
     * from scratch, then schedule a reconnect and close this socket. */
    ctx->state = JLOG_STREAMER_WANT_INITIATE;
    ctx->count = 0;
    ctx->needs_chkpt = 0;
    ctx->bytes_read = 0;
    ctx->bytes_expected = 0;
    if(ctx->buffer) free(ctx->buffer);
    ctx->buffer = NULL;
    noit_connection_schedule_reattempt(nctx, now);
    eventer_remove_fd(e->fd);
    nctx->e = NULL;
    e->opset->close(e->fd, &mask, e);
    return 0;
  }

  /* Activity seen: push the inactivity timeout back. */
  noit_connection_update_timeout(nctx);
  while(1) {
    switch(ctx->state) {
      case JLOG_STREAMER_WANT_INITIATE:
        len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
                              sizeof(ctx->jlog_feed_cmd),
                              &mask, e);
        if(len < 0) {
          if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
          goto socket_error;
        }
        if(len != sizeof(ctx->jlog_feed_cmd)) {
          noitL(noit_error, "short write [%d/%d] on initiating stream.\n",
                (int)len, (int)sizeof(ctx->jlog_feed_cmd));
          goto socket_error;
        }
        ctx->state = JLOG_STREAMER_WANT_COUNT;
        break;

      case JLOG_STREAMER_WANT_ERROR:
        /* ctx->count is negative here; its magnitude is the error length.
         * NOTE(review): if count is a 32-bit signed int, a peer-sent
         * INT_MIN would overflow this negation. */
        FULLREAD(e, ctx, 0 - ctx->count);
        noitL(noit_error, "[%s] %.*s\n", nctx->remote_str,
              0 - ctx->count, ctx->buffer);
        free(ctx->buffer); ctx->buffer = NULL;
        goto socket_error;
        break;

      case JLOG_STREAMER_WANT_COUNT:
        FULLREAD(e, ctx, sizeof(u_int32_t));
        memcpy(&dummy.count, ctx->buffer, sizeof(u_int32_t));
        ctx->count = ntohl(dummy.count);
        ctx->needs_chkpt = 0;
        free(ctx->buffer); ctx->buffer = NULL;
        if(ctx->count < 0)
          ctx->state = JLOG_STREAMER_WANT_ERROR;
        else
          ctx->state = JLOG_STREAMER_WANT_HEADER;
        break;

      case JLOG_STREAMER_WANT_HEADER:
        if(ctx->count == 0) {
          ctx->state = JLOG_STREAMER_WANT_COUNT;
          break;
        }
        FULLREAD(e, ctx, sizeof(ctx->header));
        memcpy(&dummy.header, ctx->buffer, sizeof(ctx->header));
        ctx->header.chkpt.log = ntohl(dummy.header.chkpt.log);
        ctx->header.chkpt.marker = ntohl(dummy.header.chkpt.marker);
        ctx->header.tv_sec = ntohl(dummy.header.tv_sec);
        ctx->header.tv_usec = ntohl(dummy.header.tv_usec);
        ctx->header.message_len = ntohl(dummy.header.message_len);
        free(ctx->buffer); ctx->buffer = NULL;
        ctx->state = JLOG_STREAMER_WANT_BODY;
        break;

      case JLOG_STREAMER_WANT_BODY:
        FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
        if(ctx->header.message_len > 0) {
          ctx->needs_chkpt = 1;
          ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn,
                    ctx->buffer, NULL);
        }
        else if(ctx->buffer)
          free(ctx->buffer);
        /* Either the buffer was handed to push() above (the datastore keeps
         * using it, so it must not be freed here) or it was just freed;
         * forget it in both cases. */
        ctx->buffer = NULL;
        ctx->count--;
        ctx->total_events++;
        if(ctx->count == 0 && ctx->needs_chkpt) {
          /* End of a batch that inserted data: take the socket out of the
           * event loop, clone its event and hand the clone to the
           * checkpoint push; this handler resumes in IS_ASYNC when the
           * clone is driven again. */
          eventer_t completion_e;
          eventer_remove_fd(e->fd);
          completion_e = eventer_alloc();
          memcpy(completion_e, e, sizeof(*e));
          nctx->e = completion_e;
          completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
          ctx->state = JLOG_STREAMER_IS_ASYNC;
          ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn,
                    NULL, completion_e);
          noitL(noit_debug, "Pushing %s batch async [%s]: [%u/%u]\n",
                feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
                nctx->remote_cn ? nctx->remote_cn : "(null)",
                ctx->header.chkpt.log, ctx->header.chkpt.marker);
          /* No inactivity timeout while waiting on the datastore. */
          noit_connection_disable_timeout(nctx);
          return 0;
        }
        else if(ctx->count == 0)
          ctx->state = JLOG_STREAMER_WANT_CHKPT;
        else
          ctx->state = JLOG_STREAMER_WANT_HEADER;
        break;

      case JLOG_STREAMER_IS_ASYNC:
        ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
      case JLOG_STREAMER_WANT_CHKPT:
        noitL(noit_debug, "Pushing %s checkpoint [%s]: [%u/%u]\n",
              feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
              nctx->remote_cn ? nctx->remote_cn : "(null)",
              ctx->header.chkpt.log, ctx->header.chkpt.marker);
        n_chkpt.log = htonl(ctx->header.chkpt.log);
        n_chkpt.marker = htonl(ctx->header.chkpt.marker);

        /* Short writes are not retried: anything other than a complete
         * jlog_id (or EAGAIN) resets the connection rather than risk
         * acknowledging a partial checkpoint. */
        len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
                              &mask, e);
        if(len < 0) {
          if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
          goto socket_error;
        }
        if(len != sizeof(jlog_id)) {
          noitL(noit_error, "short write on checkpointing stream.\n");
          goto socket_error;
        }
        ctx->state = JLOG_STREAMER_WANT_COUNT;
        break;
    }
  }
  /* never get here: every exit from the loop is a return or a goto */
}
508
509 int
510 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure,
511                             struct timeval *now) {
512   noit_connection_ctx_t *nctx = closure;
513   int rv;
514   const char *error = NULL;
515
516   rv = eventer_SSL_connect(e, &mask);
517   if(rv > 0) {
518     eventer_ssl_ctx_t *sslctx;
519     e->callback = nctx->consumer_callback;
520     /* We must make a copy of the acceptor_closure_t for each new
521      * connection.
522      */
523     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
524       const char *cn, *end;
525       void *vcn;
526       cn = eventer_ssl_get_peer_subject(sslctx);
527       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
528         cn += 3;
529         end = cn;
530         while(*end && *end != '/') end++;
531         nctx->remote_cn = malloc(end - cn + 1);
532         memcpy(nctx->remote_cn, cn, end - cn);
533         nctx->remote_cn[end-cn] = '\0';
534       }
535       if(nctx->config &&
536          noit_hash_retrieve(nctx->config, "cn", 2, &vcn)) {
537         const char *cn_expected = vcn;
538         if(!nctx->remote_cn ||
539            strcmp(nctx->remote_cn, cn_expected)) {
540           error = "jlog connect CN mismatch\n";
541           goto error;
542         }
543       }
544     }
545     return e->callback(e, mask, e->closure, now);
546   }
547   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
548   noitL(noit_debug, "jlog streamer SSL upgrade failed.\n");
549
550  error:
551   if(error) noitL(noit_error, "%s", error);
552   eventer_remove_fd(e->fd);
553   nctx->e = NULL;
554   e->opset->close(e->fd, &mask, e);
555   noit_connection_schedule_reattempt(nctx, now);
556   return 0;
557 }
558 int
559 noit_connection_complete_connect(eventer_t e, int mask, void *closure,
560                                  struct timeval *now) {
561   noit_connection_ctx_t *nctx = closure;
562   const char *cert, *key, *ca, *ciphers, *crl = NULL;
563   char remote_str[128], tmp_str[128];
564   eventer_ssl_ctx_t *sslctx;
565   int aerrno, len;
566   socklen_t aerrno_len = sizeof(aerrno);
567
568   if(getsockopt(e->fd,SOL_SOCKET,SO_ERROR, &aerrno, &aerrno_len) == 0)
569     if(aerrno != 0) goto connect_error;
570   aerrno = 0;
571
572   if(mask & EVENTER_EXCEPTION) {
573     if(aerrno == 0 && (write(e->fd, e, 0) == -1))
574       aerrno = errno;
575  connect_error:
576     switch(nctx->r.remote.sa_family) {
577       case AF_INET:
578         len = sizeof(struct sockaddr_in);
579         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in.sin_addr,
580                   tmp_str, len);
581         snprintf(remote_str, sizeof(remote_str), "%s:%d",
582                  tmp_str, ntohs(nctx->r.remote_in.sin_port));
583         break;
584       case AF_INET6:
585         len = sizeof(struct sockaddr_in6);
586         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in6.sin6_addr,
587                   tmp_str, len);
588         snprintf(remote_str, sizeof(remote_str), "%s:%d",
589                  tmp_str, ntohs(nctx->r.remote_in6.sin6_port));
590        break;
591       case AF_UNIX:
592         snprintf(remote_str, sizeof(remote_str), "%s", nctx->r.remote_un.sun_path);
593         break;
594       default:
595         snprintf(remote_str, sizeof(remote_str), "(unknown)");
596     }
597     noitL(noit_error, "Error connecting to %s: %s\n",
598           remote_str, strerror(aerrno));
599     eventer_remove_fd(e->fd);
600     nctx->e = NULL;
601     e->opset->close(e->fd, &mask, e);
602     noit_connection_schedule_reattempt(nctx, now);
603     return 0;
604   }
605
606 #define SSLCONFGET(var,name) do { \
607   if(!noit_hash_retr_str(nctx->sslconfig, name, strlen(name), \
608                          &var)) var = NULL; } while(0)
609   SSLCONFGET(cert, "certificate_file");
610   SSLCONFGET(key, "key_file");
611   SSLCONFGET(ca, "ca_chain");
612   SSLCONFGET(ciphers, "ciphers");
613   SSLCONFGET(crl, "crl");
614   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
615   if(!sslctx) goto connect_error;
616   if(crl) {
617     if(!eventer_ssl_use_crl(sslctx, crl)) {
618       noitL(noit_error, "Failed to load CRL from %s\n", crl);
619       eventer_ssl_ctx_free(sslctx);
620       goto connect_error;
621     }
622   }
623
624   memcpy(&nctx->last_connect, now, sizeof(*now));
625   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
626                              nctx->sslconfig);
627   EVENTER_ATTACH_SSL(e, sslctx);
628   e->callback = noit_connection_ssl_upgrade;
629   return e->callback(e, mask, closure, now);
630 }
631 static void
632 noit_connection_initiate_connection(noit_connection_ctx_t *nctx) {
633   struct timeval __now;
634   eventer_t e;
635   int rv, fd = -1;
636 #ifdef SO_KEEPALIVE
637   int optval;
638   socklen_t optlen = sizeof(optval);
639 #endif
640
641   nctx->e = NULL;
642   if(nctx->wants_permanent_shutdown) {
643     noit_connection_ctx_dealloc(nctx);
644     return;
645   }
646   /* Open a socket */
647   fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0);
648   if(fd < 0) goto reschedule;
649
650   /* Make it non-blocking */
651   if(eventer_set_fd_nonblocking(fd)) goto reschedule;
652 #define set_or_bail(type, opt, val) do { \
653   optval = val; \
654   optlen = sizeof(optval); \
655   if(setsockopt(fd, type, opt, &optval, optlen) < 0) { \
656     noitL(noit_error, "Cannot set " #type "/" #opt " on jlog socket: %s\n", \
657           strerror(errno)); \
658     goto reschedule; \
659   } \
660 } while(0)
661 #ifdef SO_KEEPALIVE
662   set_or_bail(SOL_SOCKET, SO_KEEPALIVE, 1);
663 #endif
664 #ifdef TCP_KEEPALIVE_THRESHOLD
665   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_THRESHOLD, 10 * 1000);
666 #endif
667 #ifdef TCP_KEEPALIVE_ABORT_THRESHOLD
668   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_ABORT_THRESHOLD, 30 * 1000);
669 #endif
670 #ifdef TCP_CONN_NOTIFY_THRESHOLD
671   set_or_bail(IPPROTO_TCP, TCP_CONN_NOTIFY_THRESHOLD, 10 * 1000);
672 #endif
673 #ifdef TCP_CONN_ABORT_THRESHOLD
674   set_or_bail(IPPROTO_TCP, TCP_CONN_ABORT_THRESHOLD, 30 * 1000);
675 #endif
676
677   /* Initiate a connection */
678   rv = connect(fd, &nctx->r.remote, nctx->remote_len);
679   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
680
681   /* Register a handler for connection completion */
682   e = eventer_alloc();
683   e->fd = fd;
684   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
685   e->callback = noit_connection_complete_connect;
686   e->closure = nctx;
687   nctx->e = e;
688   eventer_add(e);
689
690   noit_connection_update_timeout(nctx);
691   return;
692
693  reschedule:
694   if(fd >= 0) close(fd);
695   gettimeofday(&__now, NULL);
696   noit_connection_schedule_reattempt(nctx, &__now);
697   return;
698 }
699
700 int
701 noit_connection_update_timeout(noit_connection_ctx_t *nctx) {
702   struct timeval now, diff;
703   if(nctx->max_silence == 0) return 0;
704
705   diff.tv_sec = nctx->max_silence / 1000;
706   diff.tv_usec = (nctx->max_silence % 1000) * 1000;
707   gettimeofday(&now, NULL);
708
709   if(!nctx->timeout_event) {
710     nctx->timeout_event = eventer_alloc();
711     nctx->timeout_event->mask = EVENTER_TIMER;
712     nctx->timeout_event->closure = nctx;
713     nctx->timeout_event->callback = noit_connection_session_timeout;
714     add_timeval(now, diff, &nctx->timeout_event->whence);
715     eventer_add(nctx->timeout_event);
716   }
717   else {
718     add_timeval(now, diff, &nctx->timeout_event->whence);
719     eventer_update(nctx->timeout_event, EVENTER_TIMER);
720   }
721   return 0;
722 }
723
724 int
725 noit_connection_disable_timeout(noit_connection_ctx_t *nctx) {
726   if(nctx->timeout_event) {
727     eventer_remove(nctx->timeout_event);
728     eventer_free(nctx->timeout_event);
729     nctx->timeout_event = NULL;
730   }
731   return 0;
732 }
733
734 int
735 initiate_noit_connection(const char *host, unsigned short port,
736                          noit_hash_table *sslconfig, noit_hash_table *config,
737                          eventer_func_t handler, void *closure,
738                          void (*freefunc)(void *)) {
739   noit_connection_ctx_t *ctx;
740   const char *stimeout;
741   int8_t family;
742   int rv;
743   union {
744     struct in_addr addr4;
745     struct in6_addr addr6;
746   } a;
747
748   if(host[0] == '/') {
749     family = AF_UNIX;
750   }
751   else {
752     family = AF_INET;
753     rv = inet_pton(family, host, &a);
754     if(rv != 1) {
755       family = AF_INET6;
756       rv = inet_pton(family, host, &a);
757       if(rv != 1) {
758         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
759         return -1;
760       }
761     }
762   }
763
764   ctx = noit_connection_ctx_alloc();
765   ctx->remote_str = calloc(1, strlen(host) + 7);
766   snprintf(ctx->remote_str, strlen(host) + 7,
767            "%s:%d", host, port);
768  
769   memset(&ctx->r, 0, sizeof(ctx->r));
770   if(family == AF_UNIX) {
771     struct sockaddr_un *s = &ctx->r.remote_un;
772     s->sun_family = AF_UNIX;
773     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
774     ctx->remote_len = sizeof(*s);
775   }
776   else if(family == AF_INET) {
777     struct sockaddr_in *s = &ctx->r.remote_in;
778     s->sin_family = family;
779     s->sin_port = htons(port);
780     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
781     ctx->remote_len = sizeof(*s);
782   }
783   else {
784     struct sockaddr_in6 *s = &ctx->r.remote_in6;
785     s->sin6_family = family;
786     s->sin6_port = htons(port);
787     memcpy(&s->sin6_addr, &a, sizeof(a));
788     ctx->remote_len = sizeof(*s);
789   }
790
791   if(ctx->sslconfig)
792     noit_hash_delete_all(ctx->sslconfig, free, free);
793   else
794     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
795   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
796   if(ctx->config)
797     noit_hash_delete_all(ctx->config, free, free);
798   else
799     ctx->config = calloc(1, sizeof(noit_hash_table));
800   noit_hash_merge_as_dict(ctx->config, config);
801
802   if(noit_hash_retr_str(ctx->config, "timeout", strlen("timeout"), &stimeout))
803     ctx->max_silence = atoi(stimeout);
804   else
805     ctx->max_silence = DEFAULT_NOIT_CONNECTION_TIMEOUT;
806   ctx->consumer_callback = handler;
807   ctx->consumer_free = freefunc;
808   ctx->consumer_ctx = closure;
809   noit_connection_initiate_connection(ctx);
810   return 0;
811 }
812
813 void
814 stratcon_streamer_connection(const char *toplevel, const char *destination,
815                              eventer_func_t handler,
816                              void *(*handler_alloc)(void), void *handler_ctx,
817                              void (*handler_free)(void *)) {
818   int i, cnt = 0;
819   noit_conf_section_t *noit_configs;
820   char path[256];
821
822   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
823   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
824   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
825   for(i=0; i<cnt; i++) {
826     char address[256];
827     unsigned short port;
828     int portint;
829     noit_hash_table *sslconfig, *config;
830
831     if(!noit_conf_get_stringbuf(noit_configs[i],
832                                 "ancestor-or-self::node()/@address",
833                                 address, sizeof(address))) {
834       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
835       continue;
836     }
837     /* if destination is specified, exact match it */
838     if(destination && strcmp(address, destination)) continue;
839
840     if(!noit_conf_get_int(noit_configs[i],
841                           "ancestor-or-self::node()/@port", &portint))
842       portint = 0;
843     port = (unsigned short) portint;
844     if(address[0] != '/' && (portint == 0 || (port != portint))) {
845       /* UNIX sockets don't require a port (they'll ignore it if specified */
846       noitL(noit_stderr,
847             "Invalid port [%d] specified in stanza %d\n", port, i+1);
848       continue;
849     }
850     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
851     config = noit_conf_get_hash(noit_configs[i], "config");
852
853     noitL(noit_error, "initiating to %s\n", address);
854     initiate_noit_connection(address, port, sslconfig, config,
855                              handler,
856                              handler_alloc ? handler_alloc() : handler_ctx,
857                              handler_free);
858     noit_hash_destroy(sslconfig,free,free);
859     free(sslconfig);
860     noit_hash_destroy(config,free,free);
861     free(config);
862   }
863   free(noit_configs);
864 }
865 int
866 stratcon_find_noit_ip_by_cn(const char *cn, char *ip, int len) {
867   int rv = -1;
868   void *vip;
869   pthread_mutex_lock(&noit_ip_by_cn_lock);
870   if(noit_hash_retrieve(&noit_ip_by_cn, cn, strlen(cn), &vip)) {
871     int new_len;
872     char *new_ip = (char *)vip;
873     new_len = strlen(new_ip);
874     strlcpy(ip, new_ip, len);
875     if(new_len >= len) rv = new_len+1;
876     else rv = 0;
877   }
878   pthread_mutex_unlock(&noit_ip_by_cn_lock);
879   return rv;
880 }
881 void
882 stratcon_jlog_streamer_recache_noit() {
883   int di, cnt;
884   noit_conf_section_t *noit_configs;
885   noit_configs = noit_conf_get_sections(NULL, "//noits//noit", &cnt);
886   pthread_mutex_lock(&noit_ip_by_cn_lock);
887   noit_hash_delete_all(&noit_ip_by_cn, free, free);
888   for(di=0; di<cnt; di++) {
889     char address[64];
890     if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
891                                  address, sizeof(address))) {
892       char expected_cn[256];
893       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
894                                  expected_cn, sizeof(expected_cn)))
895         noit_hash_store(&noit_ip_by_cn,
896                         strdup(expected_cn), strlen(expected_cn),
897                         strdup(address));
898     }
899   }
900   free(noit_configs);
901   pthread_mutex_unlock(&noit_ip_by_cn_lock);
902 }
903 void
904 stratcon_jlog_streamer_reload(const char *toplevel) {
905   /* flush and repopulate the cn cache */
906   stratcon_jlog_streamer_recache_noit();
907   if(!stratcon_datastore_get_enabled()) return;
908   stratcon_streamer_connection(toplevel, NULL,
909                                stratcon_jlog_recv_handler,
910                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
911                                NULL,
912                                jlog_streamer_ctx_free);
913 }
914
915 static int
916 stratcon_console_show_noits(noit_console_closure_t ncct,
917                             int argc, char **argv,
918                             noit_console_state_t *dstate,
919                             void *closure) {
920   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
921   void *key_id;
922   int klen, n = 0, i;
923   void *vconn;
924   noit_connection_ctx_t **ctx;
925
926   pthread_mutex_lock(&noits_lock);
927   ctx = malloc(sizeof(*ctx) * noits.size);
928   while(noit_hash_next(&noits, &iter, (const char **)&key_id, &klen,
929                        &vconn)) {
930     ctx[n] = (noit_connection_ctx_t *)vconn;
931     noit_atomic_inc32(&ctx[n]->refcnt);
932     n++;
933   }
934   pthread_mutex_unlock(&noits_lock);
935   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
936   for(i=0; i<n; i++) {
937     nc_print_noit_conn_brief(ncct, ctx[i]);
938     noit_connection_ctx_deref(ctx[i]);
939   }
940   free(ctx);
941   return 0;
942 }
943
944 static void
945 emit_noit_info_metrics(struct timeval *now, const char *uuid_str,
946                        noit_connection_ctx_t *nctx) {
947   struct timeval last, session_duration, diff;
948   u_int64_t session_duration_ms, last_event_ms;
949   jlog_streamer_ctx_t *jctx = nctx->consumer_ctx;
950   char str[1024], *wr;
951   int len;
952   void *vcn;
953   const char *cn_expected;
954   const char *feedtype = "unknown";
955
956   if(jctx->push == stratcon_datastore_push)
957     feedtype = "storage";
958   else if(jctx->push == stratcon_iep_line_processor)
959     feedtype = "iep";
960   if(NULL != (wr = strchr(feedtype, '/'))) feedtype = wr+1;
961
962   noit_hash_retrieve(nctx->config, "cn", 2, &vcn);
963   if(!vcn) return;
964   cn_expected = vcn;
965
966   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\t%s`%s`",
967            now->tv_sec, now->tv_usec/1000UL, uuid_str, cn_expected, feedtype);
968   wr = str + strlen(str);
969   len = sizeof(str) - (wr - str);
970
971   /* Now we write NAME TYPE VALUE into wr each time and push it */
972 #define push_noit_m_str(name, value) do { \
973   snprintf(wr, len, "%s\ts\t%s\n", name, value); \
974   stratcon_datastore_push(DS_OP_INSERT, \
975                           (struct sockaddr *)&self_stratcon_ip, \
976                           self_stratcon_hostname, strdup(str), NULL); \
977   stratcon_iep_line_processor(DS_OP_INSERT, \
978                               (struct sockaddr *)&self_stratcon_ip, \
979                               self_stratcon_hostname, strdup(str), NULL); \
980 } while(0)
981 #define push_noit_m_u64(name, value) do { \
982   snprintf(wr, len, "%s\tL\t%llu\n", name, value); \
983   stratcon_datastore_push(DS_OP_INSERT, \
984                           (struct sockaddr *)&self_stratcon_ip, \
985                           self_stratcon_hostname, strdup(str), NULL); \
986   stratcon_iep_line_processor(DS_OP_INSERT, \
987                               (struct sockaddr *)&self_stratcon_ip, \
988                               self_stratcon_hostname, strdup(str), NULL); \
989 } while(0)
990
991   last.tv_sec = jctx->header.tv_sec;
992   last.tv_usec = jctx->header.tv_usec;
993   sub_timeval(*now, last, &diff);
994   last_event_ms = diff.tv_sec * 1000 + diff.tv_usec / 1000;
995   sub_timeval(*now, nctx->last_connect, &session_duration);
996   session_duration_ms = session_duration.tv_sec * 1000 +
997                         session_duration.tv_usec / 1000;
998
999   push_noit_m_str("state", nctx->remote_cn ? "connected" :
1000                              (nctx->retry_event ? "disconnected" :
1001                                                   "connecting"));
1002   push_noit_m_u64("last_event_age_ms", last_event_ms);
1003   push_noit_m_u64("session_length_ms", last_event_ms);
1004 }
1005 static int
1006 periodic_noit_metrics(eventer_t e, int mask, void *closure,
1007                       struct timeval *now) {
1008   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
1009   noit_connection_ctx_t **ctxs;
1010   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1011   void *key_id;
1012   void *vconn;
1013   int klen, n = 0, i;
1014   char str[1024];
1015   char uuid_str[UUID_STR_LEN+1];
1016
1017   uuid_unparse_lower(self_stratcon_id, uuid_str);
1018
1019   if(closure == NULL) {
1020     /* Only do this the first time we get called */
1021     char ip_str[128];
1022     inet_ntop(AF_INET, &self_stratcon_ip.sin_addr, ip_str,
1023               sizeof(ip_str));
1024     snprintf(str, sizeof(str), "C\t%lu.%03lu\t%s\t%s\tstratcon\t%s\n",
1025              now->tv_sec, now->tv_usec/1000UL, uuid_str, ip_str,
1026              self_stratcon_hostname);
1027     stratcon_datastore_push(DS_OP_INSERT,
1028                             (struct sockaddr *)&self_stratcon_ip,
1029                             self_stratcon_hostname, strdup(str), NULL);
1030     stratcon_iep_line_processor(DS_OP_INSERT,
1031                                 (struct sockaddr *)&self_stratcon_ip,
1032                                 self_stratcon_hostname, strdup(str), NULL);
1033   }
1034
1035   pthread_mutex_lock(&noits_lock);
1036   ctxs = malloc(sizeof(*ctxs) * noits.size);
1037   while(noit_hash_next(&noits, &iter, (const char **)&key_id, &klen,
1038                        &vconn)) {
1039     ctxs[n] = (noit_connection_ctx_t *)vconn;
1040     noit_atomic_inc32(&ctxs[n]->refcnt);
1041     n++;
1042   }
1043   pthread_mutex_unlock(&noits_lock);
1044
1045   snprintf(str, sizeof(str), "S\t%lu.%03lu\t%s\tG\tA\t0\tok\n",
1046            now->tv_sec, now->tv_usec/1000UL, uuid_str);
1047   stratcon_datastore_push(DS_OP_INSERT,
1048                           (struct sockaddr *)&self_stratcon_ip,
1049                           self_stratcon_hostname, strdup(str), NULL);
1050   stratcon_iep_line_processor(DS_OP_INSERT, \
1051                               (struct sockaddr *)&self_stratcon_ip, \
1052                               self_stratcon_hostname, strdup(str), NULL); \
1053   for(i=0; i<n; i++) {
1054     emit_noit_info_metrics(now, uuid_str, ctxs[i]);
1055     noit_connection_ctx_deref(ctxs[i]);
1056   }
1057   free(ctxs);
1058   stratcon_datastore_push(DS_OP_CHKPT,
1059                           (struct sockaddr *)&self_stratcon_ip,
1060                           self_stratcon_hostname, NULL, NULL);
1061   stratcon_iep_line_processor(DS_OP_CHKPT, \
1062                               (struct sockaddr *)&self_stratcon_ip, \
1063                               self_stratcon_hostname, NULL, NULL); \
1064
1065   add_timeval(e->whence, whence, &whence);
1066   eventer_add_at(periodic_noit_metrics, (void *)0x1, whence);
1067   return 0;
1068 }
1069
1070 static int
1071 rest_show_noits(noit_http_rest_closure_t *restc,
1072                 int npats, char **pats) {
1073   xmlDocPtr doc;
1074   xmlNodePtr root;
1075   noit_hash_table seen = NOIT_HASH_EMPTY;
1076   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1077   char path[256];
1078   void *key_id;
1079   const char *type = NULL, *want_cn = NULL;
1080   int klen, n = 0, i, di, cnt;
1081   void *vconn;
1082   noit_connection_ctx_t **ctxs;
1083   noit_conf_section_t *noit_configs;
1084   struct timeval now, diff, last;
1085   xmlNodePtr node;
1086   noit_http_request *req = noit_http_session_request(restc->http_ctx);
1087
1088   noit_http_process_querystring(req);
1089   type = noit_http_request_querystring(req, "type");
1090   want_cn = noit_http_request_querystring(req, "cn");
1091
1092   gettimeofday(&now, NULL);
1093
1094   pthread_mutex_lock(&noits_lock);
1095   ctxs = malloc(sizeof(*ctxs) * noits.size);
1096   while(noit_hash_next(&noits, &iter, (const char **)&key_id, &klen,
1097                        &vconn)) {
1098     ctxs[n] = (noit_connection_ctx_t *)vconn;
1099     noit_atomic_inc32(&ctxs[n]->refcnt);
1100     n++;
1101   }
1102   pthread_mutex_unlock(&noits_lock);
1103   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
1104
1105   doc = xmlNewDoc((xmlChar *)"1.0");
1106   root = xmlNewDocNode(doc, NULL, (xmlChar *)"noits", NULL);
1107   xmlDocSetRootElement(doc, root);
1108
1109   for(i=0; i<n; i++) {
1110     char buff[256];
1111     const char *feedtype = "unknown", *state = "unknown";
1112     noit_connection_ctx_t *ctx = ctxs[i];
1113     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
1114
1115     feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
1116
1117     /* If the user requested a specific type and we're not it, skip. */
1118     if(type && strcmp(feedtype, type)) continue;
1119     /* If the user wants a specific CN... limit to that. */
1120     if(want_cn && (!ctx->remote_cn || strcmp(want_cn, ctx->remote_cn)))
1121       continue;
1122
1123     node = xmlNewNode(NULL, (xmlChar *)"noit");
1124     snprintf(buff, sizeof(buff), "%llu.%06d",
1125              (long long unsigned)ctx->last_connect.tv_sec,
1126              (int)ctx->last_connect.tv_usec);
1127     xmlSetProp(node, (xmlChar *)"last_connect", (xmlChar *)buff);
1128     xmlSetProp(node, (xmlChar *)"state", ctx->remote_cn ?
1129                (xmlChar *)"connected" :
1130                (ctx->retry_event ? (xmlChar *)"disconnected" :
1131                                     (xmlChar *)"connecting"));
1132     if(ctx->e) {
1133       char buff[128];
1134       const char *addrstr = NULL;
1135       struct sockaddr_in6 addr6;
1136       socklen_t len = sizeof(addr6);
1137       if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
1138         unsigned short port = 0;
1139         if(addr6.sin6_family == AF_INET) {
1140           addrstr = inet_ntop(addr6.sin6_family,
1141                               &((struct sockaddr_in *)&addr6)->sin_addr,
1142                               buff, sizeof(buff));
1143           port = ntohs(((struct sockaddr_in *)&addr6)->sin_port);
1144         }
1145         else if(addr6.sin6_family == AF_INET6) {
1146           addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
1147                               buff, sizeof(buff));
1148           port = ntohs(addr6.sin6_port);
1149         }
1150         if(addrstr != NULL) {
1151           snprintf(buff + strlen(buff), sizeof(buff) - strlen(buff),
1152                    ":%u", port);
1153           xmlSetProp(node, (xmlChar *)"local", (xmlChar *)buff);
1154         }
1155       }
1156     }
1157     noit_hash_replace(&seen, strdup(ctx->remote_str), strlen(ctx->remote_str),
1158                       0, free, NULL);
1159     xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)ctx->remote_str);
1160     xmlSetProp(node, (xmlChar *)"type", (xmlChar *)feedtype);
1161     if(ctx->retry_event) {
1162       sub_timeval(ctx->retry_event->whence, now, &diff);
1163       snprintf(buff, sizeof(buff), "%llu.%06d",
1164                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
1165       xmlSetProp(node, (xmlChar *)"next_attempt", (xmlChar *)buff);
1166     }
1167     else if(ctx->remote_cn) {
1168       if(ctx->remote_cn)
1169         xmlSetProp(node, (xmlChar *)"remote_cn", (xmlChar *)ctx->remote_cn);
1170  
1171       switch(jctx->state) {
1172         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
1173         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
1174         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
1175         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
1176         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
1177         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
1178         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
1179       }
1180       xmlSetProp(node, (xmlChar *)"state", (xmlChar *)state);
1181       snprintf(buff, sizeof(buff), "%08x:%08x",
1182                jctx->header.chkpt.log, jctx->header.chkpt.marker);
1183       xmlSetProp(node, (xmlChar *)"checkpoint", (xmlChar *)buff);
1184       snprintf(buff, sizeof(buff), "%llu",
1185                (unsigned long long)jctx->total_events);
1186       xmlSetProp(node, (xmlChar *)"session_events", (xmlChar *)buff);
1187       snprintf(buff, sizeof(buff), "%llu",
1188                (unsigned long long)jctx->total_bytes_read);
1189       xmlSetProp(node, (xmlChar *)"session_bytes", (xmlChar *)buff);
1190  
1191       sub_timeval(now, ctx->last_connect, &diff);
1192       snprintf(buff, sizeof(buff), "%lld.%06d",
1193                (long long)diff.tv_sec, (int)diff.tv_usec);
1194       xmlSetProp(node, (xmlChar *)"session_duration", (xmlChar *)buff);
1195  
1196       if(jctx->header.tv_sec) {
1197         last.tv_sec = jctx->header.tv_sec;
1198         last.tv_usec = jctx->header.tv_usec;
1199         snprintf(buff, sizeof(buff), "%llu.%06d",
1200                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
1201         xmlSetProp(node, (xmlChar *)"last_event", (xmlChar *)buff);
1202         sub_timeval(now, last, &diff);
1203         snprintf(buff, sizeof(buff), "%lld.%06d",
1204                  (long long)diff.tv_sec, (int)diff.tv_usec);
1205         xmlSetProp(node, (xmlChar *)"last_event_age", (xmlChar *)buff);
1206       }
1207     }
1208
1209     xmlAddChild(root, node);
1210     noit_connection_ctx_deref(ctx);
1211   }
1212   free(ctxs);
1213
1214   if(!type || !strcmp(type, "configured")) {
1215     snprintf(path, sizeof(path), "//noits//noit");
1216     noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1217     for(di=0; di<cnt; di++) {
1218       char address[64], port_str[32], remote_str[98];
1219       char expected_cn_buff[256], *expected_cn = NULL;
1220       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
1221                                  expected_cn_buff, sizeof(expected_cn_buff)))
1222         expected_cn = expected_cn_buff;
1223       if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn))) continue;
1224       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
1225                                  address, sizeof(address))) {
1226         void *v;
1227         if(!noit_conf_get_stringbuf(noit_configs[di], "self::node()/@port",
1228                                    port_str, sizeof(port_str)))
1229           strlcpy(port_str, "43191", sizeof(port_str));
1230
1231         /* If the user wants a specific CN... limit to that. */
1232           if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn)))
1233             continue;
1234
1235         snprintf(remote_str, sizeof(remote_str), "%s:%s", address, port_str);
1236         if(!noit_hash_retrieve(&seen, remote_str, strlen(remote_str), &v)) {
1237           node = xmlNewNode(NULL, (xmlChar *)"noit");
1238           xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)remote_str);
1239           xmlSetProp(node, (xmlChar *)"type", (xmlChar *)"configured");
1240           if(expected_cn)
1241             xmlSetProp(node, (xmlChar *)"cn", (xmlChar *)expected_cn);
1242           xmlAddChild(root, node);
1243         }
1244       }
1245     }
1246     free(noit_configs);
1247   }
1248   noit_hash_destroy(&seen, free, NULL);
1249
1250   noit_http_response_ok(restc->http_ctx, "text/xml");
1251   noit_http_response_xml(restc->http_ctx, doc);
1252   noit_http_response_end(restc->http_ctx);
1253   xmlFreeDoc(doc);
1254   return 0;
1255 }
1256 static int
1257 stratcon_add_noit(const char *target, unsigned short port,
1258                   const char *cn) {
1259   int cnt;
1260   char path[256];
1261   char port_str[6];
1262   noit_conf_section_t *noit_configs, parent;
1263   xmlNodePtr newnoit, config, cnnode;
1264
1265   snprintf(path, sizeof(path),
1266            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1267   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1268   free(noit_configs);
1269   if(cnt != 0) return -1;
1270
1271   parent = noit_conf_get_section(NULL, "//noits");
1272   if(!parent) return -1;
1273   snprintf(port_str, sizeof(port_str), "%d", port);
1274   newnoit = xmlNewNode(NULL, (xmlChar *)"noit");
1275   xmlSetProp(newnoit, (xmlChar *)"address", (xmlChar *)target);
1276   xmlSetProp(newnoit, (xmlChar *)"port", (xmlChar *)port_str);
1277   xmlAddChild(parent, newnoit);
1278   if(cn) {
1279     config = xmlNewNode(NULL, (xmlChar *)"config");
1280     cnnode = xmlNewNode(NULL, (xmlChar *)"cn");
1281     xmlNodeAddContent(cnnode, (xmlChar *)cn);
1282     xmlAddChild(config, cnnode);
1283     xmlAddChild(newnoit, config);
1284     pthread_mutex_lock(&noit_ip_by_cn_lock);
1285     noit_hash_replace(&noit_ip_by_cn, strdup(cn), strlen(cn),
1286                       strdup(target), free, free);
1287     pthread_mutex_unlock(&noit_ip_by_cn_lock);
1288   }
1289   if(stratcon_datastore_get_enabled())
1290     stratcon_streamer_connection(NULL, target,
1291                                  stratcon_jlog_recv_handler,
1292                                  (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
1293                                  NULL,
1294                                  jlog_streamer_ctx_free);
1295   if(stratcon_iep_get_enabled())
1296     stratcon_streamer_connection(NULL, target,
1297                                  stratcon_jlog_recv_handler,
1298                                  (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
1299                                  NULL,
1300                                  jlog_streamer_ctx_free);
1301   return 1;
1302 }
1303 static int
1304 stratcon_remove_noit(const char *target, unsigned short port) {
1305   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1306   void *key_id;
1307   int klen, n = -1, i, cnt = 0;
1308   void *vconn;
1309   noit_connection_ctx_t **ctx;
1310   noit_conf_section_t *noit_configs;
1311   char path[256];
1312   char remote_str[256];
1313
1314   snprintf(remote_str, sizeof(remote_str), "%s:%d", target, port);
1315
1316   snprintf(path, sizeof(path),
1317            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1318   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1319   for(i=0; i<cnt; i++) {
1320     char expected_cn[256];
1321     if(noit_conf_get_stringbuf(noit_configs[i], "self::node()/config/cn",
1322                                expected_cn, sizeof(expected_cn))) {
1323       pthread_mutex_lock(&noit_ip_by_cn_lock);
1324       noit_hash_delete(&noit_ip_by_cn, expected_cn, strlen(expected_cn),
1325                        free, free);
1326       pthread_mutex_unlock(&noit_ip_by_cn_lock);
1327     }
1328     xmlUnlinkNode(noit_configs[i]);
1329     xmlFreeNode(noit_configs[i]);
1330     n = 0;
1331   }
1332   free(noit_configs);
1333
1334   pthread_mutex_lock(&noits_lock);
1335   ctx = malloc(sizeof(*ctx) * noits.size);
1336   while(noit_hash_next(&noits, &iter, (const char **)&key_id, &klen,
1337                        &vconn)) {
1338     if(!strcmp(((noit_connection_ctx_t *)vconn)->remote_str, remote_str)) {
1339       ctx[n] = (noit_connection_ctx_t *)vconn;
1340       noit_atomic_inc32(&ctx[n]->refcnt);
1341       n++;
1342     }
1343   }
1344   pthread_mutex_unlock(&noits_lock);
1345   for(i=0; i<n; i++) {
1346     noit_connection_ctx_dealloc(ctx[i]); /* once for the record */
1347     noit_connection_ctx_deref(ctx[i]);   /* once for the aboce inc32 */
1348   }
1349   free(ctx);
1350   return n;
1351 }
1352 static int
1353 rest_set_noit(noit_http_rest_closure_t *restc,
1354               int npats, char **pats) {
1355   const char *cn = NULL;
1356   noit_http_session_ctx *ctx = restc->http_ctx;
1357   noit_http_request *req = noit_http_session_request(ctx);
1358   unsigned short port = 43191;
1359   if(npats < 1 || npats > 2)
1360     noit_http_response_server_error(ctx, "text/xml");
1361   if(npats == 2) port = atoi(pats[1]);
1362   noit_http_process_querystring(req);
1363   cn = noit_http_request_querystring(req, "cn");
1364   if(stratcon_add_noit(pats[0], port, cn) >= 0)
1365     noit_http_response_ok(ctx, "text/xml");
1366   else
1367     noit_http_response_standard(ctx, 409, "EXISTS", "text/xml");
1368   if(noit_conf_write_file(NULL) != 0)
1369     noitL(noit_error, "local config write failed\n");
1370   noit_conf_mark_changed();
1371   noit_http_response_end(ctx);
1372   return 0;
1373 }
1374 static int
1375 rest_delete_noit(noit_http_rest_closure_t *restc,
1376                  int npats, char **pats) {
1377   noit_http_session_ctx *ctx = restc->http_ctx;
1378   unsigned short port = 43191;
1379   if(npats < 1 || npats > 2)
1380     noit_http_response_server_error(ctx, "text/xml");
1381   if(npats == 2) port = atoi(pats[1]);
1382   if(stratcon_remove_noit(pats[0], port) >= 0)
1383     noit_http_response_ok(ctx, "text/xml");
1384   else
1385     noit_http_response_not_found(ctx, "text/xml");
1386   if(noit_conf_write_file(NULL) != 0)
1387     noitL(noit_error, "local config write failed\n");
1388   noit_conf_mark_changed();
1389   noit_http_response_end(ctx);
1390   return 0;
1391 }
1392 static int
1393 stratcon_console_conf_noits(noit_console_closure_t ncct,
1394                             int argc, char **argv,
1395                             noit_console_state_t *dstate,
1396                             void *closure) {
1397   char *cp, target[128];
1398   unsigned short port = 43191;
1399   int adding = (int)(vpsized_int)closure;
1400   if(argc != 1)
1401     return -1;
1402
1403   cp = strchr(argv[0], ':');
1404   if(cp) {
1405     strlcpy(target, argv[0], MIN(sizeof(target), cp-argv[0]+1));
1406     port = atoi(cp+1);
1407   }
1408   else strlcpy(target, argv[0], sizeof(target));
1409   if(adding) {
1410     if(stratcon_add_noit(target, port, NULL) >= 0) {
1411       nc_printf(ncct, "Added noit at %s:%d\n", target, port);
1412     }
1413     else {
1414       nc_printf(ncct, "Failed to add noit at %s:%d\n", target, port);
1415     }
1416   }
1417   else {
1418     if(stratcon_remove_noit(target, port) >= 0) {
1419       nc_printf(ncct, "Removed noit at %s:%d\n", target, port);
1420     }
1421     else {
1422       nc_printf(ncct, "Failed to remove noit at %s:%d\n", target, port);
1423     }
1424   }
1425   return 0;
1426 }
1427
1428 static void
1429 register_console_streamer_commands() {
1430   noit_console_state_t *tl;
1431   cmd_info_t *showcmd, *confcmd, *conftcmd, *conftnocmd;
1432
1433   tl = noit_console_state_initial();
1434   showcmd = noit_console_state_get_cmd(tl, "show");
1435   assert(showcmd && showcmd->dstate);
1436   confcmd = noit_console_state_get_cmd(tl, "configure");
1437   conftcmd = noit_console_state_get_cmd(confcmd->dstate, "terminal");
1438   conftnocmd = noit_console_state_get_cmd(conftcmd->dstate, "no");
1439
1440   noit_console_state_add_cmd(conftcmd->dstate,
1441     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)1));
1442   noit_console_state_add_cmd(conftnocmd->dstate,
1443     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)0));
1444
1445   noit_console_state_add_cmd(showcmd->dstate,
1446     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
1447 }
1448
1449 void
1450 stratcon_jlog_streamer_init(const char *toplevel) {
1451   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
1452   struct in_addr remote;
1453   char uuid_str[UUID_STR_LEN + 1];
1454
1455   pthread_mutex_init(&noits_lock, NULL);
1456   pthread_mutex_init(&noit_ip_by_cn_lock, NULL);
1457   eventer_name_callback("noit_connection_reinitiate",
1458                         noit_connection_reinitiate);
1459   eventer_name_callback("stratcon_jlog_recv_handler",
1460                         stratcon_jlog_recv_handler);
1461   eventer_name_callback("noit_connection_ssl_upgrade",
1462                         noit_connection_ssl_upgrade);
1463   eventer_name_callback("noit_connection_complete_connect",
1464                         noit_connection_complete_connect);
1465   eventer_name_callback("noit_connection_session_timeout",
1466                         noit_connection_session_timeout);
1467   register_console_streamer_commands();
1468   stratcon_jlog_streamer_reload(toplevel);
1469   stratcon_streamer_connection(toplevel, "", NULL, NULL, NULL, NULL);
1470   assert(noit_http_rest_register_auth(
1471     "GET", "/noits/", "^show$", rest_show_noits,
1472              noit_http_rest_client_cert_auth
1473   ) == 0);
1474   assert(noit_http_rest_register_auth(
1475     "PUT", "/noits/", "^set/([^/:]+)$", rest_set_noit,
1476              noit_http_rest_client_cert_auth
1477   ) == 0);
1478   assert(noit_http_rest_register_auth(
1479     "PUT", "/noits/", "^set/([^/:]+):(\\d+)$", rest_set_noit,
1480              noit_http_rest_client_cert_auth
1481   ) == 0);
1482   assert(noit_http_rest_register_auth(
1483     "DELETE", "/noits/", "^delete/([^/:]+)$", rest_delete_noit,
1484              noit_http_rest_client_cert_auth
1485   ) == 0);
1486   assert(noit_http_rest_register_auth(
1487     "DELETE", "/noits/", "^delete/([^/:]+):(\\d+)$", rest_delete_noit,
1488              noit_http_rest_client_cert_auth
1489   ) == 0);
1490
1491   uuid_clear(self_stratcon_id);
1492
1493   if(noit_conf_get_stringbuf(NULL, "/stratcon/@id",
1494                              uuid_str, sizeof(uuid_str)) &&
1495      uuid_parse(uuid_str, self_stratcon_id) == 0) {
1496     int period;
1497     /* If a UUID was provided for stratcon itself, we will report metrics
1498      * on a large variety of things (including all noits).
1499      */
1500     if(noit_conf_get_int(NULL, "/stratcon/@metric_period", &period) &&
1501        period > 0) {
1502       DEFAULT_NOIT_PERIOD_TV.tv_sec = period / 1000;
1503       DEFAULT_NOIT_PERIOD_TV.tv_usec = (period % 1000) * 1000;
1504     }
1505     self_stratcon_ip.sin_family = AF_INET;
1506     remote.s_addr = 0xffffffff;
1507     noit_getip_ipv4(remote, &self_stratcon_ip.sin_addr);
1508     gethostname(self_stratcon_hostname, sizeof(self_stratcon_hostname));
1509     eventer_add_in(periodic_noit_metrics, NULL, whence);
1510   }
1511 }
1512
Note: See TracBrowser for help on using the browser.