root/src/stratcon_jlog_streamer.c

Revision dd70a5f3aa29668a783b4e0da5cee3a1d3aab102, 59.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 2 months ago)

some starter support for SOCK_CLOEXEC and O_CLOEXEC

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "dtrace_probes.h"
35 #include "eventer/eventer.h"
36 #include "noit_conf.h"
37 #include "utils/noit_hash.h"
38 #include "utils/noit_log.h"
39 #include "utils/noit_getip.h"
40 #include "noit_jlog_listener.h"
41 #include "noit_rest.h"
42 #include "stratcon_datastore.h"
43 #include "stratcon_jlog_streamer.h"
44 #include "stratcon_iep.h"
45
46 #include <unistd.h>
47 #include <assert.h>
48 #include <errno.h>
49 #include <sys/types.h>
50 #include <sys/socket.h>
51 #ifdef HAVE_SYS_FILIO_H
52 #include <sys/filio.h>
53 #endif
54 #include <netinet/in.h>
55 #include <sys/un.h>
56 #include <arpa/inet.h>
57
58 pthread_mutex_t noits_lock;
59 noit_hash_table noits = NOIT_HASH_EMPTY;
60 pthread_mutex_t noit_ip_by_cn_lock;
61 noit_hash_table noit_ip_by_cn = NOIT_HASH_EMPTY;
62 static uuid_t self_stratcon_id;
63 static char self_stratcon_hostname[256] = "\0";
64 static struct sockaddr_in self_stratcon_ip;
65 static noit_boolean stratcon_selfcheck_extended_id = noit_true;
66
67 static struct timeval DEFAULT_NOIT_PERIOD_TV = { 5UL, 0UL };
68
69 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
70
71 static const char *feed_type_to_str(int jlog_feed_cmd) {
72   switch(jlog_feed_cmd) {
73     case NOIT_JLOG_DATA_FEED: return "durable/storage";
74     case NOIT_JLOG_DATA_TEMP_FEED: return "transient/iep";
75   }
76   return "unknown";
77 }
78
79 #define GET_EXPECTED_CN(nctx, cn) do { \
80   void *vcn; \
81   cn = NULL; \
82   if(nctx->config && \
83      noit_hash_retrieve(nctx->config, "cn", 2, &vcn)) { \
84      cn = vcn; \
85   } \
86 } while(0)
87 #define GET_FEEDTYPE(nctx, feedtype) do { \
88   jlog_streamer_ctx_t *_jctx = nctx->consumer_ctx; \
89   feedtype = "unknown"; \
90   if(_jctx->push == stratcon_datastore_push) \
91     feedtype = "storage"; \
92   else if(_jctx->push == stratcon_iep_line_processor) \
93     feedtype = "iep"; \
94 } while(0)
95
96 static int
97 remote_str_sort(const void *a, const void *b) {
98   int rv;
99   noit_connection_ctx_t * const *actx = a;
100   noit_connection_ctx_t * const *bctx = b;
101   jlog_streamer_ctx_t *ajctx = (*actx)->consumer_ctx;
102   jlog_streamer_ctx_t *bjctx = (*bctx)->consumer_ctx;
103   rv = strcmp((*actx)->remote_str, (*bctx)->remote_str);
104   if(rv) return rv;
105   return (ajctx->jlog_feed_cmd < bjctx->jlog_feed_cmd) ? -1 :
106            ((ajctx->jlog_feed_cmd == bjctx->jlog_feed_cmd) ? 0 : 1);
107 }
108 static void
109 nc_print_noit_conn_brief(noit_console_closure_t ncct,
110                           noit_connection_ctx_t *ctx) {
111   jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
112   struct timeval now, diff, session_duration;
113   const char *feedtype = "unknown";
114   const char *lasttime = "never";
115   if(ctx->last_connect.tv_sec != 0) {
116     char cmdbuf[4096];
117     time_t r = ctx->last_connect.tv_sec;
118     struct tm tbuf, *tm;
119     tm = gmtime_r(&r, &tbuf);
120     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
121     lasttime = cmdbuf;
122   }
123   nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str,
124             ctx->remote_cn ? "connected" :
125                              (ctx->retry_event ? "disconnected" :
126                                                    "connecting"), lasttime);
127   if(ctx->e) {
128     char buff[128];
129     const char *addrstr = NULL;
130     struct sockaddr_in6 addr6;
131     socklen_t len = sizeof(addr6);
132     if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
133       unsigned short port = 0;
134       if(addr6.sin6_family == AF_INET) {
135         addrstr = inet_ntop(addr6.sin6_family,
136                             &((struct sockaddr_in *)&addr6)->sin_addr,
137                             buff, sizeof(buff));
138         memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
139         port = ntohs(port);
140       }
141       else if(addr6.sin6_family == AF_INET6) {
142         addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
143                             buff, sizeof(buff));
144         port = ntohs(addr6.sin6_port);
145       }
146       if(addrstr != NULL)
147         nc_printf(ncct, "\tLocal address is %s:%u\n", buff, port);
148       else
149         nc_printf(ncct, "\tLocal address not interpretable\n");
150     }
151     else {
152       nc_printf(ncct, "\tLocal address error[%d]: %s\n",
153                 ctx->e->fd, strerror(errno));
154     }
155   }
156   feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
157   nc_printf(ncct, "\tJLog event streamer [%s]\n", feedtype);
158   gettimeofday(&now, NULL);
159   if(ctx->retry_event) {
160     sub_timeval(ctx->retry_event->whence, now, &diff);
161     nc_printf(ncct, "\tNext attempt in %lld.%06us\n",
162               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
163   }
164   else if(ctx->remote_cn) {
165     nc_printf(ncct, "\tRemote CN: '%s'\n",
166               ctx->remote_cn ? ctx->remote_cn : "???");
167     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
168       struct timeval last;
169       double session_duration_seconds;
170       const char *state = "unknown";
171
172       switch(jctx->state) {
173         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
174         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
175         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
176         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
177         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
178         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
179         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
180       }
181       last.tv_sec = jctx->header.tv_sec;
182       last.tv_usec = jctx->header.tv_usec;
183       sub_timeval(now, last, &diff);
184       sub_timeval(now, ctx->last_connect, &session_duration);
185       session_duration_seconds = session_duration.tv_sec +
186                                  (double)session_duration.tv_usec/1000000.0;
187       nc_printf(ncct, "\tState: %s\n"
188                       "\tNext checkpoint: [%08x:%08x]\n"
189                       "\tLast event: %lld.%06us ago\n"
190                       "\tEvents this session: %llu (%0.2f/s)\n"
191                       "\tOctets this session: %llu (%0.2f/s)\n",
192                 state,
193                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
194                 (long long)diff.tv_sec, (unsigned int)diff.tv_usec,
195                 jctx->total_events,
196                 (double)jctx->total_events/session_duration_seconds,
197                 jctx->total_bytes_read,
198                 (double)jctx->total_bytes_read/session_duration_seconds);
199     }
200     else {
201       nc_printf(ncct, "\tUnknown type.\n");
202     }
203   }
204 }
205
206 jlog_streamer_ctx_t *
207 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
208   jlog_streamer_ctx_t *ctx;
209   ctx = stratcon_jlog_streamer_ctx_alloc();
210   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
211   ctx->push = stratcon_datastore_push;
212   return ctx;
213 }
214 jlog_streamer_ctx_t *
215 stratcon_jlog_streamer_ctx_alloc(void) {
216   jlog_streamer_ctx_t *ctx;
217   ctx = calloc(1, sizeof(*ctx));
218   return ctx;
219 }
220 noit_connection_ctx_t *
221 noit_connection_ctx_alloc(void) {
222   noit_connection_ctx_t *ctx, **pctx;
223   ctx = calloc(1, sizeof(*ctx));
224   ctx->refcnt = 1;
225   pctx = malloc(sizeof(*pctx));
226   *pctx = ctx;
227   pthread_mutex_lock(&noits_lock);
228   noit_hash_store(&noits, (const char *)pctx, sizeof(*pctx), ctx);
229   pthread_mutex_unlock(&noits_lock);
230   return ctx;
231 }
232 int
233 noit_connection_reinitiate(eventer_t e, int mask, void *closure,
234                          struct timeval *now) {
235   noit_connection_ctx_t *ctx = closure;
236   ctx->retry_event = NULL;
237   noit_connection_initiate_connection(closure);
238   return 0;
239 }
240 void
241 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx,
242                                    struct timeval *now) {
243   struct timeval __now, interval;
244   const char *v, *feedtype, *cn_expected;
245   u_int32_t min_interval = 1000, max_interval = 8000;
246
247   noit_connection_disable_timeout(ctx);
248   if(ctx->remote_cn) {
249     free(ctx->remote_cn);
250     ctx->remote_cn = NULL;
251   }
252   if(noit_hash_retr_str(ctx->config,
253                         "reconnect_initial_interval",
254                         strlen("reconnect_initial_interval"),
255                         &v)) {
256     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
257   }
258   if(noit_hash_retr_str(ctx->config,
259                         "reconnect_maximum_interval",
260                         strlen("reconnect_maximum_interval"),
261                         &v)) {
262     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
263   }
264   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
265   else {
266     ctx->current_backoff *= 2;
267     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
268     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
269   }
270   if(!now) {
271     gettimeofday(&__now, NULL);
272     now = &__now;
273   }
274   interval.tv_sec = ctx->current_backoff / 1000;
275   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
276   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
277         ctx->current_backoff);
278   if(ctx->retry_event)
279     eventer_remove(ctx->retry_event);
280   else
281     ctx->retry_event = eventer_alloc();
282   ctx->retry_event->callback = noit_connection_reinitiate;
283   ctx->retry_event->closure = ctx;
284   ctx->retry_event->mask = EVENTER_TIMER;
285   add_timeval(*now, interval, &ctx->retry_event->whence);
286   GET_EXPECTED_CN(ctx, cn_expected);
287   GET_FEEDTYPE(ctx, feedtype);
288   STRATCON_RESCHEDULE(-1, (char *)feedtype, ctx->remote_str,
289                            (char *)cn_expected, ctx->current_backoff);
290   eventer_add(ctx->retry_event);
291 }
292 static void
293 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
294   if(ctx->remote_cn) free(ctx->remote_cn);
295   if(ctx->remote_str) free(ctx->remote_str);
296   if(ctx->retry_event) {
297     eventer_remove(ctx->retry_event);
298     eventer_free(ctx->retry_event);
299   }
300   if(ctx->timeout_event) {
301     eventer_remove(ctx->timeout_event);
302     eventer_free(ctx->timeout_event);
303   }
304   ctx->consumer_free(ctx->consumer_ctx);
305   if (ctx->e) {
306     int mask = 0;
307     eventer_remove_fd(ctx->e->fd);
308     ctx->e->opset->close(ctx->e->fd, &mask, ctx->e);
309     eventer_free(ctx->e);
310   }
311   free(ctx);
312 }
313 void
314 noit_connection_ctx_deref(noit_connection_ctx_t *ctx) {
315   if(noit_atomic_dec32(&ctx->refcnt) == 0) {
316     noit_connection_ctx_free(ctx);
317   }
318 }
319 void
320 noit_connection_ctx_dealloc(noit_connection_ctx_t *ctx) {
321   noit_connection_ctx_t **pctx = &ctx;
322   pthread_mutex_lock(&noits_lock);
323   noit_hash_delete(&noits, (const char *)pctx, sizeof(*pctx),
324                    free, (void (*)(void *))noit_connection_ctx_deref);
325   pthread_mutex_unlock(&noits_lock);
326 }
327 void
328 jlog_streamer_ctx_free(void *cl) {
329   jlog_streamer_ctx_t *ctx = cl;
330   if(ctx->buffer) free(ctx->buffer);
331   free(ctx);
332 }
333
334 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
335 static int
336 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
337   int len, mask;
338   while(ctx->bytes_read < ctx->bytes_expected) {
339     len = Eread(ctx->buffer + ctx->bytes_read,
340                 ctx->bytes_expected - ctx->bytes_read);
341     if(len < 0) {
342       *newmask = mask;
343       return -1;
344     }
345     /* if we get 0 inside SSL, and there was a real error, we
346      * will actually get a -1 here.
347      * if(len == 0) return ctx->bytes_read;
348      */
349     ctx->total_bytes_read += len;
350     ctx->bytes_read += len;
351   }
352   assert(ctx->bytes_read == ctx->bytes_expected);
353   return ctx->bytes_read;
354 }
355 #define FULLREAD(e,ctx,size) do { \
356   int mask, len; \
357   if(!ctx->bytes_expected) { \
358     ctx->bytes_expected = size; \
359     if(ctx->buffer) free(ctx->buffer); \
360     ctx->buffer = malloc(size + 1); \
361     if(ctx->buffer == NULL) { \
362       noitL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)size + 1); \
363       goto socket_error; \
364     } \
365     ctx->buffer[size] = '\0'; \
366   } \
367   len = __read_on_ctx(e, ctx, &mask); \
368   if(len < 0) { \
369     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
370     noitL(noit_error, "[%s] SSL read error: %s\n", nctx->remote_str, strerror(errno)); \
371     goto socket_error; \
372   } \
373   ctx->bytes_read = 0; \
374   ctx->bytes_expected = 0; \
375   if(len != size) { \
376     noitL(noit_error, "[%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
377           nctx->remote_str, ctx->state, len, (long unsigned int)size); \
378     goto socket_error; \
379   } \
380 } while(0)
381
382 int
383 noit_connection_session_timeout(eventer_t e, int mask, void *closure,
384                                 struct timeval *now) {
385   noit_connection_ctx_t *nctx = closure;
386   eventer_t fde = nctx->e;
387   nctx->timeout_event = NULL;
388   noitL(noit_error, "Timing out jlog session: %s\n",
389         nctx->remote_cn ? nctx->remote_cn : "(null)");
390   if(fde)
391     eventer_trigger(fde, EVENTER_EXCEPTION);
392   return 0;
393 }
394 int
395 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
396                            struct timeval *now) {
397   noit_connection_ctx_t *nctx = closure;
398   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
399   jlog_streamer_ctx_t dummy;
400   int len;
401   jlog_id n_chkpt;
402   const char *cn_expected, *feedtype;
403   GET_EXPECTED_CN(nctx, cn_expected);
404   GET_FEEDTYPE(nctx, feedtype);
405
406   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
407     if(write(e->fd, e, 0) == -1)
408       noitL(noit_error, "socket error: %s\n", strerror(errno));
409  socket_error:
410     STRATCON_CONNECT_CLOSE(e->fd, (char *)feedtype, nctx->remote_str,
411                                 (char *)cn_expected,
412                                 nctx->wants_shutdown, errno);
413     ctx->state = JLOG_STREAMER_WANT_INITIATE;
414     ctx->count = 0;
415     ctx->needs_chkpt = 0;
416     ctx->bytes_read = 0;
417     ctx->bytes_expected = 0;
418     if(ctx->buffer) free(ctx->buffer);
419     ctx->buffer = NULL;
420     noit_connection_schedule_reattempt(nctx, now);
421     eventer_remove_fd(e->fd);
422     nctx->e = NULL;
423     e->opset->close(e->fd, &mask, e);
424     return 0;
425   }
426
427   noit_connection_update_timeout(nctx);
428   while(1) {
429     switch(ctx->state) {
430       case JLOG_STREAMER_WANT_INITIATE:
431         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
432                               sizeof(ctx->jlog_feed_cmd),
433                               &mask, e);
434         if(len < 0) {
435           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
436           goto socket_error;
437         }
438         if(len != sizeof(ctx->jlog_feed_cmd)) {
439           noitL(noit_error, "short write [%d/%d] on initiating stream.\n",
440                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
441           goto socket_error;
442         }
443         ctx->state = JLOG_STREAMER_WANT_COUNT;
444         break;
445
446       case JLOG_STREAMER_WANT_ERROR:
447         FULLREAD(e, ctx, 0 - ctx->count);
448         noitL(noit_error, "[%s] %.*s\n", nctx->remote_str,
449               0 - ctx->count, ctx->buffer);
450         free(ctx->buffer); ctx->buffer = NULL;
451         goto socket_error;
452         break;
453
454       case JLOG_STREAMER_WANT_COUNT:
455         FULLREAD(e, ctx, sizeof(u_int32_t));
456         memcpy(&dummy.count, ctx->buffer, sizeof(u_int32_t));
457         ctx->count = ntohl(dummy.count);
458         ctx->needs_chkpt = 0;
459         free(ctx->buffer); ctx->buffer = NULL;
460         STRATCON_STREAM_COUNT(e->fd, (char *)feedtype,
461                                    nctx->remote_str, (char *)cn_expected,
462                                    ctx->count);
463         if(ctx->count < 0)
464           ctx->state = JLOG_STREAMER_WANT_ERROR;
465         else
466           ctx->state = JLOG_STREAMER_WANT_HEADER;
467         break;
468
469       case JLOG_STREAMER_WANT_HEADER:
470         if(ctx->count == 0) {
471           ctx->state = JLOG_STREAMER_WANT_COUNT;
472           break;
473         }
474         FULLREAD(e, ctx, sizeof(ctx->header));
475         memcpy(&dummy.header, ctx->buffer, sizeof(ctx->header));
476         ctx->header.chkpt.log = ntohl(dummy.header.chkpt.log);
477         ctx->header.chkpt.marker = ntohl(dummy.header.chkpt.marker);
478         ctx->header.tv_sec = ntohl(dummy.header.tv_sec);
479         ctx->header.tv_usec = ntohl(dummy.header.tv_usec);
480         ctx->header.message_len = ntohl(dummy.header.message_len);
481         STRATCON_STREAM_HEADER(e->fd, (char *)feedtype,
482                                     nctx->remote_str, (char *)cn_expected,
483                                     ctx->header.chkpt.log, ctx->header.chkpt.marker,
484                                     ctx->header.tv_sec, ctx->header.tv_usec,
485                                     ctx->header.message_len);
486         free(ctx->buffer); ctx->buffer = NULL;
487         ctx->state = JLOG_STREAMER_WANT_BODY;
488         break;
489
490       case JLOG_STREAMER_WANT_BODY:
491         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
492         STRATCON_STREAM_BODY(e->fd, (char *)feedtype,
493                                   nctx->remote_str, (char *)cn_expected,
494                                   ctx->header.chkpt.log, ctx->header.chkpt.marker,
495                                   ctx->header.tv_sec, ctx->header.tv_usec,
496                                   ctx->buffer);
497         if(ctx->header.message_len > 0) {
498           ctx->needs_chkpt = 1;
499           ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn,
500                     ctx->buffer, NULL);
501         }
502         else if(ctx->buffer)
503           free(ctx->buffer);
504         /* Don't free the buffer, it's used by the datastore process. */
505         ctx->buffer = NULL;
506         ctx->count--;
507         ctx->total_events++;
508         if(ctx->count == 0 && ctx->needs_chkpt) {
509           eventer_t completion_e;
510           eventer_remove_fd(e->fd);
511           completion_e = eventer_alloc();
512           memcpy(completion_e, e, sizeof(*e));
513           nctx->e = completion_e;
514           completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
515           ctx->state = JLOG_STREAMER_IS_ASYNC;
516           ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn,
517                     NULL, completion_e);
518           noitL(noit_debug, "Pushing %s batch async [%s]: [%u/%u]\n",
519                 feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
520                 nctx->remote_cn ? nctx->remote_cn : "(null)",
521                 ctx->header.chkpt.log, ctx->header.chkpt.marker);
522           noit_connection_disable_timeout(nctx);
523           return 0;
524         }
525         else if(ctx->count == 0)
526           ctx->state = JLOG_STREAMER_WANT_CHKPT;
527         else
528           ctx->state = JLOG_STREAMER_WANT_HEADER;
529         break;
530
531       case JLOG_STREAMER_IS_ASYNC:
532         ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
533       case JLOG_STREAMER_WANT_CHKPT:
534         noitL(noit_debug, "Pushing %s checkpoint [%s]: [%u/%u]\n",
535               feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
536               nctx->remote_cn ? nctx->remote_cn : "(null)",
537               ctx->header.chkpt.log, ctx->header.chkpt.marker);
538         n_chkpt.log = htonl(ctx->header.chkpt.log);
539         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
540
541         /* screw short writes.  I'd rather die than not write my data! */
542         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
543                               &mask, e);
544         if(len < 0) {
545           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
546           goto socket_error;
547         }
548         if(len != sizeof(jlog_id)) {
549           noitL(noit_error, "short write on checkpointing stream.\n");
550           goto socket_error;
551         }
552         STRATCON_STREAM_CHECKPOINT(e->fd, (char *)feedtype,
553                                         nctx->remote_str, (char *)cn_expected,
554                                         ctx->header.chkpt.log, ctx->header.chkpt.marker);
555         ctx->state = JLOG_STREAMER_WANT_COUNT;
556         break;
557     }
558   }
559   /* never get here */
560 }
561
562 int
563 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure,
564                             struct timeval *now) {
565   noit_connection_ctx_t *nctx = closure;
566   int rv;
567   const char *error = NULL, *cn_expected, *feedtype;
568   eventer_ssl_ctx_t *sslctx = NULL;
569
570   GET_EXPECTED_CN(nctx, cn_expected);
571   GET_FEEDTYPE(nctx, feedtype);
572   STRATCON_CONNECT_SSL(e->fd, (char *)feedtype, nctx->remote_str,
573                             (char *)cn_expected);
574   rv = eventer_SSL_connect(e, &mask);
575   sslctx = eventer_get_eventer_ssl_ctx(e);
576
577   if(rv > 0) {
578     e->callback = nctx->consumer_callback;
579     /* We must make a copy of the acceptor_closure_t for each new
580      * connection.
581      */
582     if(sslctx != NULL) {
583       const char *cn, *end;
584       cn = eventer_ssl_get_peer_subject(sslctx);
585       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
586         cn += 3;
587         end = cn;
588         while(*end && *end != '/') end++;
589         nctx->remote_cn = malloc(end - cn + 1);
590         memcpy(nctx->remote_cn, cn, end - cn);
591         nctx->remote_cn[end-cn] = '\0';
592       }
593       if(cn_expected && (!nctx->remote_cn ||
594                          strcmp(nctx->remote_cn, cn_expected))) {
595         error = "jlog connect CN mismatch";
596         goto error;
597       }
598     }
599     STRATCON_CONNECT_SSL_SUCCESS(e->fd, (char *)feedtype,
600                                       nctx->remote_str, (char *)cn_expected);
601     return e->callback(e, mask, e->closure, now);
602   }
603   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
604   if(sslctx) error = eventer_ssl_get_last_error(sslctx);
605   noitL(noit_debug, "jlog streamer SSL upgrade failed.\n");
606
607  error:
608   STRATCON_CONNECT_SSL_FAILED(e->fd, (char *)feedtype,
609                                    nctx->remote_str, (char *)cn_expected,
610                                    (char *)error, errno);
611   if(error) noitL(noit_error, "%s\n", error);
612   eventer_remove_fd(e->fd);
613   nctx->e = NULL;
614   e->opset->close(e->fd, &mask, e);
615   noit_connection_schedule_reattempt(nctx, now);
616   return 0;
617 }
618 int
619 noit_connection_complete_connect(eventer_t e, int mask, void *closure,
620                                  struct timeval *now) {
621   noit_connection_ctx_t *nctx = closure;
622   const char *cert, *key, *ca, *ciphers, *crl = NULL, *cn_expected, *feedtype;
623   char remote_str[128], tmp_str[128];
624   eventer_ssl_ctx_t *sslctx;
625   int aerrno, len;
626   socklen_t aerrno_len = sizeof(aerrno);
627
628   GET_EXPECTED_CN(nctx, cn_expected);
629   GET_FEEDTYPE(nctx, feedtype);
630   if(getsockopt(e->fd,SOL_SOCKET,SO_ERROR, &aerrno, &aerrno_len) == 0)
631     if(aerrno != 0) goto connect_error;
632   aerrno = 0;
633
634   if(mask & EVENTER_EXCEPTION) {
635     if(aerrno == 0 && (write(e->fd, e, 0) == -1))
636       aerrno = errno;
637  connect_error:
638     switch(nctx->r.remote.sa_family) {
639       case AF_INET:
640         len = sizeof(struct sockaddr_in);
641         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in.sin_addr,
642                   tmp_str, len);
643         snprintf(remote_str, sizeof(remote_str), "%s:%d",
644                  tmp_str, ntohs(nctx->r.remote_in.sin_port));
645         break;
646       case AF_INET6:
647         len = sizeof(struct sockaddr_in6);
648         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in6.sin6_addr,
649                   tmp_str, len);
650         snprintf(remote_str, sizeof(remote_str), "%s:%d",
651                  tmp_str, ntohs(nctx->r.remote_in6.sin6_port));
652        break;
653       case AF_UNIX:
654         snprintf(remote_str, sizeof(remote_str), "%s", nctx->r.remote_un.sun_path);
655         break;
656       default:
657         snprintf(remote_str, sizeof(remote_str), "(unknown)");
658     }
659     noitL(noit_error, "Error connecting to %s: %s\n",
660           remote_str, strerror(aerrno));
661     STRATCON_CONNECT_FAILED(e->fd, (char *)feedtype, remote_str,
662                                  (char *)cn_expected, aerrno);
663     eventer_remove_fd(e->fd);
664     nctx->e = NULL;
665     e->opset->close(e->fd, &mask, e);
666     noit_connection_schedule_reattempt(nctx, now);
667     return 0;
668   }
669
670 #define SSLCONFGET(var,name) do { \
671   if(!noit_hash_retr_str(nctx->sslconfig, name, strlen(name), \
672                          &var)) var = NULL; } while(0)
673   SSLCONFGET(cert, "certificate_file");
674   SSLCONFGET(key, "key_file");
675   SSLCONFGET(ca, "ca_chain");
676   SSLCONFGET(ciphers, "ciphers");
677   SSLCONFGET(crl, "crl");
678   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
679   if(!sslctx) goto connect_error;
680   if(crl) {
681     if(!eventer_ssl_use_crl(sslctx, crl)) {
682       noitL(noit_error, "Failed to load CRL from %s\n", crl);
683       eventer_ssl_ctx_free(sslctx);
684       goto connect_error;
685     }
686   }
687
688   memcpy(&nctx->last_connect, now, sizeof(*now));
689   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
690                              nctx->sslconfig);
691   EVENTER_ATTACH_SSL(e, sslctx);
692   e->callback = noit_connection_ssl_upgrade;
693   STRATCON_CONNECT_SUCCESS(e->fd, (char *)feedtype, nctx->remote_str,
694                                 (char *)cn_expected);
695   return e->callback(e, mask, closure, now);
696 }
697 static void
698 noit_connection_initiate_connection(noit_connection_ctx_t *nctx) {
699   struct timeval __now;
700   const char *cn_expected, *feedtype;
701   eventer_t e;
702   int rv, fd = -1;
703 #ifdef SO_KEEPALIVE
704   int optval;
705   socklen_t optlen = sizeof(optval);
706 #endif
707
708   GET_EXPECTED_CN(nctx, cn_expected);
709   GET_FEEDTYPE(nctx, feedtype);
710   nctx->e = NULL;
711   if(nctx->wants_permanent_shutdown) {
712     STRATCON_SHUTDOWN_PERMANENT(-1, (char *)feedtype,
713                                      nctx->remote_str, (char *)cn_expected);
714     noit_connection_ctx_dealloc(nctx);
715     return;
716   }
717   /* Open a socket */
718   fd = socket(nctx->r.remote.sa_family, NE_SOCK_CLOEXEC|SOCK_STREAM, 0);
719   if(fd < 0) goto reschedule;
720
721   /* Make it non-blocking */
722   if(eventer_set_fd_nonblocking(fd)) goto reschedule;
723 #define set_or_bail(type, opt, val) do { \
724   optval = val; \
725   optlen = sizeof(optval); \
726   if(setsockopt(fd, type, opt, &optval, optlen) < 0) { \
727     noitL(noit_error, "Cannot set " #type "/" #opt " on jlog socket: %s\n", \
728           strerror(errno)); \
729     goto reschedule; \
730   } \
731 } while(0)
732 #ifdef SO_KEEPALIVE
733   set_or_bail(SOL_SOCKET, SO_KEEPALIVE, 1);
734 #endif
735 #ifdef TCP_KEEPALIVE_THRESHOLD
736   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_THRESHOLD, 10 * 1000);
737 #endif
738 #ifdef TCP_KEEPALIVE_ABORT_THRESHOLD
739   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_ABORT_THRESHOLD, 30 * 1000);
740 #endif
741 #ifdef TCP_CONN_NOTIFY_THRESHOLD
742   set_or_bail(IPPROTO_TCP, TCP_CONN_NOTIFY_THRESHOLD, 10 * 1000);
743 #endif
744 #ifdef TCP_CONN_ABORT_THRESHOLD
745   set_or_bail(IPPROTO_TCP, TCP_CONN_ABORT_THRESHOLD, 30 * 1000);
746 #endif
747
748   /* Initiate a connection */
749   rv = connect(fd, &nctx->r.remote, nctx->remote_len);
750   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
751
752   /* Register a handler for connection completion */
753   e = eventer_alloc();
754   e->fd = fd;
755   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
756   e->callback = noit_connection_complete_connect;
757   e->closure = nctx;
758   nctx->e = e;
759   eventer_add(e);
760
761   STRATCON_CONNECT(e->fd, (char *)feedtype, nctx->remote_str,
762                         (char *)cn_expected);
763   noit_connection_update_timeout(nctx);
764   return;
765
766  reschedule:
767   if(fd >= 0) close(fd);
768   gettimeofday(&__now, NULL);
769   noit_connection_schedule_reattempt(nctx, &__now);
770   return;
771 }
772
773 int
774 noit_connection_update_timeout(noit_connection_ctx_t *nctx) {
775   struct timeval now, diff;
776   if(nctx->max_silence == 0) return 0;
777
778   diff.tv_sec = nctx->max_silence / 1000;
779   diff.tv_usec = (nctx->max_silence % 1000) * 1000;
780   gettimeofday(&now, NULL);
781
782   if(!nctx->timeout_event) {
783     nctx->timeout_event = eventer_alloc();
784     nctx->timeout_event->mask = EVENTER_TIMER;
785     nctx->timeout_event->closure = nctx;
786     nctx->timeout_event->callback = noit_connection_session_timeout;
787     add_timeval(now, diff, &nctx->timeout_event->whence);
788     eventer_add(nctx->timeout_event);
789   }
790   else {
791     add_timeval(now, diff, &nctx->timeout_event->whence);
792     eventer_update(nctx->timeout_event, EVENTER_TIMER);
793   }
794   return 0;
795 }
796
797 int
798 noit_connection_disable_timeout(noit_connection_ctx_t *nctx) {
799   if(nctx->timeout_event) {
800     eventer_remove(nctx->timeout_event);
801     eventer_free(nctx->timeout_event);
802     nctx->timeout_event = NULL;
803   }
804   return 0;
805 }
806
807 int
808 initiate_noit_connection(const char *host, unsigned short port,
809                          noit_hash_table *sslconfig, noit_hash_table *config,
810                          eventer_func_t handler, void *closure,
811                          void (*freefunc)(void *)) {
812   noit_connection_ctx_t *ctx;
813   const char *stimeout;
814   int8_t family;
815   int rv;
816   union {
817     struct in_addr addr4;
818     struct in6_addr addr6;
819   } a;
820
821   if(host[0] == '/') {
822     family = AF_UNIX;
823   }
824   else {
825     family = AF_INET;
826     rv = inet_pton(family, host, &a);
827     if(rv != 1) {
828       family = AF_INET6;
829       rv = inet_pton(family, host, &a);
830       if(rv != 1) {
831         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
832         return -1;
833       }
834     }
835   }
836
837   ctx = noit_connection_ctx_alloc();
838   ctx->remote_str = calloc(1, strlen(host) + 7);
839   snprintf(ctx->remote_str, strlen(host) + 7,
840            "%s:%d", host, port);
841  
842   memset(&ctx->r, 0, sizeof(ctx->r));
843   if(family == AF_UNIX) {
844     struct sockaddr_un *s = &ctx->r.remote_un;
845     s->sun_family = AF_UNIX;
846     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
847     ctx->remote_len = sizeof(*s);
848   }
849   else if(family == AF_INET) {
850     struct sockaddr_in *s = &ctx->r.remote_in;
851     s->sin_family = family;
852     s->sin_port = htons(port);
853     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
854     ctx->remote_len = sizeof(*s);
855   }
856   else {
857     struct sockaddr_in6 *s = &ctx->r.remote_in6;
858     s->sin6_family = family;
859     s->sin6_port = htons(port);
860     memcpy(&s->sin6_addr, &a, sizeof(a));
861     ctx->remote_len = sizeof(*s);
862   }
863
864   if(ctx->sslconfig)
865     noit_hash_delete_all(ctx->sslconfig, free, free);
866   else
867     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
868   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
869   if(ctx->config)
870     noit_hash_delete_all(ctx->config, free, free);
871   else
872     ctx->config = calloc(1, sizeof(noit_hash_table));
873   noit_hash_merge_as_dict(ctx->config, config);
874
875   if(noit_hash_retr_str(ctx->config, "timeout", strlen("timeout"), &stimeout))
876     ctx->max_silence = atoi(stimeout);
877   else
878     ctx->max_silence = DEFAULT_NOIT_CONNECTION_TIMEOUT;
879   ctx->consumer_callback = handler;
880   ctx->consumer_free = freefunc;
881   ctx->consumer_ctx = closure;
882   noit_connection_initiate_connection(ctx);
883   return 0;
884 }
885
886 void
887 stratcon_streamer_connection(const char *toplevel, const char *destination,
888                              eventer_func_t handler,
889                              void *(*handler_alloc)(void), void *handler_ctx,
890                              void (*handler_free)(void *)) {
891   int i, cnt = 0;
892   noit_conf_section_t *noit_configs;
893   char path[256];
894
895   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
896   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
897   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
898   for(i=0; i<cnt; i++) {
899     char address[256];
900     unsigned short port;
901     int portint;
902     noit_hash_table *sslconfig, *config;
903
904     if(!noit_conf_get_stringbuf(noit_configs[i],
905                                 "ancestor-or-self::node()/@address",
906                                 address, sizeof(address))) {
907       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
908       continue;
909     }
910     /* if destination is specified, exact match it */
911     if(destination && strcmp(address, destination)) continue;
912
913     if(!noit_conf_get_int(noit_configs[i],
914                           "ancestor-or-self::node()/@port", &portint))
915       portint = 0;
916     port = (unsigned short) portint;
917     if(address[0] != '/' && (portint == 0 || (port != portint))) {
918       /* UNIX sockets don't require a port (they'll ignore it if specified */
919       noitL(noit_stderr,
920             "Invalid port [%d] specified in stanza %d\n", port, i+1);
921       continue;
922     }
923     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
924     config = noit_conf_get_hash(noit_configs[i], "config");
925
926     noitL(noit_error, "initiating to %s\n", address);
927     initiate_noit_connection(address, port, sslconfig, config,
928                              handler,
929                              handler_alloc ? handler_alloc() : handler_ctx,
930                              handler_free);
931     noit_hash_destroy(sslconfig,free,free);
932     free(sslconfig);
933     noit_hash_destroy(config,free,free);
934     free(config);
935   }
936   free(noit_configs);
937 }
938 int
939 stratcon_find_noit_ip_by_cn(const char *cn, char *ip, int len) {
940   int rv = -1;
941   void *vip;
942   pthread_mutex_lock(&noit_ip_by_cn_lock);
943   if(noit_hash_retrieve(&noit_ip_by_cn, cn, strlen(cn), &vip)) {
944     int new_len;
945     char *new_ip = (char *)vip;
946     new_len = strlen(new_ip);
947     strlcpy(ip, new_ip, len);
948     if(new_len >= len) rv = new_len+1;
949     else rv = 0;
950   }
951   pthread_mutex_unlock(&noit_ip_by_cn_lock);
952   return rv;
953 }
954 void
955 stratcon_jlog_streamer_recache_noit() {
956   int di, cnt;
957   noit_conf_section_t *noit_configs;
958   noit_configs = noit_conf_get_sections(NULL, "//noits//noit", &cnt);
959   pthread_mutex_lock(&noit_ip_by_cn_lock);
960   noit_hash_delete_all(&noit_ip_by_cn, free, free);
961   for(di=0; di<cnt; di++) {
962     char address[64];
963     if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
964                                  address, sizeof(address))) {
965       char expected_cn[256];
966       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
967                                  expected_cn, sizeof(expected_cn)))
968         noit_hash_store(&noit_ip_by_cn,
969                         strdup(expected_cn), strlen(expected_cn),
970                         strdup(address));
971     }
972   }
973   free(noit_configs);
974   pthread_mutex_unlock(&noit_ip_by_cn_lock);
975 }
976 void
977 stratcon_jlog_streamer_reload(const char *toplevel) {
978   /* flush and repopulate the cn cache */
979   stratcon_jlog_streamer_recache_noit();
980   if(!stratcon_datastore_get_enabled()) return;
981   stratcon_streamer_connection(toplevel, NULL,
982                                stratcon_jlog_recv_handler,
983                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
984                                NULL,
985                                jlog_streamer_ctx_free);
986 }
987
988 char *
989 stratcon_console_noit_opts(noit_console_closure_t ncct,
990                            noit_console_state_stack_t *stack,
991                            noit_console_state_t *dstate,
992                            int argc, char **argv, int idx) {
993   if(argc == 1) {
994     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
995     const char *key_id;
996     int klen, i = 0;
997     void *vconn, *vcn;
998     noit_connection_ctx_t *ctx;
999     noit_hash_table dedup = NOIT_HASH_EMPTY;
1000
1001     pthread_mutex_lock(&noits_lock);
1002     while(noit_hash_next(&noits, &iter, &key_id, &klen, &vconn)) {
1003       ctx = (noit_connection_ctx_t *)vconn;
1004       vcn = NULL;
1005       if(ctx->config && noit_hash_retrieve(ctx->config, "cn", 2, &vcn) &&
1006          !noit_hash_store(&dedup, vcn, strlen(vcn), NULL)) {
1007         if(!strncmp(vcn, argv[0], strlen(argv[0]))) {
1008           if(idx == i) {
1009             pthread_mutex_unlock(&noits_lock);
1010             noit_hash_destroy(&dedup, NULL, NULL);
1011             return strdup(vcn);
1012           }
1013           i++;
1014         }
1015       }
1016       if(ctx->remote_str &&
1017          !noit_hash_store(&dedup, ctx->remote_str, strlen(ctx->remote_str), NULL)) {
1018         if(!strncmp(ctx->remote_str, argv[0], strlen(argv[0]))) {
1019           if(idx == i) {
1020             pthread_mutex_unlock(&noits_lock);
1021             noit_hash_destroy(&dedup, NULL, NULL);
1022             return strdup(ctx->remote_str);
1023           }
1024           i++;
1025         }
1026       }
1027     }
1028     pthread_mutex_unlock(&noits_lock);
1029     noit_hash_destroy(&dedup, NULL, NULL);
1030   }
1031   if(argc == 2)
1032     return noit_console_opt_delegate(ncct, stack, dstate, argc-1, argv+1, idx);
1033   return NULL;
1034 }
1035 static int
1036 stratcon_console_show_noits(noit_console_closure_t ncct,
1037                             int argc, char **argv,
1038                             noit_console_state_t *dstate,
1039                             void *closure) {
1040   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1041   const char *key_id, *ecn;
1042   int klen, n = 0, i;
1043   void *vconn;
1044   noit_connection_ctx_t **ctx;
1045
1046   if(closure != (void *)0 && argc == 0) {
1047     nc_printf(ncct, "takes an argument\n");
1048     return 0;
1049   }
1050   if(closure == (void *)0 && argc > 0) {
1051     nc_printf(ncct, "takes no arguments\n");
1052     return 0;
1053   }
1054   pthread_mutex_lock(&noits_lock);
1055   ctx = malloc(sizeof(*ctx) * noits.size);
1056   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1057                        &vconn)) {
1058     ctx[n] = (noit_connection_ctx_t *)vconn;
1059     if(argc == 0 ||
1060        !strcmp(ctx[n]->remote_str, argv[0]) ||
1061        (ctx[n]->config && noit_hash_retr_str(ctx[n]->config, "cn", 2, &ecn) &&
1062         !strcmp(ecn, argv[0]))) {
1063       noit_atomic_inc32(&ctx[n]->refcnt);
1064       n++;
1065     }
1066   }
1067   pthread_mutex_unlock(&noits_lock);
1068   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
1069   for(i=0; i<n; i++) {
1070     nc_print_noit_conn_brief(ncct, ctx[i]);
1071     noit_connection_ctx_deref(ctx[i]);
1072   }
1073   free(ctx);
1074   return 0;
1075 }
1076
1077 static void
1078 emit_noit_info_metrics(struct timeval *now, const char *uuid_str,
1079                        noit_connection_ctx_t *nctx) {
1080   struct timeval last, session_duration, diff;
1081   u_int64_t session_duration_ms, last_event_ms;
1082   jlog_streamer_ctx_t *jctx = nctx->consumer_ctx;
1083   char str[1024], *wr;
1084   int len;
1085   const char *cn_expected;
1086   const char *feedtype = "unknown";
1087
1088   GET_FEEDTYPE(nctx, feedtype);
1089   if(NULL != (wr = strchr(feedtype, '/'))) feedtype = wr+1;
1090
1091   GET_EXPECTED_CN(nctx, cn_expected);
1092   if(!cn_expected) return;
1093
1094   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\t%s`%s`",
1095            (long unsigned int)now->tv_sec,
1096            (long unsigned int)now->tv_usec/1000UL,
1097            uuid_str, cn_expected, feedtype);
1098   wr = str + strlen(str);
1099   len = sizeof(str) - (wr - str);
1100
1101   /* Now we write NAME TYPE VALUE into wr each time and push it */
1102 #define push_noit_m_str(name, value) do { \
1103   snprintf(wr, len, "%s\ts\t%s\n", name, value); \
1104   stratcon_datastore_push(DS_OP_INSERT, \
1105                           (struct sockaddr *)&self_stratcon_ip, \
1106                           self_stratcon_hostname, strdup(str), NULL); \
1107   stratcon_iep_line_processor(DS_OP_INSERT, \
1108                               (struct sockaddr *)&self_stratcon_ip, \
1109                               self_stratcon_hostname, strdup(str), NULL); \
1110 } while(0)
1111 #define push_noit_m_u64(name, value) do { \
1112   snprintf(wr, len, "%s\tL\t%llu\n", name, (long long unsigned int)value); \
1113   stratcon_datastore_push(DS_OP_INSERT, \
1114                           (struct sockaddr *)&self_stratcon_ip, \
1115                           self_stratcon_hostname, strdup(str), NULL); \
1116   stratcon_iep_line_processor(DS_OP_INSERT, \
1117                               (struct sockaddr *)&self_stratcon_ip, \
1118                               self_stratcon_hostname, strdup(str), NULL); \
1119 } while(0)
1120
1121   last.tv_sec = jctx->header.tv_sec;
1122   last.tv_usec = jctx->header.tv_usec;
1123   sub_timeval(*now, last, &diff);
1124   last_event_ms = diff.tv_sec * 1000 + diff.tv_usec / 1000;
1125   sub_timeval(*now, nctx->last_connect, &session_duration);
1126   session_duration_ms = session_duration.tv_sec * 1000 +
1127                         session_duration.tv_usec / 1000;
1128
1129   push_noit_m_str("state", nctx->remote_cn ? "connected" :
1130                              (nctx->retry_event ? "disconnected" :
1131                                                   "connecting"));
1132   push_noit_m_u64("last_event_age_ms", last_event_ms);
1133   push_noit_m_u64("session_length_ms", session_duration_ms);
1134 }
1135 static int
1136 periodic_noit_metrics(eventer_t e, int mask, void *closure,
1137                       struct timeval *now) {
1138   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
1139   noit_connection_ctx_t **ctxs;
1140   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1141   const char *key_id;
1142   void *vconn;
1143   int klen, n = 0, i;
1144   char str[1024];
1145   char uuid_str[1024], tmp_uuid_str[UUID_STR_LEN+1];
1146   struct timeval epoch, diff;
1147   u_int64_t uptime = 0;
1148   char ip_str[128];
1149
1150   inet_ntop(AF_INET, &self_stratcon_ip.sin_addr, ip_str,
1151             sizeof(ip_str));
1152
1153   uuid_str[0] = '\0';
1154   uuid_unparse_lower(self_stratcon_id, tmp_uuid_str);
1155   if(stratcon_selfcheck_extended_id) {
1156     strlcat(uuid_str, ip_str, sizeof(uuid_str)-37);
1157     strlcat(uuid_str, "`selfcheck`selfcheck`", sizeof(uuid_str)-37);
1158   }
1159   strlcat(uuid_str, tmp_uuid_str, sizeof(uuid_str));
1160
1161 #define PUSH_BOTH(type, str) do { \
1162   stratcon_datastore_push(type, \
1163                           (struct sockaddr *)&self_stratcon_ip, \
1164                           self_stratcon_hostname, str, NULL); \
1165   stratcon_iep_line_processor(type, \
1166                               (struct sockaddr *)&self_stratcon_ip, \
1167                               self_stratcon_hostname, str, NULL); \
1168 } while(0)
1169
1170   if(closure == NULL) {
1171     /* Only do this the first time we get called */
1172     snprintf(str, sizeof(str), "C\t%lu.%03lu\t%s\t%s\tstratcon\t%s\n",
1173              (long unsigned int)now->tv_sec,
1174              (long unsigned int)now->tv_usec/1000UL, uuid_str, ip_str,
1175              self_stratcon_hostname);
1176     PUSH_BOTH(DS_OP_INSERT, strdup(str));
1177   }
1178
1179   pthread_mutex_lock(&noits_lock);
1180   ctxs = malloc(sizeof(*ctxs) * noits.size);
1181   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1182                        &vconn)) {
1183     ctxs[n] = (noit_connection_ctx_t *)vconn;
1184     noit_atomic_inc32(&ctxs[n]->refcnt);
1185     n++;
1186   }
1187   pthread_mutex_unlock(&noits_lock);
1188
1189   snprintf(str, sizeof(str), "S\t%lu.%03lu\t%s\tG\tA\t0\tok %d noits\n",
1190            (long unsigned int)now->tv_sec,
1191            (long unsigned int)now->tv_usec/1000UL, uuid_str, n);
1192   PUSH_BOTH(DS_OP_INSERT, strdup(str));
1193
1194   if(eventer_get_epoch(&epoch) != 0)
1195     memcpy(&epoch, now, sizeof(epoch));
1196   sub_timeval(*now, epoch, &diff);
1197   uptime = diff.tv_sec;
1198   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\tuptime\tL\t%llu\n",
1199            (long unsigned int)now->tv_sec,
1200            (long unsigned int)now->tv_usec/1000UL,
1201            uuid_str, (long long unsigned int)uptime);
1202   PUSH_BOTH(DS_OP_INSERT, strdup(str));
1203
1204   for(i=0; i<n; i++) {
1205     emit_noit_info_metrics(now, uuid_str, ctxs[i]);
1206     noit_connection_ctx_deref(ctxs[i]);
1207   }
1208   free(ctxs);
1209   PUSH_BOTH(DS_OP_CHKPT, NULL);
1210
1211   add_timeval(e->whence, whence, &whence);
1212   eventer_add_at(periodic_noit_metrics, (void *)0x1, whence);
1213   return 0;
1214 }
1215
1216 static int
1217 rest_show_noits(noit_http_rest_closure_t *restc,
1218                 int npats, char **pats) {
1219   xmlDocPtr doc;
1220   xmlNodePtr root;
1221   noit_hash_table seen = NOIT_HASH_EMPTY;
1222   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1223   char path[256];
1224   const char *key_id;
1225   const char *type = NULL, *want_cn = NULL;
1226   int klen, n = 0, i, di, cnt;
1227   void *vconn;
1228   noit_connection_ctx_t **ctxs;
1229   noit_conf_section_t *noit_configs;
1230   struct timeval now, diff, last;
1231   xmlNodePtr node;
1232   noit_http_request *req = noit_http_session_request(restc->http_ctx);
1233
1234   noit_http_process_querystring(req);
1235   type = noit_http_request_querystring(req, "type");
1236   want_cn = noit_http_request_querystring(req, "cn");
1237
1238   gettimeofday(&now, NULL);
1239
1240   pthread_mutex_lock(&noits_lock);
1241   ctxs = malloc(sizeof(*ctxs) * noits.size);
1242   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1243                        &vconn)) {
1244     ctxs[n] = (noit_connection_ctx_t *)vconn;
1245     noit_atomic_inc32(&ctxs[n]->refcnt);
1246     n++;
1247   }
1248   pthread_mutex_unlock(&noits_lock);
1249   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
1250
1251   doc = xmlNewDoc((xmlChar *)"1.0");
1252   root = xmlNewDocNode(doc, NULL, (xmlChar *)"noits", NULL);
1253   xmlDocSetRootElement(doc, root);
1254
1255   for(i=0; i<n; i++) {
1256     char buff[256];
1257     const char *feedtype = "unknown", *state = "unknown";
1258     noit_connection_ctx_t *ctx = ctxs[i];
1259     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
1260
1261     feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
1262
1263     /* If the user requested a specific type and we're not it, skip. */
1264     if(type && strcmp(feedtype, type)) {
1265         noit_connection_ctx_deref(ctx);
1266         continue;
1267     }
1268     /* If the user wants a specific CN... limit to that. */
1269     if(want_cn && (!ctx->remote_cn || strcmp(want_cn, ctx->remote_cn))) {
1270         noit_connection_ctx_deref(ctx);
1271         continue;
1272     }
1273
1274     node = xmlNewNode(NULL, (xmlChar *)"noit");
1275     snprintf(buff, sizeof(buff), "%llu.%06d",
1276              (long long unsigned)ctx->last_connect.tv_sec,
1277              (int)ctx->last_connect.tv_usec);
1278     xmlSetProp(node, (xmlChar *)"last_connect", (xmlChar *)buff);
1279     xmlSetProp(node, (xmlChar *)"state", ctx->remote_cn ?
1280                (xmlChar *)"connected" :
1281                (ctx->retry_event ? (xmlChar *)"disconnected" :
1282                                     (xmlChar *)"connecting"));
1283     if(ctx->e) {
1284       char buff[128];
1285       const char *addrstr = NULL;
1286       struct sockaddr_in6 addr6;
1287       socklen_t len = sizeof(addr6);
1288       if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
1289         unsigned short port = 0;
1290         if(addr6.sin6_family == AF_INET) {
1291           addrstr = inet_ntop(addr6.sin6_family,
1292                               &((struct sockaddr_in *)&addr6)->sin_addr,
1293                               buff, sizeof(buff));
1294           memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
1295           port = ntohs(port);
1296         }
1297         else if(addr6.sin6_family == AF_INET6) {
1298           addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
1299                               buff, sizeof(buff));
1300           port = ntohs(addr6.sin6_port);
1301         }
1302         if(addrstr != NULL) {
1303           snprintf(buff + strlen(buff), sizeof(buff) - strlen(buff),
1304                    ":%u", port);
1305           xmlSetProp(node, (xmlChar *)"local", (xmlChar *)buff);
1306         }
1307       }
1308     }
1309     noit_hash_replace(&seen, strdup(ctx->remote_str), strlen(ctx->remote_str),
1310                       0, free, NULL);
1311     xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)ctx->remote_str);
1312     xmlSetProp(node, (xmlChar *)"type", (xmlChar *)feedtype);
1313     if(ctx->retry_event) {
1314       sub_timeval(ctx->retry_event->whence, now, &diff);
1315       snprintf(buff, sizeof(buff), "%llu.%06d",
1316                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
1317       xmlSetProp(node, (xmlChar *)"next_attempt", (xmlChar *)buff);
1318     }
1319     else if(ctx->remote_cn) {
1320       if(ctx->remote_cn)
1321         xmlSetProp(node, (xmlChar *)"remote_cn", (xmlChar *)ctx->remote_cn);
1322  
1323       switch(jctx->state) {
1324         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
1325         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
1326         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
1327         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
1328         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
1329         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
1330         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
1331       }
1332       xmlSetProp(node, (xmlChar *)"state", (xmlChar *)state);
1333       snprintf(buff, sizeof(buff), "%08x:%08x",
1334                jctx->header.chkpt.log, jctx->header.chkpt.marker);
1335       xmlSetProp(node, (xmlChar *)"checkpoint", (xmlChar *)buff);
1336       snprintf(buff, sizeof(buff), "%llu",
1337                (unsigned long long)jctx->total_events);
1338       xmlSetProp(node, (xmlChar *)"session_events", (xmlChar *)buff);
1339       snprintf(buff, sizeof(buff), "%llu",
1340                (unsigned long long)jctx->total_bytes_read);
1341       xmlSetProp(node, (xmlChar *)"session_bytes", (xmlChar *)buff);
1342  
1343       sub_timeval(now, ctx->last_connect, &diff);
1344       snprintf(buff, sizeof(buff), "%lld.%06d",
1345                (long long)diff.tv_sec, (int)diff.tv_usec);
1346       xmlSetProp(node, (xmlChar *)"session_duration", (xmlChar *)buff);
1347  
1348       if(jctx->header.tv_sec) {
1349         last.tv_sec = jctx->header.tv_sec;
1350         last.tv_usec = jctx->header.tv_usec;
1351         snprintf(buff, sizeof(buff), "%llu.%06d",
1352                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
1353         xmlSetProp(node, (xmlChar *)"last_event", (xmlChar *)buff);
1354         sub_timeval(now, last, &diff);
1355         snprintf(buff, sizeof(buff), "%lld.%06d",
1356                  (long long)diff.tv_sec, (int)diff.tv_usec);
1357         xmlSetProp(node, (xmlChar *)"last_event_age", (xmlChar *)buff);
1358       }
1359     }
1360
1361     xmlAddChild(root, node);
1362     noit_connection_ctx_deref(ctx);
1363   }
1364   free(ctxs);
1365
1366   if(!type || !strcmp(type, "configured")) {
1367     snprintf(path, sizeof(path), "//noits//noit");
1368     noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1369     for(di=0; di<cnt; di++) {
1370       char address[64], port_str[32], remote_str[98];
1371       char expected_cn_buff[256], *expected_cn = NULL;
1372       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
1373                                  expected_cn_buff, sizeof(expected_cn_buff)))
1374         expected_cn = expected_cn_buff;
1375       if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn))) continue;
1376       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
1377                                  address, sizeof(address))) {
1378         void *v;
1379         if(!noit_conf_get_stringbuf(noit_configs[di], "self::node()/@port",
1380                                    port_str, sizeof(port_str)))
1381           strlcpy(port_str, "43191", sizeof(port_str));
1382
1383         /* If the user wants a specific CN... limit to that. */
1384           if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn)))
1385             continue;
1386
1387         snprintf(remote_str, sizeof(remote_str), "%s:%s", address, port_str);
1388         if(!noit_hash_retrieve(&seen, remote_str, strlen(remote_str), &v)) {
1389           node = xmlNewNode(NULL, (xmlChar *)"noit");
1390           xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)remote_str);
1391           xmlSetProp(node, (xmlChar *)"type", (xmlChar *)"configured");
1392           if(expected_cn)
1393             xmlSetProp(node, (xmlChar *)"cn", (xmlChar *)expected_cn);
1394           xmlAddChild(root, node);
1395         }
1396       }
1397     }
1398     free(noit_configs);
1399   }
1400   noit_hash_destroy(&seen, free, NULL);
1401
1402   noit_http_response_ok(restc->http_ctx, "text/xml");
1403   noit_http_response_xml(restc->http_ctx, doc);
1404   noit_http_response_end(restc->http_ctx);
1405   xmlFreeDoc(doc);
1406   return 0;
1407 }
1408 static int
1409 stratcon_add_noit(const char *target, unsigned short port,
1410                   const char *cn) {
1411   int cnt;
1412   char path[256];
1413   char port_str[6];
1414   noit_conf_section_t *noit_configs, parent;
1415   xmlNodePtr newnoit, config, cnnode;
1416
1417   snprintf(path, sizeof(path),
1418            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1419   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1420   free(noit_configs);
1421   if(cnt != 0) return -1;
1422
1423   parent = noit_conf_get_section(NULL, "//noits");
1424   if(!parent) return -1;
1425   snprintf(port_str, sizeof(port_str), "%d", port);
1426   newnoit = xmlNewNode(NULL, (xmlChar *)"noit");
1427   xmlSetProp(newnoit, (xmlChar *)"address", (xmlChar *)target);
1428   xmlSetProp(newnoit, (xmlChar *)"port", (xmlChar *)port_str);
1429   xmlAddChild(parent, newnoit);
1430   if(cn) {
1431     config = xmlNewNode(NULL, (xmlChar *)"config");
1432     cnnode = xmlNewNode(NULL, (xmlChar *)"cn");
1433     xmlNodeAddContent(cnnode, (xmlChar *)cn);
1434     xmlAddChild(config, cnnode);
1435     xmlAddChild(newnoit, config);
1436     pthread_mutex_lock(&noit_ip_by_cn_lock);
1437     noit_hash_replace(&noit_ip_by_cn, strdup(cn), strlen(cn),
1438                       strdup(target), free, free);
1439     pthread_mutex_unlock(&noit_ip_by_cn_lock);
1440   }
1441   if(stratcon_datastore_get_enabled())
1442     stratcon_streamer_connection(NULL, target,
1443                                  stratcon_jlog_recv_handler,
1444                                  (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
1445                                  NULL,
1446                                  jlog_streamer_ctx_free);
1447   if(stratcon_iep_get_enabled())
1448     stratcon_streamer_connection(NULL, target,
1449                                  stratcon_jlog_recv_handler,
1450                                  (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
1451                                  NULL,
1452                                  jlog_streamer_ctx_free);
1453   return 1;
1454 }
1455 static int
1456 stratcon_remove_noit(const char *target, unsigned short port) {
1457   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1458   const char *key_id;
1459   int klen, n = -1, i, cnt = 0;
1460   void *vconn;
1461   noit_connection_ctx_t **ctx;
1462   noit_conf_section_t *noit_configs;
1463   char path[256];
1464   char remote_str[256];
1465
1466   snprintf(remote_str, sizeof(remote_str), "%s:%d", target, port);
1467
1468   snprintf(path, sizeof(path),
1469            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1470   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1471   for(i=0; i<cnt; i++) {
1472     char expected_cn[256];
1473     if(noit_conf_get_stringbuf(noit_configs[i], "self::node()/config/cn",
1474                                expected_cn, sizeof(expected_cn))) {
1475       pthread_mutex_lock(&noit_ip_by_cn_lock);
1476       noit_hash_delete(&noit_ip_by_cn, expected_cn, strlen(expected_cn),
1477                        free, free);
1478       pthread_mutex_unlock(&noit_ip_by_cn_lock);
1479     }
1480     CONF_REMOVE(noit_configs[i]);
1481     xmlUnlinkNode(noit_configs[i]);
1482     xmlFreeNode(noit_configs[i]);
1483     n = 0;
1484   }
1485   free(noit_configs);
1486
1487   pthread_mutex_lock(&noits_lock);
1488   ctx = malloc(sizeof(*ctx) * noits.size);
1489   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1490                        &vconn)) {
1491     if(!strcmp(((noit_connection_ctx_t *)vconn)->remote_str, remote_str)) {
1492       ctx[n] = (noit_connection_ctx_t *)vconn;
1493       noit_atomic_inc32(&ctx[n]->refcnt);
1494       n++;
1495     }
1496   }
1497   pthread_mutex_unlock(&noits_lock);
1498   for(i=0; i<n; i++) {
1499     noit_connection_ctx_dealloc(ctx[i]); /* once for the record */
1500     noit_connection_ctx_deref(ctx[i]);   /* once for the aboce inc32 */
1501   }
1502   free(ctx);
1503   return n;
1504 }
1505 static int
1506 rest_set_noit(noit_http_rest_closure_t *restc,
1507               int npats, char **pats) {
1508   const char *cn = NULL;
1509   noit_http_session_ctx *ctx = restc->http_ctx;
1510   noit_http_request *req = noit_http_session_request(ctx);
1511   unsigned short port = 43191;
1512   if(npats < 1 || npats > 2)
1513     noit_http_response_server_error(ctx, "text/xml");
1514   if(npats == 2) port = atoi(pats[1]);
1515   noit_http_process_querystring(req);
1516   cn = noit_http_request_querystring(req, "cn");
1517   if(stratcon_add_noit(pats[0], port, cn) >= 0)
1518     noit_http_response_ok(ctx, "text/xml");
1519   else
1520     noit_http_response_standard(ctx, 409, "EXISTS", "text/xml");
1521   if(noit_conf_write_file(NULL) != 0)
1522     noitL(noit_error, "local config write failed\n");
1523   noit_conf_mark_changed();
1524   noit_http_response_end(ctx);
1525   return 0;
1526 }
1527 static int
1528 rest_delete_noit(noit_http_rest_closure_t *restc,
1529                  int npats, char **pats) {
1530   noit_http_session_ctx *ctx = restc->http_ctx;
1531   unsigned short port = 43191;
1532   if(npats < 1 || npats > 2)
1533     noit_http_response_server_error(ctx, "text/xml");
1534   if(npats == 2) port = atoi(pats[1]);
1535   if(stratcon_remove_noit(pats[0], port) >= 0)
1536     noit_http_response_ok(ctx, "text/xml");
1537   else
1538     noit_http_response_not_found(ctx, "text/xml");
1539   if(noit_conf_write_file(NULL) != 0)
1540     noitL(noit_error, "local config write failed\n");
1541   noit_conf_mark_changed();
1542   noit_http_response_end(ctx);
1543   return 0;
1544 }
1545 static int
1546 stratcon_console_conf_noits(noit_console_closure_t ncct,
1547                             int argc, char **argv,
1548                             noit_console_state_t *dstate,
1549                             void *closure) {
1550   char *cp, target[128];
1551   unsigned short port = 43191;
1552   int adding = (int)(vpsized_int)closure;
1553   if(argc != 1)
1554     return -1;
1555
1556   cp = strchr(argv[0], ':');
1557   if(cp) {
1558     strlcpy(target, argv[0], MIN(sizeof(target), cp-argv[0]+1));
1559     port = atoi(cp+1);
1560   }
1561   else strlcpy(target, argv[0], sizeof(target));
1562   if(adding) {
1563     if(stratcon_add_noit(target, port, NULL) >= 0) {
1564       nc_printf(ncct, "Added noit at %s:%d\n", target, port);
1565     }
1566     else {
1567       nc_printf(ncct, "Failed to add noit at %s:%d\n", target, port);
1568     }
1569   }
1570   else {
1571     if(stratcon_remove_noit(target, port) >= 0) {
1572       nc_printf(ncct, "Removed noit at %s:%d\n", target, port);
1573     }
1574     else {
1575       nc_printf(ncct, "Failed to remove noit at %s:%d\n", target, port);
1576     }
1577   }
1578   return 0;
1579 }
1580
1581 static void
1582 register_console_streamer_commands() {
1583   noit_console_state_t *tl;
1584   cmd_info_t *showcmd, *confcmd, *conftcmd, *conftnocmd;
1585
1586   tl = noit_console_state_initial();
1587   showcmd = noit_console_state_get_cmd(tl, "show");
1588   assert(showcmd && showcmd->dstate);
1589   confcmd = noit_console_state_get_cmd(tl, "configure");
1590   conftcmd = noit_console_state_get_cmd(confcmd->dstate, "terminal");
1591   conftnocmd = noit_console_state_get_cmd(conftcmd->dstate, "no");
1592
1593   noit_console_state_add_cmd(conftcmd->dstate,
1594     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)1));
1595   noit_console_state_add_cmd(conftnocmd->dstate,
1596     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)0));
1597
1598   noit_console_state_add_cmd(showcmd->dstate,
1599     NCSCMD("noit", stratcon_console_show_noits,
1600            stratcon_console_noit_opts, NULL, (void *)1));
1601   noit_console_state_add_cmd(showcmd->dstate,
1602     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
1603 }
1604
1605 void
1606 stratcon_jlog_streamer_init(const char *toplevel) {
1607   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
1608   struct in_addr remote;
1609   char uuid_str[UUID_STR_LEN + 1];
1610
1611   pthread_mutex_init(&noits_lock, NULL);
1612   pthread_mutex_init(&noit_ip_by_cn_lock, NULL);
1613   eventer_name_callback("noit_connection_reinitiate",
1614                         noit_connection_reinitiate);
1615   eventer_name_callback("stratcon_jlog_recv_handler",
1616                         stratcon_jlog_recv_handler);
1617   eventer_name_callback("noit_connection_ssl_upgrade",
1618                         noit_connection_ssl_upgrade);
1619   eventer_name_callback("noit_connection_complete_connect",
1620                         noit_connection_complete_connect);
1621   eventer_name_callback("noit_connection_session_timeout",
1622                         noit_connection_session_timeout);
1623   register_console_streamer_commands();
1624   stratcon_jlog_streamer_reload(toplevel);
1625   stratcon_streamer_connection(toplevel, "", NULL, NULL, NULL, NULL);
1626   assert(noit_http_rest_register_auth(
1627     "GET", "/noits/", "^show$", rest_show_noits,
1628              noit_http_rest_client_cert_auth
1629   ) == 0);
1630   assert(noit_http_rest_register_auth(
1631     "PUT", "/noits/", "^set/([^/:]+)$", rest_set_noit,
1632              noit_http_rest_client_cert_auth
1633   ) == 0);
1634   assert(noit_http_rest_register_auth(
1635     "PUT", "/noits/", "^set/([^/:]+):(\\d+)$", rest_set_noit,
1636              noit_http_rest_client_cert_auth
1637   ) == 0);
1638   assert(noit_http_rest_register_auth(
1639     "DELETE", "/noits/", "^delete/([^/:]+)$", rest_delete_noit,
1640              noit_http_rest_client_cert_auth
1641   ) == 0);
1642   assert(noit_http_rest_register_auth(
1643     "DELETE", "/noits/", "^delete/([^/:]+):(\\d+)$", rest_delete_noit,
1644              noit_http_rest_client_cert_auth
1645   ) == 0);
1646
1647   uuid_clear(self_stratcon_id);
1648
1649   if(noit_conf_get_stringbuf(NULL, "/stratcon/@id",
1650                              uuid_str, sizeof(uuid_str)) &&
1651      uuid_parse(uuid_str, self_stratcon_id) == 0) {
1652     int period;
1653     noit_conf_get_boolean(NULL, "/stratcon/@extended_id",
1654                           &stratcon_selfcheck_extended_id);
1655     /* If a UUID was provided for stratcon itself, we will report metrics
1656      * on a large variety of things (including all noits).
1657      */
1658     if(noit_conf_get_int(NULL, "/stratcon/@metric_period", &period) &&
1659        period > 0) {
1660       DEFAULT_NOIT_PERIOD_TV.tv_sec = period / 1000;
1661       DEFAULT_NOIT_PERIOD_TV.tv_usec = (period % 1000) * 1000;
1662     }
1663     self_stratcon_ip.sin_family = AF_INET;
1664     remote.s_addr = 0xffffffff;
1665     noit_getip_ipv4(remote, &self_stratcon_ip.sin_addr);
1666     gethostname(self_stratcon_hostname, sizeof(self_stratcon_hostname));
1667     eventer_add_in(periodic_noit_metrics, NULL, whence);
1668   }
1669 }
1670
Note: See TracBrowser for help on using the browser.