root/src/stratcon_jlog_streamer.c

Revision 5c3b2beac55664af9541cf140da10ab73a8ef00e, 60.9 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 months ago)

Support spefication of SSL layer (SSLv3, SSLv2, TLSv1, TLSv1.1 and TLSv1.2).

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "dtrace_probes.h"
35 #include "eventer/eventer.h"
36 #include "noit_conf.h"
37 #include "utils/noit_hash.h"
38 #include "utils/noit_log.h"
39 #include "utils/noit_getip.h"
40 #include "noit_jlog_listener.h"
41 #include "noit_rest.h"
42 #include "stratcon_datastore.h"
43 #include "stratcon_jlog_streamer.h"
44 #include "stratcon_iep.h"
45
46 #include <unistd.h>
47 #include <assert.h>
48 #include <errno.h>
49 #include <sys/types.h>
50 #include <sys/socket.h>
51 #ifdef HAVE_SYS_FILIO_H
52 #include <sys/filio.h>
53 #endif
54 #include <netinet/in.h>
55 #include <sys/un.h>
56 #include <arpa/inet.h>
57
58 pthread_mutex_t noits_lock;
59 noit_hash_table noits = NOIT_HASH_EMPTY;
60 pthread_mutex_t noit_ip_by_cn_lock;
61 noit_hash_table noit_ip_by_cn = NOIT_HASH_EMPTY;
62 static uuid_t self_stratcon_id;
63 static char self_stratcon_hostname[256] = "\0";
64 static struct sockaddr_in self_stratcon_ip;
65 static noit_boolean stratcon_selfcheck_extended_id = noit_true;
66
67 static struct timeval DEFAULT_NOIT_PERIOD_TV = { 5UL, 0UL };
68
69 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
70
71 static const char *feed_type_to_str(int jlog_feed_cmd) {
72   switch(jlog_feed_cmd) {
73     case NOIT_JLOG_DATA_FEED: return "durable/storage";
74     case NOIT_JLOG_DATA_TEMP_FEED: return "transient/iep";
75   }
76   return "unknown";
77 }
78
79 #define GET_EXPECTED_CN(nctx, cn) do { \
80   void *vcn; \
81   cn = NULL; \
82   if(nctx->config && \
83      noit_hash_retrieve(nctx->config, "cn", 2, &vcn)) { \
84      cn = vcn; \
85   } \
86 } while(0)
87 #define GET_FEEDTYPE(nctx, feedtype) do { \
88   jlog_streamer_ctx_t *_jctx = nctx->consumer_ctx; \
89   feedtype = "unknown"; \
90   if(_jctx->push == stratcon_datastore_push) \
91     feedtype = "storage"; \
92   else if(_jctx->push == stratcon_iep_line_processor) \
93     feedtype = "iep"; \
94 } while(0)
95
96 static int
97 remote_str_sort(const void *a, const void *b) {
98   int rv;
99   noit_connection_ctx_t * const *actx = a;
100   noit_connection_ctx_t * const *bctx = b;
101   jlog_streamer_ctx_t *ajctx = (*actx)->consumer_ctx;
102   jlog_streamer_ctx_t *bjctx = (*bctx)->consumer_ctx;
103   rv = strcmp((*actx)->remote_str, (*bctx)->remote_str);
104   if(rv) return rv;
105   return (ajctx->jlog_feed_cmd < bjctx->jlog_feed_cmd) ? -1 :
106            ((ajctx->jlog_feed_cmd == bjctx->jlog_feed_cmd) ? 0 : 1);
107 }
108 static void
109 nc_print_noit_conn_brief(noit_console_closure_t ncct,
110                           noit_connection_ctx_t *ctx) {
111   jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
112   struct timeval now, diff, session_duration;
113   char cmdbuf[4096];
114   const char *feedtype = "unknown";
115   const char *lasttime = "never";
116   if(ctx->last_connect.tv_sec != 0) {
117     time_t r = ctx->last_connect.tv_sec;
118     struct tm tbuf, *tm;
119     tm = gmtime_r(&r, &tbuf);
120     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
121     lasttime = cmdbuf;
122   }
123   nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str,
124             ctx->remote_cn ? "connected" :
125                              (ctx->retry_event ? "disconnected" :
126                                                    "connecting"), lasttime);
127   if(ctx->e) {
128     char buff[128];
129     const char *addrstr = NULL;
130     struct sockaddr_in6 addr6;
131     socklen_t len = sizeof(addr6);
132     if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
133       unsigned short port = 0;
134       if(addr6.sin6_family == AF_INET) {
135         addrstr = inet_ntop(addr6.sin6_family,
136                             &((struct sockaddr_in *)&addr6)->sin_addr,
137                             buff, sizeof(buff));
138         memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
139         port = ntohs(port);
140       }
141       else if(addr6.sin6_family == AF_INET6) {
142         addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
143                             buff, sizeof(buff));
144         port = ntohs(addr6.sin6_port);
145       }
146       if(addrstr != NULL)
147         nc_printf(ncct, "\tLocal address is %s:%u\n", buff, port);
148       else
149         nc_printf(ncct, "\tLocal address not interpretable\n");
150     }
151     else {
152       nc_printf(ncct, "\tLocal address error[%d]: %s\n",
153                 ctx->e->fd, strerror(errno));
154     }
155   }
156   feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
157   nc_printf(ncct, "\tJLog event streamer [%s]\n", feedtype);
158   gettimeofday(&now, NULL);
159   if(ctx->timeout_event) {
160     sub_timeval(ctx->timeout_event->whence, now, &diff);
161     nc_printf(ncct, "\tTimeout scheduled for %lld.%06us\n",
162               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
163   }
164   if(ctx->retry_event) {
165     sub_timeval(ctx->retry_event->whence, now, &diff);
166     nc_printf(ncct, "\tNext attempt in %lld.%06us\n",
167               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
168   }
169   else if(ctx->remote_cn) {
170     nc_printf(ncct, "\tRemote CN: '%s'\n",
171               ctx->remote_cn ? ctx->remote_cn : "???");
172     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
173       struct timeval last;
174       double session_duration_seconds;
175       const char *state = "unknown";
176
177       switch(jctx->state) {
178         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
179         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
180         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
181         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
182         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
183         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
184         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
185       }
186       last.tv_sec = jctx->header.tv_sec;
187       last.tv_usec = jctx->header.tv_usec;
188       sub_timeval(now, last, &diff);
189       sub_timeval(now, ctx->last_connect, &session_duration);
190       session_duration_seconds = session_duration.tv_sec +
191                                  (double)session_duration.tv_usec/1000000.0;
192       nc_printf(ncct, "\tState: %s\n"
193                       "\tNext checkpoint: [%08x:%08x]\n"
194                       "\tLast event: %lld.%06us ago\n"
195                       "\tEvents this session: %llu (%0.2f/s)\n"
196                       "\tOctets this session: %llu (%0.2f/s)\n",
197                 state,
198                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
199                 (long long)diff.tv_sec, (unsigned int)diff.tv_usec,
200                 jctx->total_events,
201                 (double)jctx->total_events/session_duration_seconds,
202                 jctx->total_bytes_read,
203                 (double)jctx->total_bytes_read/session_duration_seconds);
204     }
205     else {
206       nc_printf(ncct, "\tUnknown type.\n");
207     }
208   }
209 }
210
211 jlog_streamer_ctx_t *
212 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
213   jlog_streamer_ctx_t *ctx;
214   ctx = stratcon_jlog_streamer_ctx_alloc();
215   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
216   ctx->push = stratcon_datastore_push;
217   return ctx;
218 }
219 jlog_streamer_ctx_t *
220 stratcon_jlog_streamer_ctx_alloc(void) {
221   jlog_streamer_ctx_t *ctx;
222   ctx = calloc(1, sizeof(*ctx));
223   return ctx;
224 }
225 noit_connection_ctx_t *
226 noit_connection_ctx_alloc(void) {
227   noit_connection_ctx_t *ctx, **pctx;
228   ctx = calloc(1, sizeof(*ctx));
229   ctx->refcnt = 1;
230   pctx = malloc(sizeof(*pctx));
231   *pctx = ctx;
232   pthread_mutex_lock(&noits_lock);
233   noit_hash_store(&noits, (const char *)pctx, sizeof(*pctx), ctx);
234   pthread_mutex_unlock(&noits_lock);
235   return ctx;
236 }
237 int
238 noit_connection_reinitiate(eventer_t e, int mask, void *closure,
239                          struct timeval *now) {
240   noit_connection_ctx_t *ctx = closure;
241   ctx->retry_event = NULL;
242   noit_connection_initiate_connection(closure);
243   return 0;
244 }
245 void
246 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx,
247                                    struct timeval *now) {
248   struct timeval __now, interval;
249   const char *v, *feedtype, *cn_expected;
250   u_int32_t min_interval = 1000, max_interval = 8000;
251
252   noit_connection_disable_timeout(ctx);
253   if(ctx->remote_cn) {
254     free(ctx->remote_cn);
255     ctx->remote_cn = NULL;
256   }
257   if(noit_hash_retr_str(ctx->config,
258                         "reconnect_initial_interval",
259                         strlen("reconnect_initial_interval"),
260                         &v)) {
261     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
262   }
263   if(noit_hash_retr_str(ctx->config,
264                         "reconnect_maximum_interval",
265                         strlen("reconnect_maximum_interval"),
266                         &v)) {
267     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
268   }
269   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
270   else {
271     ctx->current_backoff *= 2;
272     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
273     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
274   }
275   if(!now) {
276     gettimeofday(&__now, NULL);
277     now = &__now;
278   }
279   interval.tv_sec = ctx->current_backoff / 1000;
280   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
281   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
282         ctx->current_backoff);
283   if(ctx->retry_event)
284     eventer_remove(ctx->retry_event);
285   else
286     ctx->retry_event = eventer_alloc();
287   ctx->retry_event->callback = noit_connection_reinitiate;
288   ctx->retry_event->closure = ctx;
289   ctx->retry_event->mask = EVENTER_TIMER;
290   add_timeval(*now, interval, &ctx->retry_event->whence);
291   GET_EXPECTED_CN(ctx, cn_expected);
292   GET_FEEDTYPE(ctx, feedtype);
293   STRATCON_RESCHEDULE(-1, (char *)feedtype, ctx->remote_str,
294                            (char *)cn_expected, ctx->current_backoff);
295   eventer_add(ctx->retry_event);
296 }
297 static void
298 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
299   if(ctx->remote_cn) free(ctx->remote_cn);
300   if(ctx->remote_str) free(ctx->remote_str);
301   if(ctx->retry_event) {
302     eventer_remove(ctx->retry_event);
303     eventer_free(ctx->retry_event);
304   }
305   if(ctx->timeout_event) {
306     eventer_remove(ctx->timeout_event);
307     eventer_free(ctx->timeout_event);
308   }
309   ctx->consumer_free(ctx->consumer_ctx);
310   if (ctx->e) {
311     int mask = 0;
312     eventer_remove_fd(ctx->e->fd);
313     ctx->e->opset->close(ctx->e->fd, &mask, ctx->e);
314     eventer_free(ctx->e);
315   }
316   free(ctx);
317 }
318 void
319 noit_connection_ctx_deref(noit_connection_ctx_t *ctx) {
320   if(noit_atomic_dec32(&ctx->refcnt) == 0) {
321     noit_connection_ctx_free(ctx);
322   }
323 }
324 void
325 noit_connection_ctx_dealloc(noit_connection_ctx_t *ctx) {
326   noit_connection_ctx_t **pctx = &ctx;
327   pthread_mutex_lock(&noits_lock);
328   noit_hash_delete(&noits, (const char *)pctx, sizeof(*pctx),
329                    free, (void (*)(void *))noit_connection_ctx_deref);
330   pthread_mutex_unlock(&noits_lock);
331 }
332 void
333 jlog_streamer_ctx_free(void *cl) {
334   jlog_streamer_ctx_t *ctx = cl;
335   if(ctx->buffer) free(ctx->buffer);
336   free(ctx);
337 }
338
339 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
340 static int
341 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
342   int len, mask;
343   while(ctx->bytes_read < ctx->bytes_expected) {
344     len = Eread(ctx->buffer + ctx->bytes_read,
345                 ctx->bytes_expected - ctx->bytes_read);
346     if(len < 0) {
347       *newmask = mask;
348       return -1;
349     }
350     /* if we get 0 inside SSL, and there was a real error, we
351      * will actually get a -1 here.
352      * if(len == 0) return ctx->bytes_read;
353      */
354     ctx->total_bytes_read += len;
355     ctx->bytes_read += len;
356   }
357   assert(ctx->bytes_read == ctx->bytes_expected);
358   return ctx->bytes_read;
359 }
360 #define FULLREAD(e,ctx,size) do { \
361   int mask, len; \
362   if(!ctx->bytes_expected) { \
363     ctx->bytes_expected = size; \
364     if(ctx->buffer) free(ctx->buffer); \
365     ctx->buffer = malloc(size + 1); \
366     if(ctx->buffer == NULL) { \
367       noitL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)size + 1); \
368       goto socket_error; \
369     } \
370     ctx->buffer[size] = '\0'; \
371   } \
372   len = __read_on_ctx(e, ctx, &mask); \
373   if(len < 0) { \
374     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
375     noitL(noit_error, "[%s] [%s] SSL read error: %s\n", nctx->remote_str ? nctx->remote_str : "(null)", \
376           nctx->remote_cn ? nctx->remote_cn : "(null)", \
377           strerror(errno)); \
378     goto socket_error; \
379   } \
380   ctx->bytes_read = 0; \
381   ctx->bytes_expected = 0; \
382   if(len != size) { \
383     noitL(noit_error, "[%s] [%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
384           nctx->remote_str ? nctx->remote_str : "(null)", \
385           nctx->remote_cn ? nctx->remote_cn : "(null)", \
386           ctx->state, len, (long unsigned int)size); \
387     goto socket_error; \
388   } \
389 } while(0)
390
391 int
392 noit_connection_session_timeout(eventer_t e, int mask, void *closure,
393                                 struct timeval *now) {
394   noit_connection_ctx_t *nctx = closure;
395   eventer_t fde = nctx->e;
396   nctx->timeout_event = NULL;
397   noitL(noit_error, "Timing out jlog session: %s, %s\n",
398         nctx->remote_cn ? nctx->remote_cn : "(null)",
399         nctx->remote_str ? nctx->remote_str : "(null)");
400   if(fde)
401     eventer_trigger(fde, EVENTER_EXCEPTION);
402   return 0;
403 }
404 int
405 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
406                            struct timeval *now) {
407   noit_connection_ctx_t *nctx = closure;
408   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
409   jlog_streamer_ctx_t dummy;
410   int len;
411   jlog_id n_chkpt;
412   const char *cn_expected, *feedtype;
413   GET_EXPECTED_CN(nctx, cn_expected);
414   GET_FEEDTYPE(nctx, feedtype);
415
416   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
417     if(write(e->fd, e, 0) == -1)
418       noitL(noit_error, "[%s] [%s] socket error: %s\n", nctx->remote_str ? nctx->remote_str : "(null)",
419             nctx->remote_cn ? nctx->remote_cn : "(null)", strerror(errno));
420  socket_error:
421     STRATCON_CONNECT_CLOSE(e->fd, (char *)feedtype, nctx->remote_str,
422                                 (char *)cn_expected,
423                                 nctx->wants_shutdown, errno);
424     ctx->state = JLOG_STREAMER_WANT_INITIATE;
425     ctx->count = 0;
426     ctx->needs_chkpt = 0;
427     ctx->bytes_read = 0;
428     ctx->bytes_expected = 0;
429     if(ctx->buffer) free(ctx->buffer);
430     ctx->buffer = NULL;
431     noit_connection_schedule_reattempt(nctx, now);
432     eventer_remove_fd(e->fd);
433     nctx->e = NULL;
434     e->opset->close(e->fd, &mask, e);
435     return 0;
436   }
437
438   noit_connection_update_timeout(nctx);
439   while(1) {
440     switch(ctx->state) {
441       case JLOG_STREAMER_WANT_INITIATE:
442         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
443                               sizeof(ctx->jlog_feed_cmd),
444                               &mask, e);
445         if(len < 0) {
446           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
447           goto socket_error;
448         }
449         if(len != sizeof(ctx->jlog_feed_cmd)) {
450           noitL(noit_error, "[%s] [%s] short write [%d/%d] on initiating stream.\n",
451                 nctx->remote_str ? nctx->remote_str : "(null)", nctx->remote_cn ? nctx->remote_cn : "(null)",
452                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
453           goto socket_error;
454         }
455         ctx->state = JLOG_STREAMER_WANT_COUNT;
456         break;
457
458       case JLOG_STREAMER_WANT_ERROR:
459         FULLREAD(e, ctx, 0 - ctx->count);
460         noitL(noit_error, "[%s] [%s] %.*s\n", nctx->remote_str ? nctx->remote_str : "(null)",
461               nctx->remote_cn ? nctx->remote_cn : "(null)", 0 - ctx->count, ctx->buffer);
462         free(ctx->buffer); ctx->buffer = NULL;
463         goto socket_error;
464         break;
465
466       case JLOG_STREAMER_WANT_COUNT:
467         FULLREAD(e, ctx, sizeof(u_int32_t));
468         memcpy(&dummy.count, ctx->buffer, sizeof(u_int32_t));
469         ctx->count = ntohl(dummy.count);
470         ctx->needs_chkpt = 0;
471         free(ctx->buffer); ctx->buffer = NULL;
472         STRATCON_STREAM_COUNT(e->fd, (char *)feedtype,
473                                    nctx->remote_str, (char *)cn_expected,
474                                    ctx->count);
475         if(ctx->count < 0)
476           ctx->state = JLOG_STREAMER_WANT_ERROR;
477         else
478           ctx->state = JLOG_STREAMER_WANT_HEADER;
479         break;
480
481       case JLOG_STREAMER_WANT_HEADER:
482         if(ctx->count == 0) {
483           ctx->state = JLOG_STREAMER_WANT_COUNT;
484           break;
485         }
486         FULLREAD(e, ctx, sizeof(ctx->header));
487         memcpy(&dummy.header, ctx->buffer, sizeof(ctx->header));
488         ctx->header.chkpt.log = ntohl(dummy.header.chkpt.log);
489         ctx->header.chkpt.marker = ntohl(dummy.header.chkpt.marker);
490         ctx->header.tv_sec = ntohl(dummy.header.tv_sec);
491         ctx->header.tv_usec = ntohl(dummy.header.tv_usec);
492         ctx->header.message_len = ntohl(dummy.header.message_len);
493         STRATCON_STREAM_HEADER(e->fd, (char *)feedtype,
494                                     nctx->remote_str, (char *)cn_expected,
495                                     ctx->header.chkpt.log, ctx->header.chkpt.marker,
496                                     ctx->header.tv_sec, ctx->header.tv_usec,
497                                     ctx->header.message_len);
498         free(ctx->buffer); ctx->buffer = NULL;
499         ctx->state = JLOG_STREAMER_WANT_BODY;
500         break;
501
502       case JLOG_STREAMER_WANT_BODY:
503         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
504         STRATCON_STREAM_BODY(e->fd, (char *)feedtype,
505                                   nctx->remote_str, (char *)cn_expected,
506                                   ctx->header.chkpt.log, ctx->header.chkpt.marker,
507                                   ctx->header.tv_sec, ctx->header.tv_usec,
508                                   ctx->buffer);
509         if(ctx->header.message_len > 0) {
510           ctx->needs_chkpt = 1;
511           ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn,
512                     ctx->buffer, NULL);
513         }
514         else if(ctx->buffer)
515           free(ctx->buffer);
516         /* Don't free the buffer, it's used by the datastore process. */
517         ctx->buffer = NULL;
518         ctx->count--;
519         ctx->total_events++;
520         if(ctx->count == 0 && ctx->needs_chkpt) {
521           eventer_t completion_e;
522           eventer_remove_fd(e->fd);
523           completion_e = eventer_alloc();
524           memcpy(completion_e, e, sizeof(*e));
525           nctx->e = completion_e;
526           completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
527           ctx->state = JLOG_STREAMER_IS_ASYNC;
528           ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn,
529                     NULL, completion_e);
530           noitL(noit_debug, "Pushing %s batch async [%s] [%s]: [%u/%u]\n",
531                 feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
532                 nctx->remote_str ? nctx->remote_str : "(null)",
533                 nctx->remote_cn ? nctx->remote_cn : "(null)",
534                 ctx->header.chkpt.log, ctx->header.chkpt.marker);
535           noit_connection_disable_timeout(nctx);
536           return 0;
537         }
538         else if(ctx->count == 0)
539           ctx->state = JLOG_STREAMER_WANT_CHKPT;
540         else
541           ctx->state = JLOG_STREAMER_WANT_HEADER;
542         break;
543
544       case JLOG_STREAMER_IS_ASYNC:
545         ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
546       case JLOG_STREAMER_WANT_CHKPT:
547         noitL(noit_debug, "Pushing %s checkpoint [%s] [%s]: [%u/%u]\n",
548               feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
549               nctx->remote_str ? nctx->remote_str : "(null)",
550               nctx->remote_cn ? nctx->remote_cn : "(null)",
551               ctx->header.chkpt.log, ctx->header.chkpt.marker);
552         n_chkpt.log = htonl(ctx->header.chkpt.log);
553         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
554
555         /* screw short writes.  I'd rather die than not write my data! */
556         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
557                               &mask, e);
558         if(len < 0) {
559           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
560           goto socket_error;
561         }
562         if(len != sizeof(jlog_id)) {
563           noitL(noit_error, "[%s] [%s] short write on checkpointing stream.\n",
564             nctx->remote_str ? nctx->remote_str : "(null)",
565             nctx->remote_cn ? nctx->remote_cn : "(null)");
566           goto socket_error;
567         }
568         STRATCON_STREAM_CHECKPOINT(e->fd, (char *)feedtype,
569                                         nctx->remote_str, (char *)cn_expected,
570                                         ctx->header.chkpt.log, ctx->header.chkpt.marker);
571         ctx->state = JLOG_STREAMER_WANT_COUNT;
572         break;
573     }
574   }
575   /* never get here */
576 }
577
578 int
579 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure,
580                             struct timeval *now) {
581   noit_connection_ctx_t *nctx = closure;
582   int rv;
583   const char *error = NULL, *cn_expected, *feedtype;
584   char error_buff[500];
585   eventer_ssl_ctx_t *sslctx = NULL;
586
587   GET_EXPECTED_CN(nctx, cn_expected);
588   GET_FEEDTYPE(nctx, feedtype);
589   STRATCON_CONNECT_SSL(e->fd, (char *)feedtype, nctx->remote_str,
590                             (char *)cn_expected);
591
592   if(mask & EVENTER_EXCEPTION) goto error;
593
594   rv = eventer_SSL_connect(e, &mask);
595   sslctx = eventer_get_eventer_ssl_ctx(e);
596
597   if(rv > 0) {
598     e->callback = nctx->consumer_callback;
599     /* We must make a copy of the acceptor_closure_t for each new
600      * connection.
601      */
602     if(sslctx != NULL) {
603       const char *cn, *end;
604       cn = eventer_ssl_get_peer_subject(sslctx);
605       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
606         cn += 3;
607         end = cn;
608         while(*end && *end != '/') end++;
609         nctx->remote_cn = malloc(end - cn + 1);
610         memcpy(nctx->remote_cn, cn, end - cn);
611         nctx->remote_cn[end-cn] = '\0';
612       }
613       if(cn_expected && (!nctx->remote_cn ||
614                          strcmp(nctx->remote_cn, cn_expected))) {
615         snprintf(error_buff, sizeof(error_buff), "jlog connect CN mismatch - expected %s, got %s",
616             cn_expected ? cn_expected : "(null)", nctx->remote_cn ? nctx->remote_cn : "(null)");
617         error = error_buff;
618         goto error;
619       }
620     }
621     STRATCON_CONNECT_SSL_SUCCESS(e->fd, (char *)feedtype,
622                                       nctx->remote_str, (char *)cn_expected);
623     return e->callback(e, mask, e->closure, now);
624   }
625   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
626   if(sslctx) error = eventer_ssl_get_last_error(sslctx);
627   noitL(noit_debug, "jlog streamer SSL upgrade failed.\n");
628
629  error:
630   STRATCON_CONNECT_SSL_FAILED(e->fd, (char *)feedtype,
631                                    nctx->remote_str, (char *)cn_expected,
632                                    (char *)error, errno);
633   if(error) noitL(noit_error, "[%s] [%s] noit_connection_ssl_upgrade: %s\n",
634     nctx->remote_str ? nctx->remote_str : "(null)",
635     cn_expected ? cn_expected : "(null)", error);
636   eventer_remove_fd(e->fd);
637   nctx->e = NULL;
638   e->opset->close(e->fd, &mask, e);
639   noit_connection_schedule_reattempt(nctx, now);
640   return 0;
641 }
642 int
643 noit_connection_complete_connect(eventer_t e, int mask, void *closure,
644                                  struct timeval *now) {
645   noit_connection_ctx_t *nctx = closure;
646   const char *layer = NULL, *cert, *key, *ca, *ciphers, *crl = NULL, *cn_expected, *feedtype;
647   char remote_str[128], tmp_str[128];
648   eventer_ssl_ctx_t *sslctx;
649   int aerrno, len;
650   socklen_t aerrno_len = sizeof(aerrno);
651
652   GET_EXPECTED_CN(nctx, cn_expected);
653   GET_FEEDTYPE(nctx, feedtype);
654   if(getsockopt(e->fd,SOL_SOCKET,SO_ERROR, &aerrno, &aerrno_len) == 0)
655     if(aerrno != 0) goto connect_error;
656   aerrno = 0;
657
658   if(mask & EVENTER_EXCEPTION) {
659     if(aerrno == 0 && (write(e->fd, e, 0) == -1))
660       aerrno = errno;
661  connect_error:
662     switch(nctx->r.remote.sa_family) {
663       case AF_INET:
664         len = sizeof(struct sockaddr_in);
665         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in.sin_addr,
666                   tmp_str, len);
667         snprintf(remote_str, sizeof(remote_str), "%s:%d",
668                  tmp_str, ntohs(nctx->r.remote_in.sin_port));
669         break;
670       case AF_INET6:
671         len = sizeof(struct sockaddr_in6);
672         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in6.sin6_addr,
673                   tmp_str, len);
674         snprintf(remote_str, sizeof(remote_str), "%s:%d",
675                  tmp_str, ntohs(nctx->r.remote_in6.sin6_port));
676        break;
677       case AF_UNIX:
678         snprintf(remote_str, sizeof(remote_str), "%s", nctx->r.remote_un.sun_path);
679         break;
680       default:
681         snprintf(remote_str, sizeof(remote_str), "(unknown)");
682     }
683     noitL(noit_error, "Error connecting to %s (%s): %s\n",
684           remote_str, cn_expected ? cn_expected : "(null)", strerror(aerrno));
685     STRATCON_CONNECT_FAILED(e->fd, (char *)feedtype, remote_str,
686                                  (char *)cn_expected, aerrno);
687     eventer_remove_fd(e->fd);
688     nctx->e = NULL;
689     e->opset->close(e->fd, &mask, e);
690     noit_connection_schedule_reattempt(nctx, now);
691     return 0;
692   }
693
694 #define SSLCONFGET(var,name) do { \
695   if(!noit_hash_retr_str(nctx->sslconfig, name, strlen(name), \
696                          &var)) var = NULL; } while(0)
697   SSLCONFGET(layer, "layer");
698   SSLCONFGET(cert, "certificate_file");
699   SSLCONFGET(key, "key_file");
700   SSLCONFGET(ca, "ca_chain");
701   SSLCONFGET(ciphers, "ciphers");
702   SSLCONFGET(crl, "crl");
703   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, layer, cert, key, ca, ciphers);
704   if(!sslctx) goto connect_error;
705   if(crl) {
706     if(!eventer_ssl_use_crl(sslctx, crl)) {
707       noitL(noit_error, "Failed to load CRL from %s\n", crl);
708       eventer_ssl_ctx_free(sslctx);
709       goto connect_error;
710     }
711   }
712
713   memcpy(&nctx->last_connect, now, sizeof(*now));
714   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
715                              nctx->sslconfig);
716   EVENTER_ATTACH_SSL(e, sslctx);
717   e->callback = noit_connection_ssl_upgrade;
718   STRATCON_CONNECT_SUCCESS(e->fd, (char *)feedtype, nctx->remote_str,
719                                 (char *)cn_expected);
720   return e->callback(e, mask, closure, now);
721 }
722 static void
723 noit_connection_initiate_connection(noit_connection_ctx_t *nctx) {
724   struct timeval __now;
725   const char *cn_expected, *feedtype;
726   eventer_t e;
727   int rv, fd = -1;
728 #ifdef SO_KEEPALIVE
729   int optval;
730   socklen_t optlen = sizeof(optval);
731 #endif
732
733   GET_EXPECTED_CN(nctx, cn_expected);
734   GET_FEEDTYPE(nctx, feedtype);
735   nctx->e = NULL;
736   if(nctx->wants_permanent_shutdown) {
737     STRATCON_SHUTDOWN_PERMANENT(-1, (char *)feedtype,
738                                      nctx->remote_str, (char *)cn_expected);
739     noit_connection_ctx_dealloc(nctx);
740     return;
741   }
742   /* Open a socket */
743   fd = socket(nctx->r.remote.sa_family, NE_SOCK_CLOEXEC|SOCK_STREAM, 0);
744   if(fd < 0) goto reschedule;
745
746   /* Make it non-blocking */
747   if(eventer_set_fd_nonblocking(fd)) goto reschedule;
748 #define set_or_bail(type, opt, val) do { \
749   optval = val; \
750   optlen = sizeof(optval); \
751   if(setsockopt(fd, type, opt, &optval, optlen) < 0) { \
752     noitL(noit_error, "[%s] [%s] Cannot set " #type "/" #opt " on jlog socket: %s\n", \
753           nctx->remote_str ? nctx->remote_str : "(null)", \
754           cn_expected,  \
755           strerror(errno)); \
756     goto reschedule; \
757   } \
758 } while(0)
759 #ifdef SO_KEEPALIVE
760   set_or_bail(SOL_SOCKET, SO_KEEPALIVE, 1);
761 #endif
762 #ifdef TCP_KEEPALIVE_THRESHOLD
763   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_THRESHOLD, 10 * 1000);
764 #endif
765 #ifdef TCP_KEEPALIVE_ABORT_THRESHOLD
766   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_ABORT_THRESHOLD, 30 * 1000);
767 #endif
768 #ifdef TCP_CONN_NOTIFY_THRESHOLD
769   set_or_bail(IPPROTO_TCP, TCP_CONN_NOTIFY_THRESHOLD, 10 * 1000);
770 #endif
771 #ifdef TCP_CONN_ABORT_THRESHOLD
772   set_or_bail(IPPROTO_TCP, TCP_CONN_ABORT_THRESHOLD, 30 * 1000);
773 #endif
774
775   /* Initiate a connection */
776   rv = connect(fd, &nctx->r.remote, nctx->remote_len);
777   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
778
779   /* Register a handler for connection completion */
780   e = eventer_alloc();
781   e->fd = fd;
782   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
783   e->callback = noit_connection_complete_connect;
784   e->closure = nctx;
785   nctx->e = e;
786   eventer_add(e);
787
788   STRATCON_CONNECT(e->fd, (char *)feedtype, nctx->remote_str,
789                         (char *)cn_expected);
790   noit_connection_update_timeout(nctx);
791   return;
792
793  reschedule:
794   if(fd >= 0) close(fd);
795   gettimeofday(&__now, NULL);
796   noit_connection_schedule_reattempt(nctx, &__now);
797   return;
798 }
799
800 int
801 noit_connection_update_timeout(noit_connection_ctx_t *nctx) {
802   struct timeval now, diff;
803   if(nctx->max_silence == 0) return 0;
804
805   diff.tv_sec = nctx->max_silence / 1000;
806   diff.tv_usec = (nctx->max_silence % 1000) * 1000;
807   gettimeofday(&now, NULL);
808
809   if(!nctx->timeout_event) {
810     nctx->timeout_event = eventer_alloc();
811     nctx->timeout_event->mask = EVENTER_TIMER;
812     nctx->timeout_event->closure = nctx;
813     nctx->timeout_event->callback = noit_connection_session_timeout;
814     add_timeval(now, diff, &nctx->timeout_event->whence);
815     eventer_add(nctx->timeout_event);
816   }
817   else {
818     add_timeval(now, diff, &nctx->timeout_event->whence);
819     eventer_update(nctx->timeout_event, EVENTER_TIMER);
820   }
821   return 0;
822 }
823
824 int
825 noit_connection_disable_timeout(noit_connection_ctx_t *nctx) {
826   if(nctx->timeout_event) {
827     eventer_remove(nctx->timeout_event);
828     eventer_free(nctx->timeout_event);
829     nctx->timeout_event = NULL;
830   }
831   return 0;
832 }
833
834 int
835 initiate_noit_connection(const char *host, unsigned short port,
836                          noit_hash_table *sslconfig, noit_hash_table *config,
837                          eventer_func_t handler, void *closure,
838                          void (*freefunc)(void *)) {
839   noit_connection_ctx_t *ctx;
840   const char *stimeout;
841   int8_t family;
842   int rv;
843   union {
844     struct in_addr addr4;
845     struct in6_addr addr6;
846   } a;
847
848   if(host[0] == '/') {
849     family = AF_UNIX;
850   }
851   else {
852     family = AF_INET;
853     rv = inet_pton(family, host, &a);
854     if(rv != 1) {
855       family = AF_INET6;
856       rv = inet_pton(family, host, &a);
857       if(rv != 1) {
858         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
859         return -1;
860       }
861     }
862   }
863
864   ctx = noit_connection_ctx_alloc();
865   ctx->remote_str = calloc(1, strlen(host) + 7);
866   snprintf(ctx->remote_str, strlen(host) + 7,
867            "%s:%d", host, port);
868  
869   memset(&ctx->r, 0, sizeof(ctx->r));
870   if(family == AF_UNIX) {
871     struct sockaddr_un *s = &ctx->r.remote_un;
872     s->sun_family = AF_UNIX;
873     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
874     ctx->remote_len = sizeof(*s);
875   }
876   else if(family == AF_INET) {
877     struct sockaddr_in *s = &ctx->r.remote_in;
878     s->sin_family = family;
879     s->sin_port = htons(port);
880     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
881     ctx->remote_len = sizeof(*s);
882   }
883   else {
884     struct sockaddr_in6 *s = &ctx->r.remote_in6;
885     s->sin6_family = family;
886     s->sin6_port = htons(port);
887     memcpy(&s->sin6_addr, &a, sizeof(a));
888     ctx->remote_len = sizeof(*s);
889   }
890
891   if(ctx->sslconfig)
892     noit_hash_delete_all(ctx->sslconfig, free, free);
893   else
894     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
895   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
896   if(ctx->config)
897     noit_hash_delete_all(ctx->config, free, free);
898   else
899     ctx->config = calloc(1, sizeof(noit_hash_table));
900   noit_hash_merge_as_dict(ctx->config, config);
901
902   if(noit_hash_retr_str(ctx->config, "timeout", strlen("timeout"), &stimeout))
903     ctx->max_silence = atoi(stimeout);
904   else
905     ctx->max_silence = DEFAULT_NOIT_CONNECTION_TIMEOUT;
906   ctx->consumer_callback = handler;
907   ctx->consumer_free = freefunc;
908   ctx->consumer_ctx = closure;
909   noit_connection_initiate_connection(ctx);
910   return 0;
911 }
912
913 int
914 stratcon_streamer_connection(const char *toplevel, const char *destination,
915                              eventer_func_t handler,
916                              void *(*handler_alloc)(void), void *handler_ctx,
917                              void (*handler_free)(void *)) {
918   int i, cnt = 0, found = 0;
919   noit_conf_section_t *noit_configs;
920   char path[256];
921
922   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
923   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
924   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
925   for(i=0; i<cnt; i++) {
926     char address[256];
927     unsigned short port;
928     int portint;
929     noit_hash_table *sslconfig, *config;
930
931     if(!noit_conf_get_stringbuf(noit_configs[i],
932                                 "ancestor-or-self::node()/@address",
933                                 address, sizeof(address))) {
934       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
935       continue;
936     }
937     /* if destination is specified, exact match it */
938     if(destination && strcmp(address, destination)) continue;
939
940     if(!noit_conf_get_int(noit_configs[i],
941                           "ancestor-or-self::node()/@port", &portint))
942       portint = 0;
943     port = (unsigned short) portint;
944     if(address[0] != '/' && (portint == 0 || (port != portint))) {
945       /* UNIX sockets don't require a port (they'll ignore it if specified */
946       noitL(noit_stderr,
947             "Invalid port [%d] specified in stanza %d\n", port, i+1);
948       continue;
949     }
950     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
951     config = noit_conf_get_hash(noit_configs[i], "config");
952
953     noitL(noit_error, "initiating to %s\n", address);
954     initiate_noit_connection(address, port, sslconfig, config,
955                              handler,
956                              handler_alloc ? handler_alloc() : handler_ctx,
957                              handler_free);
958     found++;
959     noit_hash_destroy(sslconfig,free,free);
960     free(sslconfig);
961     noit_hash_destroy(config,free,free);
962     free(config);
963   }
964   free(noit_configs);
965   return found;
966 }
967 int
968 stratcon_find_noit_ip_by_cn(const char *cn, char *ip, int len) {
969   int rv = -1;
970   void *vip;
971   pthread_mutex_lock(&noit_ip_by_cn_lock);
972   if(noit_hash_retrieve(&noit_ip_by_cn, cn, strlen(cn), &vip)) {
973     int new_len;
974     char *new_ip = (char *)vip;
975     new_len = strlen(new_ip);
976     strlcpy(ip, new_ip, len);
977     if(new_len >= len) rv = new_len+1;
978     else rv = 0;
979   }
980   pthread_mutex_unlock(&noit_ip_by_cn_lock);
981   return rv;
982 }
983 void
984 stratcon_jlog_streamer_recache_noit() {
985   int di, cnt;
986   noit_conf_section_t *noit_configs;
987   noit_configs = noit_conf_get_sections(NULL, "//noits//noit", &cnt);
988   pthread_mutex_lock(&noit_ip_by_cn_lock);
989   noit_hash_delete_all(&noit_ip_by_cn, free, free);
990   for(di=0; di<cnt; di++) {
991     char address[64];
992     if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
993                                  address, sizeof(address))) {
994       char expected_cn[256];
995       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
996                                  expected_cn, sizeof(expected_cn)))
997         noit_hash_store(&noit_ip_by_cn,
998                         strdup(expected_cn), strlen(expected_cn),
999                         strdup(address));
1000     }
1001   }
1002   free(noit_configs);
1003   pthread_mutex_unlock(&noit_ip_by_cn_lock);
1004 }
1005 void
1006 stratcon_jlog_streamer_reload(const char *toplevel) {
1007   /* flush and repopulate the cn cache */
1008   stratcon_jlog_streamer_recache_noit();
1009   if(!stratcon_datastore_get_enabled()) return;
1010   stratcon_streamer_connection(toplevel, NULL,
1011                                stratcon_jlog_recv_handler,
1012                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
1013                                NULL,
1014                                jlog_streamer_ctx_free);
1015 }
1016
1017 char *
1018 stratcon_console_noit_opts(noit_console_closure_t ncct,
1019                            noit_console_state_stack_t *stack,
1020                            noit_console_state_t *dstate,
1021                            int argc, char **argv, int idx) {
1022   if(argc == 1) {
1023     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1024     const char *key_id;
1025     int klen, i = 0;
1026     void *vconn, *vcn;
1027     noit_connection_ctx_t *ctx;
1028     noit_hash_table dedup = NOIT_HASH_EMPTY;
1029
1030     pthread_mutex_lock(&noits_lock);
1031     while(noit_hash_next(&noits, &iter, &key_id, &klen, &vconn)) {
1032       ctx = (noit_connection_ctx_t *)vconn;
1033       vcn = NULL;
1034       if(ctx->config && noit_hash_retrieve(ctx->config, "cn", 2, &vcn) &&
1035          !noit_hash_store(&dedup, vcn, strlen(vcn), NULL)) {
1036         if(!strncmp(vcn, argv[0], strlen(argv[0]))) {
1037           if(idx == i) {
1038             pthread_mutex_unlock(&noits_lock);
1039             noit_hash_destroy(&dedup, NULL, NULL);
1040             return strdup(vcn);
1041           }
1042           i++;
1043         }
1044       }
1045       if(ctx->remote_str &&
1046          !noit_hash_store(&dedup, ctx->remote_str, strlen(ctx->remote_str), NULL)) {
1047         if(!strncmp(ctx->remote_str, argv[0], strlen(argv[0]))) {
1048           if(idx == i) {
1049             pthread_mutex_unlock(&noits_lock);
1050             noit_hash_destroy(&dedup, NULL, NULL);
1051             return strdup(ctx->remote_str);
1052           }
1053           i++;
1054         }
1055       }
1056     }
1057     pthread_mutex_unlock(&noits_lock);
1058     noit_hash_destroy(&dedup, NULL, NULL);
1059   }
1060   if(argc == 2)
1061     return noit_console_opt_delegate(ncct, stack, dstate, argc-1, argv+1, idx);
1062   return NULL;
1063 }
1064 static int
1065 stratcon_console_show_noits(noit_console_closure_t ncct,
1066                             int argc, char **argv,
1067                             noit_console_state_t *dstate,
1068                             void *closure) {
1069   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1070   const char *key_id, *ecn;
1071   int klen, n = 0, i;
1072   void *vconn;
1073   noit_connection_ctx_t **ctx;
1074
1075   if(closure != (void *)0 && argc == 0) {
1076     nc_printf(ncct, "takes an argument\n");
1077     return 0;
1078   }
1079   if(closure == (void *)0 && argc > 0) {
1080     nc_printf(ncct, "takes no arguments\n");
1081     return 0;
1082   }
1083   pthread_mutex_lock(&noits_lock);
1084   ctx = malloc(sizeof(*ctx) * noits.size);
1085   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1086                        &vconn)) {
1087     ctx[n] = (noit_connection_ctx_t *)vconn;
1088     if(argc == 0 ||
1089        !strcmp(ctx[n]->remote_str, argv[0]) ||
1090        (ctx[n]->config && noit_hash_retr_str(ctx[n]->config, "cn", 2, &ecn) &&
1091         !strcmp(ecn, argv[0]))) {
1092       noit_atomic_inc32(&ctx[n]->refcnt);
1093       n++;
1094     }
1095   }
1096   pthread_mutex_unlock(&noits_lock);
1097   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
1098   for(i=0; i<n; i++) {
1099     nc_print_noit_conn_brief(ncct, ctx[i]);
1100     noit_connection_ctx_deref(ctx[i]);
1101   }
1102   free(ctx);
1103   return 0;
1104 }
1105
1106 static void
1107 emit_noit_info_metrics(struct timeval *now, const char *uuid_str,
1108                        noit_connection_ctx_t *nctx) {
1109   struct timeval last, session_duration, diff;
1110   u_int64_t session_duration_ms, last_event_ms;
1111   jlog_streamer_ctx_t *jctx = nctx->consumer_ctx;
1112   char str[1024], *wr;
1113   int len;
1114   const char *cn_expected;
1115   const char *feedtype = "unknown";
1116
1117   GET_FEEDTYPE(nctx, feedtype);
1118   if(NULL != (wr = strchr(feedtype, '/'))) feedtype = wr+1;
1119
1120   GET_EXPECTED_CN(nctx, cn_expected);
1121   if(!cn_expected) return;
1122
1123   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\t%s`%s`",
1124            (long unsigned int)now->tv_sec,
1125            (long unsigned int)now->tv_usec/1000UL,
1126            uuid_str, cn_expected, feedtype);
1127   wr = str + strlen(str);
1128   len = sizeof(str) - (wr - str);
1129
1130   /* Now we write NAME TYPE VALUE into wr each time and push it */
1131 #define push_noit_m_str(name, value) do { \
1132   snprintf(wr, len, "%s\ts\t%s\n", name, value); \
1133   stratcon_datastore_push(DS_OP_INSERT, \
1134                           (struct sockaddr *)&self_stratcon_ip, \
1135                           self_stratcon_hostname, strdup(str), NULL); \
1136   stratcon_iep_line_processor(DS_OP_INSERT, \
1137                               (struct sockaddr *)&self_stratcon_ip, \
1138                               self_stratcon_hostname, strdup(str), NULL); \
1139 } while(0)
1140 #define push_noit_m_u64(name, value) do { \
1141   snprintf(wr, len, "%s\tL\t%llu\n", name, (long long unsigned int)value); \
1142   stratcon_datastore_push(DS_OP_INSERT, \
1143                           (struct sockaddr *)&self_stratcon_ip, \
1144                           self_stratcon_hostname, strdup(str), NULL); \
1145   stratcon_iep_line_processor(DS_OP_INSERT, \
1146                               (struct sockaddr *)&self_stratcon_ip, \
1147                               self_stratcon_hostname, strdup(str), NULL); \
1148 } while(0)
1149
1150   last.tv_sec = jctx->header.tv_sec;
1151   last.tv_usec = jctx->header.tv_usec;
1152   sub_timeval(*now, last, &diff);
1153   last_event_ms = diff.tv_sec * 1000 + diff.tv_usec / 1000;
1154   sub_timeval(*now, nctx->last_connect, &session_duration);
1155   session_duration_ms = session_duration.tv_sec * 1000 +
1156                         session_duration.tv_usec / 1000;
1157
1158   push_noit_m_str("state", nctx->remote_cn ? "connected" :
1159                              (nctx->retry_event ? "disconnected" :
1160                                                   "connecting"));
1161   push_noit_m_u64("last_event_age_ms", last_event_ms);
1162   push_noit_m_u64("session_length_ms", session_duration_ms);
1163 }
1164 static int
1165 periodic_noit_metrics(eventer_t e, int mask, void *closure,
1166                       struct timeval *now) {
1167   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
1168   noit_connection_ctx_t **ctxs;
1169   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1170   const char *key_id;
1171   void *vconn;
1172   int klen, n = 0, i;
1173   char str[1024];
1174   char uuid_str[1024], tmp_uuid_str[UUID_STR_LEN+1];
1175   struct timeval epoch, diff;
1176   u_int64_t uptime = 0;
1177   char ip_str[128];
1178
1179   inet_ntop(AF_INET, &self_stratcon_ip.sin_addr, ip_str,
1180             sizeof(ip_str));
1181
1182   uuid_str[0] = '\0';
1183   uuid_unparse_lower(self_stratcon_id, tmp_uuid_str);
1184   if(stratcon_selfcheck_extended_id) {
1185     strlcat(uuid_str, ip_str, sizeof(uuid_str)-37);
1186     strlcat(uuid_str, "`selfcheck`selfcheck`", sizeof(uuid_str)-37);
1187   }
1188   strlcat(uuid_str, tmp_uuid_str, sizeof(uuid_str));
1189
1190 #define PUSH_BOTH(type, str) do { \
1191   stratcon_datastore_push(type, \
1192                           (struct sockaddr *)&self_stratcon_ip, \
1193                           self_stratcon_hostname, str, NULL); \
1194   stratcon_iep_line_processor(type, \
1195                               (struct sockaddr *)&self_stratcon_ip, \
1196                               self_stratcon_hostname, str, NULL); \
1197 } while(0)
1198
1199   if(closure == NULL) {
1200     /* Only do this the first time we get called */
1201     snprintf(str, sizeof(str), "C\t%lu.%03lu\t%s\t%s\tstratcon\t%s\n",
1202              (long unsigned int)now->tv_sec,
1203              (long unsigned int)now->tv_usec/1000UL, uuid_str, ip_str,
1204              self_stratcon_hostname);
1205     PUSH_BOTH(DS_OP_INSERT, strdup(str));
1206   }
1207
1208   pthread_mutex_lock(&noits_lock);
1209   ctxs = malloc(sizeof(*ctxs) * noits.size);
1210   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1211                        &vconn)) {
1212     ctxs[n] = (noit_connection_ctx_t *)vconn;
1213     noit_atomic_inc32(&ctxs[n]->refcnt);
1214     n++;
1215   }
1216   pthread_mutex_unlock(&noits_lock);
1217
1218   snprintf(str, sizeof(str), "S\t%lu.%03lu\t%s\tG\tA\t0\tok %d noits\n",
1219            (long unsigned int)now->tv_sec,
1220            (long unsigned int)now->tv_usec/1000UL, uuid_str, n);
1221   PUSH_BOTH(DS_OP_INSERT, strdup(str));
1222
1223   if(eventer_get_epoch(&epoch) != 0)
1224     memcpy(&epoch, now, sizeof(epoch));
1225   sub_timeval(*now, epoch, &diff);
1226   uptime = diff.tv_sec;
1227   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\tuptime\tL\t%llu\n",
1228            (long unsigned int)now->tv_sec,
1229            (long unsigned int)now->tv_usec/1000UL,
1230            uuid_str, (long long unsigned int)uptime);
1231   PUSH_BOTH(DS_OP_INSERT, strdup(str));
1232
1233   for(i=0; i<n; i++) {
1234     emit_noit_info_metrics(now, uuid_str, ctxs[i]);
1235     noit_connection_ctx_deref(ctxs[i]);
1236   }
1237   free(ctxs);
1238   PUSH_BOTH(DS_OP_CHKPT, NULL);
1239
1240   add_timeval(e->whence, whence, &whence);
1241   eventer_add_at(periodic_noit_metrics, (void *)0x1, whence);
1242   return 0;
1243 }
1244
1245 static int
1246 rest_show_noits(noit_http_rest_closure_t *restc,
1247                 int npats, char **pats) {
1248   xmlDocPtr doc;
1249   xmlNodePtr root;
1250   noit_hash_table seen = NOIT_HASH_EMPTY;
1251   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1252   char path[256];
1253   const char *key_id;
1254   const char *type = NULL, *want_cn = NULL;
1255   int klen, n = 0, i, di, cnt;
1256   void *vconn;
1257   noit_connection_ctx_t **ctxs;
1258   noit_conf_section_t *noit_configs;
1259   struct timeval now, diff, last;
1260   xmlNodePtr node;
1261   noit_http_request *req = noit_http_session_request(restc->http_ctx);
1262
1263   noit_http_process_querystring(req);
1264   type = noit_http_request_querystring(req, "type");
1265   want_cn = noit_http_request_querystring(req, "cn");
1266
1267   gettimeofday(&now, NULL);
1268
1269   pthread_mutex_lock(&noits_lock);
1270   ctxs = malloc(sizeof(*ctxs) * noits.size);
1271   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1272                        &vconn)) {
1273     ctxs[n] = (noit_connection_ctx_t *)vconn;
1274     noit_atomic_inc32(&ctxs[n]->refcnt);
1275     n++;
1276   }
1277   pthread_mutex_unlock(&noits_lock);
1278   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
1279
1280   doc = xmlNewDoc((xmlChar *)"1.0");
1281   root = xmlNewDocNode(doc, NULL, (xmlChar *)"noits", NULL);
1282   xmlDocSetRootElement(doc, root);
1283
1284   for(i=0; i<n; i++) {
1285     char buff[256];
1286     const char *feedtype = "unknown", *state = "unknown";
1287     noit_connection_ctx_t *ctx = ctxs[i];
1288     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
1289
1290     feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
1291
1292     /* If the user requested a specific type and we're not it, skip. */
1293     if(type && strcmp(feedtype, type)) {
1294         noit_connection_ctx_deref(ctx);
1295         continue;
1296     }
1297     /* If the user wants a specific CN... limit to that. */
1298     if(want_cn && (!ctx->remote_cn || strcmp(want_cn, ctx->remote_cn))) {
1299         noit_connection_ctx_deref(ctx);
1300         continue;
1301     }
1302
1303     node = xmlNewNode(NULL, (xmlChar *)"noit");
1304     snprintf(buff, sizeof(buff), "%llu.%06d",
1305              (long long unsigned)ctx->last_connect.tv_sec,
1306              (int)ctx->last_connect.tv_usec);
1307     xmlSetProp(node, (xmlChar *)"last_connect", (xmlChar *)buff);
1308     xmlSetProp(node, (xmlChar *)"state", ctx->remote_cn ?
1309                (xmlChar *)"connected" :
1310                (ctx->retry_event ? (xmlChar *)"disconnected" :
1311                                     (xmlChar *)"connecting"));
1312     if(ctx->e) {
1313       char buff[128];
1314       const char *addrstr = NULL;
1315       struct sockaddr_in6 addr6;
1316       socklen_t len = sizeof(addr6);
1317       if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
1318         unsigned short port = 0;
1319         if(addr6.sin6_family == AF_INET) {
1320           addrstr = inet_ntop(addr6.sin6_family,
1321                               &((struct sockaddr_in *)&addr6)->sin_addr,
1322                               buff, sizeof(buff));
1323           memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
1324           port = ntohs(port);
1325         }
1326         else if(addr6.sin6_family == AF_INET6) {
1327           addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
1328                               buff, sizeof(buff));
1329           port = ntohs(addr6.sin6_port);
1330         }
1331         if(addrstr != NULL) {
1332           snprintf(buff + strlen(buff), sizeof(buff) - strlen(buff),
1333                    ":%u", port);
1334           xmlSetProp(node, (xmlChar *)"local", (xmlChar *)buff);
1335         }
1336       }
1337     }
1338     noit_hash_replace(&seen, strdup(ctx->remote_str), strlen(ctx->remote_str),
1339                       0, free, NULL);
1340     xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)ctx->remote_str);
1341     xmlSetProp(node, (xmlChar *)"type", (xmlChar *)feedtype);
1342     if(ctx->retry_event) {
1343       sub_timeval(ctx->retry_event->whence, now, &diff);
1344       snprintf(buff, sizeof(buff), "%llu.%06d",
1345                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
1346       xmlSetProp(node, (xmlChar *)"next_attempt", (xmlChar *)buff);
1347     }
1348     else if(ctx->remote_cn) {
1349       if(ctx->remote_cn)
1350         xmlSetProp(node, (xmlChar *)"remote_cn", (xmlChar *)ctx->remote_cn);
1351  
1352       switch(jctx->state) {
1353         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
1354         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
1355         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
1356         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
1357         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
1358         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
1359         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
1360       }
1361       xmlSetProp(node, (xmlChar *)"state", (xmlChar *)state);
1362       snprintf(buff, sizeof(buff), "%08x:%08x",
1363                jctx->header.chkpt.log, jctx->header.chkpt.marker);
1364       xmlSetProp(node, (xmlChar *)"checkpoint", (xmlChar *)buff);
1365       snprintf(buff, sizeof(buff), "%llu",
1366                (unsigned long long)jctx->total_events);
1367       xmlSetProp(node, (xmlChar *)"session_events", (xmlChar *)buff);
1368       snprintf(buff, sizeof(buff), "%llu",
1369                (unsigned long long)jctx->total_bytes_read);
1370       xmlSetProp(node, (xmlChar *)"session_bytes", (xmlChar *)buff);
1371  
1372       sub_timeval(now, ctx->last_connect, &diff);
1373       snprintf(buff, sizeof(buff), "%lld.%06d",
1374                (long long)diff.tv_sec, (int)diff.tv_usec);
1375       xmlSetProp(node, (xmlChar *)"session_duration", (xmlChar *)buff);
1376  
1377       if(jctx->header.tv_sec) {
1378         last.tv_sec = jctx->header.tv_sec;
1379         last.tv_usec = jctx->header.tv_usec;
1380         snprintf(buff, sizeof(buff), "%llu.%06d",
1381                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
1382         xmlSetProp(node, (xmlChar *)"last_event", (xmlChar *)buff);
1383         sub_timeval(now, last, &diff);
1384         snprintf(buff, sizeof(buff), "%lld.%06d",
1385                  (long long)diff.tv_sec, (int)diff.tv_usec);
1386         xmlSetProp(node, (xmlChar *)"last_event_age", (xmlChar *)buff);
1387       }
1388     }
1389
1390     xmlAddChild(root, node);
1391     noit_connection_ctx_deref(ctx);
1392   }
1393   free(ctxs);
1394
1395   if(!type || !strcmp(type, "configured")) {
1396     snprintf(path, sizeof(path), "//noits//noit");
1397     noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1398     for(di=0; di<cnt; di++) {
1399       char address[64], port_str[32], remote_str[98];
1400       char expected_cn_buff[256], *expected_cn = NULL;
1401       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
1402                                  expected_cn_buff, sizeof(expected_cn_buff)))
1403         expected_cn = expected_cn_buff;
1404       if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn))) continue;
1405       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
1406                                  address, sizeof(address))) {
1407         void *v;
1408         if(!noit_conf_get_stringbuf(noit_configs[di], "self::node()/@port",
1409                                    port_str, sizeof(port_str)))
1410           strlcpy(port_str, "43191", sizeof(port_str));
1411
1412         /* If the user wants a specific CN... limit to that. */
1413           if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn)))
1414             continue;
1415
1416         snprintf(remote_str, sizeof(remote_str), "%s:%s", address, port_str);
1417         if(!noit_hash_retrieve(&seen, remote_str, strlen(remote_str), &v)) {
1418           node = xmlNewNode(NULL, (xmlChar *)"noit");
1419           xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)remote_str);
1420           xmlSetProp(node, (xmlChar *)"type", (xmlChar *)"configured");
1421           if(expected_cn)
1422             xmlSetProp(node, (xmlChar *)"cn", (xmlChar *)expected_cn);
1423           xmlAddChild(root, node);
1424         }
1425       }
1426     }
1427     free(noit_configs);
1428   }
1429   noit_hash_destroy(&seen, free, NULL);
1430
1431   noit_http_response_ok(restc->http_ctx, "text/xml");
1432   noit_http_response_xml(restc->http_ctx, doc);
1433   noit_http_response_end(restc->http_ctx);
1434   xmlFreeDoc(doc);
1435   return 0;
1436 }
1437 static int
1438 stratcon_add_noit(const char *target, unsigned short port,
1439                   const char *cn) {
1440   int cnt;
1441   char path[256];
1442   char port_str[6];
1443   noit_conf_section_t *noit_configs, parent;
1444   xmlNodePtr newnoit, config, cnnode;
1445
1446   snprintf(path, sizeof(path),
1447            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1448   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1449   free(noit_configs);
1450   if(cnt != 0) return -1;
1451
1452   parent = noit_conf_get_section(NULL, "//noits");
1453   if(!parent) return -1;
1454   snprintf(port_str, sizeof(port_str), "%d", port);
1455   newnoit = xmlNewNode(NULL, (xmlChar *)"noit");
1456   xmlSetProp(newnoit, (xmlChar *)"address", (xmlChar *)target);
1457   xmlSetProp(newnoit, (xmlChar *)"port", (xmlChar *)port_str);
1458   xmlAddChild(parent, newnoit);
1459   if(cn) {
1460     config = xmlNewNode(NULL, (xmlChar *)"config");
1461     cnnode = xmlNewNode(NULL, (xmlChar *)"cn");
1462     xmlNodeAddContent(cnnode, (xmlChar *)cn);
1463     xmlAddChild(config, cnnode);
1464     xmlAddChild(newnoit, config);
1465     pthread_mutex_lock(&noit_ip_by_cn_lock);
1466     noit_hash_replace(&noit_ip_by_cn, strdup(cn), strlen(cn),
1467                       strdup(target), free, free);
1468     pthread_mutex_unlock(&noit_ip_by_cn_lock);
1469   }
1470   if(stratcon_datastore_get_enabled())
1471     stratcon_streamer_connection(NULL, target,
1472                                  stratcon_jlog_recv_handler,
1473                                  (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
1474                                  NULL,
1475                                  jlog_streamer_ctx_free);
1476   if(stratcon_iep_get_enabled())
1477     stratcon_streamer_connection(NULL, target,
1478                                  stratcon_jlog_recv_handler,
1479                                  (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
1480                                  NULL,
1481                                  jlog_streamer_ctx_free);
1482   return 1;
1483 }
1484 static int
1485 stratcon_remove_noit(const char *target, unsigned short port) {
1486   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1487   const char *key_id;
1488   int klen, n = -1, i, cnt = 0;
1489   void *vconn;
1490   noit_connection_ctx_t **ctx;
1491   noit_conf_section_t *noit_configs;
1492   char path[256];
1493   char remote_str[256];
1494
1495   snprintf(remote_str, sizeof(remote_str), "%s:%d", target, port);
1496
1497   snprintf(path, sizeof(path),
1498            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1499   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1500   for(i=0; i<cnt; i++) {
1501     char expected_cn[256];
1502     if(noit_conf_get_stringbuf(noit_configs[i], "self::node()/config/cn",
1503                                expected_cn, sizeof(expected_cn))) {
1504       pthread_mutex_lock(&noit_ip_by_cn_lock);
1505       noit_hash_delete(&noit_ip_by_cn, expected_cn, strlen(expected_cn),
1506                        free, free);
1507       pthread_mutex_unlock(&noit_ip_by_cn_lock);
1508     }
1509     CONF_REMOVE(noit_configs[i]);
1510     xmlUnlinkNode(noit_configs[i]);
1511     xmlFreeNode(noit_configs[i]);
1512     n = 0;
1513   }
1514   free(noit_configs);
1515
1516   pthread_mutex_lock(&noits_lock);
1517   ctx = malloc(sizeof(*ctx) * noits.size);
1518   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1519                        &vconn)) {
1520     if(!strcmp(((noit_connection_ctx_t *)vconn)->remote_str, remote_str)) {
1521       ctx[n] = (noit_connection_ctx_t *)vconn;
1522       noit_atomic_inc32(&ctx[n]->refcnt);
1523       n++;
1524     }
1525   }
1526   pthread_mutex_unlock(&noits_lock);
1527   for(i=0; i<n; i++) {
1528     noit_connection_ctx_dealloc(ctx[i]); /* once for the record */
1529     noit_connection_ctx_deref(ctx[i]);   /* once for the aboce inc32 */
1530   }
1531   free(ctx);
1532   return n;
1533 }
1534 static int
1535 rest_set_noit(noit_http_rest_closure_t *restc,
1536               int npats, char **pats) {
1537   const char *cn = NULL;
1538   noit_http_session_ctx *ctx = restc->http_ctx;
1539   noit_http_request *req = noit_http_session_request(ctx);
1540   unsigned short port = 43191;
1541   if(npats < 1 || npats > 2)
1542     noit_http_response_server_error(ctx, "text/xml");
1543   if(npats == 2) port = atoi(pats[1]);
1544   noit_http_process_querystring(req);
1545   cn = noit_http_request_querystring(req, "cn");
1546   if(stratcon_add_noit(pats[0], port, cn) >= 0)
1547     noit_http_response_ok(ctx, "text/xml");
1548   else
1549     noit_http_response_standard(ctx, 409, "EXISTS", "text/xml");
1550   if(noit_conf_write_file(NULL) != 0)
1551     noitL(noit_error, "local config write failed\n");
1552   noit_conf_mark_changed();
1553   noit_http_response_end(ctx);
1554   return 0;
1555 }
1556 static int
1557 rest_delete_noit(noit_http_rest_closure_t *restc,
1558                  int npats, char **pats) {
1559   noit_http_session_ctx *ctx = restc->http_ctx;
1560   unsigned short port = 43191;
1561   if(npats < 1 || npats > 2)
1562     noit_http_response_server_error(ctx, "text/xml");
1563   if(npats == 2) port = atoi(pats[1]);
1564   if(stratcon_remove_noit(pats[0], port) >= 0)
1565     noit_http_response_ok(ctx, "text/xml");
1566   else
1567     noit_http_response_not_found(ctx, "text/xml");
1568   if(noit_conf_write_file(NULL) != 0)
1569     noitL(noit_error, "local config write failed\n");
1570   noit_conf_mark_changed();
1571   noit_http_response_end(ctx);
1572   return 0;
1573 }
1574 static int
1575 stratcon_console_conf_noits(noit_console_closure_t ncct,
1576                             int argc, char **argv,
1577                             noit_console_state_t *dstate,
1578                             void *closure) {
1579   char *cp, target[128];
1580   unsigned short port = 43191;
1581   int adding = (int)(vpsized_int)closure;
1582   if(argc != 1)
1583     return -1;
1584
1585   cp = strchr(argv[0], ':');
1586   if(cp) {
1587     strlcpy(target, argv[0], MIN(sizeof(target), cp-argv[0]+1));
1588     port = atoi(cp+1);
1589   }
1590   else strlcpy(target, argv[0], sizeof(target));
1591   if(adding) {
1592     if(stratcon_add_noit(target, port, NULL) >= 0) {
1593       nc_printf(ncct, "Added noit at %s:%d\n", target, port);
1594     }
1595     else {
1596       nc_printf(ncct, "Failed to add noit at %s:%d\n", target, port);
1597     }
1598   }
1599   else {
1600     if(stratcon_remove_noit(target, port) >= 0) {
1601       nc_printf(ncct, "Removed noit at %s:%d\n", target, port);
1602     }
1603     else {
1604       nc_printf(ncct, "Failed to remove noit at %s:%d\n", target, port);
1605     }
1606   }
1607   return 0;
1608 }
1609
1610 static void
1611 register_console_streamer_commands() {
1612   noit_console_state_t *tl;
1613   cmd_info_t *showcmd, *confcmd, *conftcmd, *conftnocmd;
1614
1615   tl = noit_console_state_initial();
1616   showcmd = noit_console_state_get_cmd(tl, "show");
1617   assert(showcmd && showcmd->dstate);
1618   confcmd = noit_console_state_get_cmd(tl, "configure");
1619   conftcmd = noit_console_state_get_cmd(confcmd->dstate, "terminal");
1620   conftnocmd = noit_console_state_get_cmd(conftcmd->dstate, "no");
1621
1622   noit_console_state_add_cmd(conftcmd->dstate,
1623     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)1));
1624   noit_console_state_add_cmd(conftnocmd->dstate,
1625     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)0));
1626
1627   noit_console_state_add_cmd(showcmd->dstate,
1628     NCSCMD("noit", stratcon_console_show_noits,
1629            stratcon_console_noit_opts, NULL, (void *)1));
1630   noit_console_state_add_cmd(showcmd->dstate,
1631     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
1632 }
1633
1634 void
1635 stratcon_jlog_streamer_init(const char *toplevel) {
1636   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
1637   struct in_addr remote;
1638   char uuid_str[UUID_STR_LEN + 1];
1639
1640   pthread_mutex_init(&noits_lock, NULL);
1641   pthread_mutex_init(&noit_ip_by_cn_lock, NULL);
1642   eventer_name_callback("noit_connection_reinitiate",
1643                         noit_connection_reinitiate);
1644   eventer_name_callback("stratcon_jlog_recv_handler",
1645                         stratcon_jlog_recv_handler);
1646   eventer_name_callback("noit_connection_ssl_upgrade",
1647                         noit_connection_ssl_upgrade);
1648   eventer_name_callback("noit_connection_complete_connect",
1649                         noit_connection_complete_connect);
1650   eventer_name_callback("noit_connection_session_timeout",
1651                         noit_connection_session_timeout);
1652   register_console_streamer_commands();
1653   stratcon_jlog_streamer_reload(toplevel);
1654   stratcon_streamer_connection(toplevel, "", NULL, NULL, NULL, NULL);
1655   assert(noit_http_rest_register_auth(
1656     "GET", "/noits/", "^show$", rest_show_noits,
1657              noit_http_rest_client_cert_auth
1658   ) == 0);
1659   assert(noit_http_rest_register_auth(
1660     "PUT", "/noits/", "^set/([^/:]+)$", rest_set_noit,
1661              noit_http_rest_client_cert_auth
1662   ) == 0);
1663   assert(noit_http_rest_register_auth(
1664     "PUT", "/noits/", "^set/([^/:]+):(\\d+)$", rest_set_noit,
1665              noit_http_rest_client_cert_auth
1666   ) == 0);
1667   assert(noit_http_rest_register_auth(
1668     "DELETE", "/noits/", "^delete/([^/:]+)$", rest_delete_noit,
1669              noit_http_rest_client_cert_auth
1670   ) == 0);
1671   assert(noit_http_rest_register_auth(
1672     "DELETE", "/noits/", "^delete/([^/:]+):(\\d+)$", rest_delete_noit,
1673              noit_http_rest_client_cert_auth
1674   ) == 0);
1675
1676   uuid_clear(self_stratcon_id);
1677
1678   if(noit_conf_get_stringbuf(NULL, "/stratcon/@id",
1679                              uuid_str, sizeof(uuid_str)) &&
1680      uuid_parse(uuid_str, self_stratcon_id) == 0) {
1681     int period;
1682     noit_conf_get_boolean(NULL, "/stratcon/@extended_id",
1683                           &stratcon_selfcheck_extended_id);
1684     /* If a UUID was provided for stratcon itself, we will report metrics
1685      * on a large variety of things (including all noits).
1686      */
1687     if(noit_conf_get_int(NULL, "/stratcon/@metric_period", &period) &&
1688        period > 0) {
1689       DEFAULT_NOIT_PERIOD_TV.tv_sec = period / 1000;
1690       DEFAULT_NOIT_PERIOD_TV.tv_usec = (period % 1000) * 1000;
1691     }
1692     self_stratcon_ip.sin_family = AF_INET;
1693     remote.s_addr = 0xffffffff;
1694     noit_getip_ipv4(remote, &self_stratcon_ip.sin_addr);
1695     gethostname(self_stratcon_hostname, sizeof(self_stratcon_hostname));
1696     eventer_add_in(periodic_noit_metrics, NULL, whence);
1697   }
1698 }
1699
Note: See TracBrowser for help on using the browser.