root/src/stratcon_jlog_streamer.c

Revision 407a5b96efc104ceb914e1c547777a296c2286a9, 58.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 2 years ago)

Too tired dammit.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "dtrace_probes.h"
35 #include "eventer/eventer.h"
36 #include "noit_conf.h"
37 #include "utils/noit_hash.h"
38 #include "utils/noit_log.h"
39 #include "utils/noit_getip.h"
40 #include "noit_jlog_listener.h"
41 #include "noit_rest.h"
42 #include "stratcon_datastore.h"
43 #include "stratcon_jlog_streamer.h"
44 #include "stratcon_iep.h"
45
46 #include <unistd.h>
47 #include <assert.h>
48 #include <errno.h>
49 #include <sys/types.h>
50 #include <sys/socket.h>
51 #ifdef HAVE_SYS_FILIO_H
52 #include <sys/filio.h>
53 #endif
54 #include <netinet/in.h>
55 #include <sys/un.h>
56 #include <arpa/inet.h>
57
58 pthread_mutex_t noits_lock;
59 noit_hash_table noits = NOIT_HASH_EMPTY;
60 pthread_mutex_t noit_ip_by_cn_lock;
61 noit_hash_table noit_ip_by_cn = NOIT_HASH_EMPTY;
62 static uuid_t self_stratcon_id;
63 static char self_stratcon_hostname[256] = "\0";
64 static struct sockaddr_in self_stratcon_ip;
65
66 static struct timeval DEFAULT_NOIT_PERIOD_TV = { 5UL, 0UL };
67
68 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
69
70 static const char *feed_type_to_str(int jlog_feed_cmd) {
71   switch(jlog_feed_cmd) {
72     case NOIT_JLOG_DATA_FEED: return "durable/storage";
73     case NOIT_JLOG_DATA_TEMP_FEED: return "transient/iep";
74   }
75   return "unknown";
76 }
77
/* GET_EXPECTED_CN(nctx, cn)
 *   Sets `cn` to the value stored under the "cn" key of nctx->config, or to
 *   NULL when there is no config table or no such key.
 *   `nctx` is evaluated more than once; pass a side-effect-free expression.
 */
#define GET_EXPECTED_CN(nctx, cn) do { \
  void *vcn; \
  (cn) = NULL; \
  if((nctx)->config && \
     noit_hash_retrieve((nctx)->config, "cn", 2, &vcn)) { \
     (cn) = vcn; \
  } \
} while(0)
/* GET_FEEDTYPE(nctx, feedtype)
 *   Sets `feedtype` to "storage" or "iep" according to which push function
 *   the jlog streamer context attached to `nctx` uses, and to "unknown"
 *   otherwise (including when no consumer context is attached).
 */
#define GET_FEEDTYPE(nctx, feedtype) do { \
  jlog_streamer_ctx_t *_jctx = (nctx)->consumer_ctx; \
  (feedtype) = "unknown"; \
  if(_jctx == NULL) \
    break; \
  if(_jctx->push == stratcon_datastore_push) \
    (feedtype) = "storage"; \
  else if(_jctx->push == stratcon_iep_line_processor) \
    (feedtype) = "iep"; \
} while(0)
94
95 static int
96 remote_str_sort(const void *a, const void *b) {
97   int rv;
98   noit_connection_ctx_t * const *actx = a;
99   noit_connection_ctx_t * const *bctx = b;
100   jlog_streamer_ctx_t *ajctx = (*actx)->consumer_ctx;
101   jlog_streamer_ctx_t *bjctx = (*bctx)->consumer_ctx;
102   rv = strcmp((*actx)->remote_str, (*bctx)->remote_str);
103   if(rv) return rv;
104   return (ajctx->jlog_feed_cmd < bjctx->jlog_feed_cmd) ? -1 :
105            ((ajctx->jlog_feed_cmd == bjctx->jlog_feed_cmd) ? 0 : 1);
106 }
107 static void
108 nc_print_noit_conn_brief(noit_console_closure_t ncct,
109                           noit_connection_ctx_t *ctx) {
110   jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
111   struct timeval now, diff, session_duration;
112   const char *feedtype = "unknown";
113   const char *lasttime = "never";
114   if(ctx->last_connect.tv_sec != 0) {
115     char cmdbuf[4096];
116     time_t r = ctx->last_connect.tv_sec;
117     struct tm tbuf, *tm;
118     tm = gmtime_r(&r, &tbuf);
119     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
120     lasttime = cmdbuf;
121   }
122   nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str,
123             ctx->remote_cn ? "connected" :
124                              (ctx->retry_event ? "disconnected" :
125                                                    "connecting"), lasttime);
126   if(ctx->e) {
127     char buff[128];
128     const char *addrstr = NULL;
129     struct sockaddr_in6 addr6;
130     socklen_t len = sizeof(addr6);
131     if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
132       unsigned short port = 0;
133       if(addr6.sin6_family == AF_INET) {
134         addrstr = inet_ntop(addr6.sin6_family,
135                             &((struct sockaddr_in *)&addr6)->sin_addr,
136                             buff, sizeof(buff));
137         memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
138         port = ntohs(port);
139       }
140       else if(addr6.sin6_family == AF_INET6) {
141         addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
142                             buff, sizeof(buff));
143         port = ntohs(addr6.sin6_port);
144       }
145       if(addrstr != NULL)
146         nc_printf(ncct, "\tLocal address is %s:%u\n", buff, port);
147       else
148         nc_printf(ncct, "\tLocal address not interpretable\n");
149     }
150     else {
151       nc_printf(ncct, "\tLocal address error[%d]: %s\n",
152                 ctx->e->fd, strerror(errno));
153     }
154   }
155   feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
156   nc_printf(ncct, "\tJLog event streamer [%s]\n", feedtype);
157   gettimeofday(&now, NULL);
158   if(ctx->retry_event) {
159     sub_timeval(ctx->retry_event->whence, now, &diff);
160     nc_printf(ncct, "\tNext attempt in %lld.%06us\n",
161               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
162   }
163   else if(ctx->remote_cn) {
164     nc_printf(ncct, "\tRemote CN: '%s'\n",
165               ctx->remote_cn ? ctx->remote_cn : "???");
166     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
167       struct timeval last;
168       double session_duration_seconds;
169       const char *state = "unknown";
170
171       switch(jctx->state) {
172         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
173         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
174         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
175         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
176         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
177         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
178         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
179       }
180       last.tv_sec = jctx->header.tv_sec;
181       last.tv_usec = jctx->header.tv_usec;
182       sub_timeval(now, last, &diff);
183       sub_timeval(now, ctx->last_connect, &session_duration);
184       session_duration_seconds = session_duration.tv_sec +
185                                  (double)session_duration.tv_usec/1000000.0;
186       nc_printf(ncct, "\tState: %s\n"
187                       "\tNext checkpoint: [%08x:%08x]\n"
188                       "\tLast event: %lld.%06us ago\n"
189                       "\tEvents this session: %llu (%0.2f/s)\n"
190                       "\tOctets this session: %llu (%0.2f/s)\n",
191                 state,
192                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
193                 (long long)diff.tv_sec, (unsigned int)diff.tv_usec,
194                 jctx->total_events,
195                 (double)jctx->total_events/session_duration_seconds,
196                 jctx->total_bytes_read,
197                 (double)jctx->total_bytes_read/session_duration_seconds);
198     }
199     else {
200       nc_printf(ncct, "\tUnknown type.\n");
201     }
202   }
203 }
204
205 jlog_streamer_ctx_t *
206 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
207   jlog_streamer_ctx_t *ctx;
208   ctx = stratcon_jlog_streamer_ctx_alloc();
209   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
210   ctx->push = stratcon_datastore_push;
211   return ctx;
212 }
213 jlog_streamer_ctx_t *
214 stratcon_jlog_streamer_ctx_alloc(void) {
215   jlog_streamer_ctx_t *ctx;
216   ctx = calloc(1, sizeof(*ctx));
217   return ctx;
218 }
219 noit_connection_ctx_t *
220 noit_connection_ctx_alloc(void) {
221   noit_connection_ctx_t *ctx, **pctx;
222   ctx = calloc(1, sizeof(*ctx));
223   ctx->refcnt = 1;
224   pctx = malloc(sizeof(*pctx));
225   *pctx = ctx;
226   pthread_mutex_lock(&noits_lock);
227   noit_hash_store(&noits, (const char *)pctx, sizeof(*pctx), ctx);
228   pthread_mutex_unlock(&noits_lock);
229   return ctx;
230 }
231 int
232 noit_connection_reinitiate(eventer_t e, int mask, void *closure,
233                          struct timeval *now) {
234   noit_connection_ctx_t *ctx = closure;
235   ctx->retry_event = NULL;
236   noit_connection_initiate_connection(closure);
237   return 0;
238 }
239 void
240 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx,
241                                    struct timeval *now) {
242   struct timeval __now, interval;
243   const char *v, *feedtype, *cn_expected;
244   u_int32_t min_interval = 1000, max_interval = 8000;
245
246   noit_connection_disable_timeout(ctx);
247   if(ctx->remote_cn) {
248     free(ctx->remote_cn);
249     ctx->remote_cn = NULL;
250   }
251   if(noit_hash_retr_str(ctx->config,
252                         "reconnect_initial_interval",
253                         strlen("reconnect_initial_interval"),
254                         &v)) {
255     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
256   }
257   if(noit_hash_retr_str(ctx->config,
258                         "reconnect_maximum_interval",
259                         strlen("reconnect_maximum_interval"),
260                         &v)) {
261     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
262   }
263   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
264   else {
265     ctx->current_backoff *= 2;
266     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
267     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
268   }
269   if(!now) {
270     gettimeofday(&__now, NULL);
271     now = &__now;
272   }
273   interval.tv_sec = ctx->current_backoff / 1000;
274   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
275   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
276         ctx->current_backoff);
277   if(ctx->retry_event)
278     eventer_remove(ctx->retry_event);
279   else
280     ctx->retry_event = eventer_alloc();
281   ctx->retry_event->callback = noit_connection_reinitiate;
282   ctx->retry_event->closure = ctx;
283   ctx->retry_event->mask = EVENTER_TIMER;
284   add_timeval(*now, interval, &ctx->retry_event->whence);
285   GET_EXPECTED_CN(ctx, cn_expected);
286   GET_FEEDTYPE(ctx, feedtype);
287   STRATCON_NOIT_RESCHEDULE(-1, (char *)feedtype, ctx->remote_str,
288                            (char *)cn_expected, ctx->current_backoff);
289   eventer_add(ctx->retry_event);
290 }
291 static void
292 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
293   if(ctx->remote_cn) free(ctx->remote_cn);
294   if(ctx->remote_str) free(ctx->remote_str);
295   if(ctx->retry_event) {
296     eventer_remove(ctx->retry_event);
297     eventer_free(ctx->retry_event);
298   }
299   if(ctx->timeout_event) {
300     eventer_remove(ctx->timeout_event);
301     eventer_free(ctx->timeout_event);
302   }
303   ctx->consumer_free(ctx->consumer_ctx);
304   free(ctx);
305 }
306 void
307 noit_connection_ctx_deref(noit_connection_ctx_t *ctx) {
308   if(noit_atomic_dec32(&ctx->refcnt) == 0)
309     noit_connection_ctx_free(ctx);
310 }
311 void
312 noit_connection_ctx_dealloc(noit_connection_ctx_t *ctx) {
313   noit_connection_ctx_t **pctx = &ctx;
314   pthread_mutex_lock(&noits_lock);
315   noit_hash_delete(&noits, (const char *)pctx, sizeof(*pctx),
316                    free, (void (*)(void *))noit_connection_ctx_deref);
317   pthread_mutex_unlock(&noits_lock);
318 }
319 void
320 jlog_streamer_ctx_free(void *cl) {
321   jlog_streamer_ctx_t *ctx = cl;
322   if(ctx->buffer) free(ctx->buffer);
323   free(ctx);
324 }
325
326 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
327 static int
328 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
329   int len, mask;
330   while(ctx->bytes_read < ctx->bytes_expected) {
331     len = Eread(ctx->buffer + ctx->bytes_read,
332                 ctx->bytes_expected - ctx->bytes_read);
333     if(len < 0) {
334       *newmask = mask;
335       return -1;
336     }
337     /* if we get 0 inside SSL, and there was a real error, we
338      * will actually get a -1 here.
339      * if(len == 0) return ctx->bytes_read;
340      */
341     ctx->total_bytes_read += len;
342     ctx->bytes_read += len;
343   }
344   assert(ctx->bytes_read == ctx->bytes_expected);
345   return ctx->bytes_read;
346 }
/* FULLREAD(e, ctx, size)
 *   Read exactly `size` bytes from event `e` into a freshly allocated,
 *   NUL-terminated ctx->buffer, resuming a partially completed read if one
 *   is in progress (ctx->bytes_expected != 0).
 *
 *   This macro alters control flow in the enclosing function:
 *     - returns `mask | EVENTER_EXCEPTION` when the read would block;
 *     - jumps to the `socket_error` label on allocation failure, read
 *       error, or short read.
 *   It also expects a variable named `nctx` (with a remote_str member) to
 *   be in scope for log messages.
 *
 *   `size` is evaluated once; its locals are prefixed to avoid shadowing
 *   the caller's `mask` and `len`.
 */
#define FULLREAD(e,ctx,size) do { \
  int _fr_mask, _fr_len; \
  const size_t _fr_size = (size_t)(size); \
  if(!(ctx)->bytes_expected) { \
    (ctx)->bytes_expected = _fr_size; \
    if((ctx)->buffer) free((ctx)->buffer); \
    (ctx)->buffer = malloc(_fr_size + 1); \
    if((ctx)->buffer == NULL) { \
      noitL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)_fr_size + 1); \
      goto socket_error; \
    } \
    (ctx)->buffer[_fr_size] = '\0'; \
  } \
  _fr_len = __read_on_ctx((e), (ctx), &_fr_mask); \
  if(_fr_len < 0) { \
    if(errno == EAGAIN) return _fr_mask | EVENTER_EXCEPTION; \
    noitL(noit_error, "[%s] SSL read error: %s\n", nctx->remote_str, strerror(errno)); \
    goto socket_error; \
  } \
  (ctx)->bytes_read = 0; \
  (ctx)->bytes_expected = 0; \
  if((size_t)_fr_len != _fr_size) { \
    noitL(noit_error, "[%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
          nctx->remote_str, (ctx)->state, _fr_len, (long unsigned int)_fr_size); \
    goto socket_error; \
  } \
} while(0)
373
374 int
375 noit_connection_session_timeout(eventer_t e, int mask, void *closure,
376                                 struct timeval *now) {
377   noit_connection_ctx_t *nctx = closure;
378   eventer_t fde = nctx->e;
379   nctx->timeout_event = NULL;
380   noitL(noit_error, "Timing out jlog session: %s\n",
381         nctx->remote_cn ? nctx->remote_cn : "(null)");
382   if(fde)
383     eventer_trigger(fde, EVENTER_EXCEPTION);
384   return 0;
385 }
386 int
387 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
388                            struct timeval *now) {
389   noit_connection_ctx_t *nctx = closure;
390   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
391   jlog_streamer_ctx_t dummy;
392   int len;
393   jlog_id n_chkpt;
394   const char *cn_expected, *feedtype;
395   GET_EXPECTED_CN(nctx, cn_expected);
396   GET_FEEDTYPE(nctx, feedtype);
397
398   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
399     if(write(e->fd, e, 0) == -1)
400       noitL(noit_error, "socket error: %s\n", strerror(errno));
401  socket_error:
402     STRATCON_NOIT_CONNECT_CLOSE(e->fd, (char *)feedtype, nctx->remote_str,
403                                 (char *)cn_expected,
404                                 nctx->wants_shutdown, errno);
405     ctx->state = JLOG_STREAMER_WANT_INITIATE;
406     ctx->count = 0;
407     ctx->needs_chkpt = 0;
408     ctx->bytes_read = 0;
409     ctx->bytes_expected = 0;
410     if(ctx->buffer) free(ctx->buffer);
411     ctx->buffer = NULL;
412     noit_connection_schedule_reattempt(nctx, now);
413     eventer_remove_fd(e->fd);
414     nctx->e = NULL;
415     e->opset->close(e->fd, &mask, e);
416     return 0;
417   }
418
419   noit_connection_update_timeout(nctx);
420   while(1) {
421     switch(ctx->state) {
422       case JLOG_STREAMER_WANT_INITIATE:
423         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
424                               sizeof(ctx->jlog_feed_cmd),
425                               &mask, e);
426         if(len < 0) {
427           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
428           goto socket_error;
429         }
430         if(len != sizeof(ctx->jlog_feed_cmd)) {
431           noitL(noit_error, "short write [%d/%d] on initiating stream.\n",
432                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
433           goto socket_error;
434         }
435         ctx->state = JLOG_STREAMER_WANT_COUNT;
436         break;
437
438       case JLOG_STREAMER_WANT_ERROR:
439         FULLREAD(e, ctx, 0 - ctx->count);
440         noitL(noit_error, "[%s] %.*s\n", nctx->remote_str,
441               0 - ctx->count, ctx->buffer);
442         free(ctx->buffer); ctx->buffer = NULL;
443         goto socket_error;
444         break;
445
446       case JLOG_STREAMER_WANT_COUNT:
447         FULLREAD(e, ctx, sizeof(u_int32_t));
448         memcpy(&dummy.count, ctx->buffer, sizeof(u_int32_t));
449         ctx->count = ntohl(dummy.count);
450         ctx->needs_chkpt = 0;
451         free(ctx->buffer); ctx->buffer = NULL;
452         STRATCON_NOIT_STREAM_COUNT(e->fd, (char *)feedtype,
453                                    nctx->remote_str, (char *)cn_expected,
454                                    ctx->count);
455         if(ctx->count < 0)
456           ctx->state = JLOG_STREAMER_WANT_ERROR;
457         else
458           ctx->state = JLOG_STREAMER_WANT_HEADER;
459         break;
460
461       case JLOG_STREAMER_WANT_HEADER:
462         if(ctx->count == 0) {
463           ctx->state = JLOG_STREAMER_WANT_COUNT;
464           break;
465         }
466         FULLREAD(e, ctx, sizeof(ctx->header));
467         memcpy(&dummy.header, ctx->buffer, sizeof(ctx->header));
468         ctx->header.chkpt.log = ntohl(dummy.header.chkpt.log);
469         ctx->header.chkpt.marker = ntohl(dummy.header.chkpt.marker);
470         ctx->header.tv_sec = ntohl(dummy.header.tv_sec);
471         ctx->header.tv_usec = ntohl(dummy.header.tv_usec);
472         ctx->header.message_len = ntohl(dummy.header.message_len);
473         STRATCON_NOIT_STREAM_HEADER(e->fd, (char *)feedtype,
474                                     nctx->remote_str, (char *)cn_expected,
475                                     ctx->header.chkpt.log, ctx->header.chkpt.marker,
476                                     ctx->header.tv_sec, ctx->header.tv_usec,
477                                     ctx->header.message_len);
478         free(ctx->buffer); ctx->buffer = NULL;
479         ctx->state = JLOG_STREAMER_WANT_BODY;
480         break;
481
482       case JLOG_STREAMER_WANT_BODY:
483         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
484         STRATCON_NOIT_STREAM_BODY(e->fd, (char *)feedtype,
485                                   nctx->remote_str, (char *)cn_expected,
486                                   ctx->header.chkpt.log, ctx->header.chkpt.marker,
487                                   ctx->header.tv_sec, ctx->header.tv_usec,
488                                   ctx->buffer);
489         if(ctx->header.message_len > 0) {
490           ctx->needs_chkpt = 1;
491           ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn,
492                     ctx->buffer, NULL);
493         }
494         else if(ctx->buffer)
495           free(ctx->buffer);
496         /* Don't free the buffer, it's used by the datastore process. */
497         ctx->buffer = NULL;
498         ctx->count--;
499         ctx->total_events++;
500         if(ctx->count == 0 && ctx->needs_chkpt) {
501           eventer_t completion_e;
502           eventer_remove_fd(e->fd);
503           completion_e = eventer_alloc();
504           memcpy(completion_e, e, sizeof(*e));
505           nctx->e = completion_e;
506           completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
507           ctx->state = JLOG_STREAMER_IS_ASYNC;
508           ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn,
509                     NULL, completion_e);
510           noitL(noit_debug, "Pushing %s batch async [%s]: [%u/%u]\n",
511                 feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
512                 nctx->remote_cn ? nctx->remote_cn : "(null)",
513                 ctx->header.chkpt.log, ctx->header.chkpt.marker);
514           noit_connection_disable_timeout(nctx);
515           return 0;
516         }
517         else if(ctx->count == 0)
518           ctx->state = JLOG_STREAMER_WANT_CHKPT;
519         else
520           ctx->state = JLOG_STREAMER_WANT_HEADER;
521         break;
522
523       case JLOG_STREAMER_IS_ASYNC:
524         ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
525       case JLOG_STREAMER_WANT_CHKPT:
526         noitL(noit_debug, "Pushing %s checkpoint [%s]: [%u/%u]\n",
527               feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
528               nctx->remote_cn ? nctx->remote_cn : "(null)",
529               ctx->header.chkpt.log, ctx->header.chkpt.marker);
530         n_chkpt.log = htonl(ctx->header.chkpt.log);
531         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
532
533         /* screw short writes.  I'd rather die than not write my data! */
534         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
535                               &mask, e);
536         if(len < 0) {
537           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
538           goto socket_error;
539         }
540         if(len != sizeof(jlog_id)) {
541           noitL(noit_error, "short write on checkpointing stream.\n");
542           goto socket_error;
543         }
544         STRATCON_NOIT_STREAM_CHECKPOINT(e->fd, (char *)feedtype,
545                                         nctx->remote_str, (char *)cn_expected,
546                                         ctx->header.chkpt.log, ctx->header.chkpt.marker);
547         ctx->state = JLOG_STREAMER_WANT_COUNT;
548         break;
549     }
550   }
551   /* never get here */
552 }
553
554 int
555 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure,
556                             struct timeval *now) {
557   noit_connection_ctx_t *nctx = closure;
558   int rv;
559   const char *error = NULL, *cn_expected, *feedtype;
560
561   GET_EXPECTED_CN(nctx, cn_expected);
562   GET_FEEDTYPE(nctx, feedtype);
563   STRATCON_NOIT_CONNECT_SSL(e->fd, (char *)feedtype, nctx->remote_str,
564                             (char *)cn_expected);
565   rv = eventer_SSL_connect(e, &mask);
566   if(rv > 0) {
567     eventer_ssl_ctx_t *sslctx;
568     e->callback = nctx->consumer_callback;
569     /* We must make a copy of the acceptor_closure_t for each new
570      * connection.
571      */
572     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
573       const char *cn, *end;
574       cn = eventer_ssl_get_peer_subject(sslctx);
575       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
576         cn += 3;
577         end = cn;
578         while(*end && *end != '/') end++;
579         nctx->remote_cn = malloc(end - cn + 1);
580         memcpy(nctx->remote_cn, cn, end - cn);
581         nctx->remote_cn[end-cn] = '\0';
582       }
583       if(cn_expected && (!nctx->remote_cn ||
584                          strcmp(nctx->remote_cn, cn_expected))) {
585         error = "jlog connect CN mismatch\n";
586         goto error;
587       }
588     }
589     STRATCON_NOIT_CONNECT_SSL_SUCCESS(e->fd, (char *)feedtype,
590                                       nctx->remote_str, (char *)cn_expected);
591     return e->callback(e, mask, e->closure, now);
592   }
593   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
594   noitL(noit_debug, "jlog streamer SSL upgrade failed.\n");
595
596  error:
597   STRATCON_NOIT_CONNECT_SSL_FAILED(e->fd, (char *)feedtype,
598                                    nctx->remote_str, (char *)cn_expected,
599                                    (char *)error, errno);
600   if(error) noitL(noit_error, "%s", error);
601   eventer_remove_fd(e->fd);
602   nctx->e = NULL;
603   e->opset->close(e->fd, &mask, e);
604   noit_connection_schedule_reattempt(nctx, now);
605   return 0;
606 }
607 int
608 noit_connection_complete_connect(eventer_t e, int mask, void *closure,
609                                  struct timeval *now) {
610   noit_connection_ctx_t *nctx = closure;
611   const char *cert, *key, *ca, *ciphers, *crl = NULL, *cn_expected, *feedtype;
612   char remote_str[128], tmp_str[128];
613   eventer_ssl_ctx_t *sslctx;
614   int aerrno, len;
615   socklen_t aerrno_len = sizeof(aerrno);
616
617   GET_EXPECTED_CN(nctx, cn_expected);
618   GET_FEEDTYPE(nctx, feedtype);
619   if(getsockopt(e->fd,SOL_SOCKET,SO_ERROR, &aerrno, &aerrno_len) == 0)
620     if(aerrno != 0) goto connect_error;
621   aerrno = 0;
622
623   if(mask & EVENTER_EXCEPTION) {
624     if(aerrno == 0 && (write(e->fd, e, 0) == -1))
625       aerrno = errno;
626  connect_error:
627     switch(nctx->r.remote.sa_family) {
628       case AF_INET:
629         len = sizeof(struct sockaddr_in);
630         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in.sin_addr,
631                   tmp_str, len);
632         snprintf(remote_str, sizeof(remote_str), "%s:%d",
633                  tmp_str, ntohs(nctx->r.remote_in.sin_port));
634         break;
635       case AF_INET6:
636         len = sizeof(struct sockaddr_in6);
637         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in6.sin6_addr,
638                   tmp_str, len);
639         snprintf(remote_str, sizeof(remote_str), "%s:%d",
640                  tmp_str, ntohs(nctx->r.remote_in6.sin6_port));
641        break;
642       case AF_UNIX:
643         snprintf(remote_str, sizeof(remote_str), "%s", nctx->r.remote_un.sun_path);
644         break;
645       default:
646         snprintf(remote_str, sizeof(remote_str), "(unknown)");
647     }
648     noitL(noit_error, "Error connecting to %s: %s\n",
649           remote_str, strerror(aerrno));
650     STRATCON_NOIT_CONNECT_FAILED(e->fd, (char *)feedtype, remote_str,
651                                  (char *)cn_expected, aerrno);
652     eventer_remove_fd(e->fd);
653     nctx->e = NULL;
654     e->opset->close(e->fd, &mask, e);
655     noit_connection_schedule_reattempt(nctx, now);
656     return 0;
657   }
658
659 #define SSLCONFGET(var,name) do { \
660   if(!noit_hash_retr_str(nctx->sslconfig, name, strlen(name), \
661                          &var)) var = NULL; } while(0)
662   SSLCONFGET(cert, "certificate_file");
663   SSLCONFGET(key, "key_file");
664   SSLCONFGET(ca, "ca_chain");
665   SSLCONFGET(ciphers, "ciphers");
666   SSLCONFGET(crl, "crl");
667   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
668   if(!sslctx) goto connect_error;
669   if(crl) {
670     if(!eventer_ssl_use_crl(sslctx, crl)) {
671       noitL(noit_error, "Failed to load CRL from %s\n", crl);
672       eventer_ssl_ctx_free(sslctx);
673       goto connect_error;
674     }
675   }
676
677   memcpy(&nctx->last_connect, now, sizeof(*now));
678   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
679                              nctx->sslconfig);
680   EVENTER_ATTACH_SSL(e, sslctx);
681   e->callback = noit_connection_ssl_upgrade;
682   STRATCON_NOIT_CONNECT_SUCCESS(e->fd, (char *)feedtype, nctx->remote_str,
683                                 (char *)cn_expected);
684   return e->callback(e, mask, closure, now);
685 }
686 static void
687 noit_connection_initiate_connection(noit_connection_ctx_t *nctx) {
688   struct timeval __now;
689   const char *cn_expected, *feedtype;
690   eventer_t e;
691   int rv, fd = -1;
692 #ifdef SO_KEEPALIVE
693   int optval;
694   socklen_t optlen = sizeof(optval);
695 #endif
696
697   GET_EXPECTED_CN(nctx, cn_expected);
698   GET_FEEDTYPE(nctx, feedtype);
699   nctx->e = NULL;
700   if(nctx->wants_permanent_shutdown) {
701     STRATCON_NOIT_SHUTDOWN_PERMANENT(-1, (char *)feedtype,
702                                      nctx->remote_str, (char *)cn_expected);
703     noit_connection_ctx_dealloc(nctx);
704     return;
705   }
706   /* Open a socket */
707   fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0);
708   if(fd < 0) goto reschedule;
709
710   /* Make it non-blocking */
711   if(eventer_set_fd_nonblocking(fd)) goto reschedule;
712 #define set_or_bail(type, opt, val) do { \
713   optval = val; \
714   optlen = sizeof(optval); \
715   if(setsockopt(fd, type, opt, &optval, optlen) < 0) { \
716     noitL(noit_error, "Cannot set " #type "/" #opt " on jlog socket: %s\n", \
717           strerror(errno)); \
718     goto reschedule; \
719   } \
720 } while(0)
721 #ifdef SO_KEEPALIVE
722   set_or_bail(SOL_SOCKET, SO_KEEPALIVE, 1);
723 #endif
724 #ifdef TCP_KEEPALIVE_THRESHOLD
725   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_THRESHOLD, 10 * 1000);
726 #endif
727 #ifdef TCP_KEEPALIVE_ABORT_THRESHOLD
728   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_ABORT_THRESHOLD, 30 * 1000);
729 #endif
730 #ifdef TCP_CONN_NOTIFY_THRESHOLD
731   set_or_bail(IPPROTO_TCP, TCP_CONN_NOTIFY_THRESHOLD, 10 * 1000);
732 #endif
733 #ifdef TCP_CONN_ABORT_THRESHOLD
734   set_or_bail(IPPROTO_TCP, TCP_CONN_ABORT_THRESHOLD, 30 * 1000);
735 #endif
736
737   /* Initiate a connection */
738   rv = connect(fd, &nctx->r.remote, nctx->remote_len);
739   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
740
741   /* Register a handler for connection completion */
742   e = eventer_alloc();
743   e->fd = fd;
744   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
745   e->callback = noit_connection_complete_connect;
746   e->closure = nctx;
747   nctx->e = e;
748   eventer_add(e);
749
750   STRATCON_NOIT_CONNECT(e->fd, (char *)feedtype, nctx->remote_str,
751                         (char *)cn_expected);
752   noit_connection_update_timeout(nctx);
753   return;
754
755  reschedule:
756   if(fd >= 0) close(fd);
757   gettimeofday(&__now, NULL);
758   noit_connection_schedule_reattempt(nctx, &__now);
759   return;
760 }
761
762 int
763 noit_connection_update_timeout(noit_connection_ctx_t *nctx) {
764   struct timeval now, diff;
765   if(nctx->max_silence == 0) return 0;
766
767   diff.tv_sec = nctx->max_silence / 1000;
768   diff.tv_usec = (nctx->max_silence % 1000) * 1000;
769   gettimeofday(&now, NULL);
770
771   if(!nctx->timeout_event) {
772     nctx->timeout_event = eventer_alloc();
773     nctx->timeout_event->mask = EVENTER_TIMER;
774     nctx->timeout_event->closure = nctx;
775     nctx->timeout_event->callback = noit_connection_session_timeout;
776     add_timeval(now, diff, &nctx->timeout_event->whence);
777     eventer_add(nctx->timeout_event);
778   }
779   else {
780     add_timeval(now, diff, &nctx->timeout_event->whence);
781     eventer_update(nctx->timeout_event, EVENTER_TIMER);
782   }
783   return 0;
784 }
785
786 int
787 noit_connection_disable_timeout(noit_connection_ctx_t *nctx) {
788   if(nctx->timeout_event) {
789     eventer_remove(nctx->timeout_event);
790     eventer_free(nctx->timeout_event);
791     nctx->timeout_event = NULL;
792   }
793   return 0;
794 }
795
796 int
797 initiate_noit_connection(const char *host, unsigned short port,
798                          noit_hash_table *sslconfig, noit_hash_table *config,
799                          eventer_func_t handler, void *closure,
800                          void (*freefunc)(void *)) {
801   noit_connection_ctx_t *ctx;
802   const char *stimeout;
803   int8_t family;
804   int rv;
805   union {
806     struct in_addr addr4;
807     struct in6_addr addr6;
808   } a;
809
810   if(host[0] == '/') {
811     family = AF_UNIX;
812   }
813   else {
814     family = AF_INET;
815     rv = inet_pton(family, host, &a);
816     if(rv != 1) {
817       family = AF_INET6;
818       rv = inet_pton(family, host, &a);
819       if(rv != 1) {
820         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
821         return -1;
822       }
823     }
824   }
825
826   ctx = noit_connection_ctx_alloc();
827   ctx->remote_str = calloc(1, strlen(host) + 7);
828   snprintf(ctx->remote_str, strlen(host) + 7,
829            "%s:%d", host, port);
830  
831   memset(&ctx->r, 0, sizeof(ctx->r));
832   if(family == AF_UNIX) {
833     struct sockaddr_un *s = &ctx->r.remote_un;
834     s->sun_family = AF_UNIX;
835     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
836     ctx->remote_len = sizeof(*s);
837   }
838   else if(family == AF_INET) {
839     struct sockaddr_in *s = &ctx->r.remote_in;
840     s->sin_family = family;
841     s->sin_port = htons(port);
842     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
843     ctx->remote_len = sizeof(*s);
844   }
845   else {
846     struct sockaddr_in6 *s = &ctx->r.remote_in6;
847     s->sin6_family = family;
848     s->sin6_port = htons(port);
849     memcpy(&s->sin6_addr, &a, sizeof(a));
850     ctx->remote_len = sizeof(*s);
851   }
852
853   if(ctx->sslconfig)
854     noit_hash_delete_all(ctx->sslconfig, free, free);
855   else
856     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
857   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
858   if(ctx->config)
859     noit_hash_delete_all(ctx->config, free, free);
860   else
861     ctx->config = calloc(1, sizeof(noit_hash_table));
862   noit_hash_merge_as_dict(ctx->config, config);
863
864   if(noit_hash_retr_str(ctx->config, "timeout", strlen("timeout"), &stimeout))
865     ctx->max_silence = atoi(stimeout);
866   else
867     ctx->max_silence = DEFAULT_NOIT_CONNECTION_TIMEOUT;
868   ctx->consumer_callback = handler;
869   ctx->consumer_free = freefunc;
870   ctx->consumer_ctx = closure;
871   noit_connection_initiate_connection(ctx);
872   return 0;
873 }
874
875 void
876 stratcon_streamer_connection(const char *toplevel, const char *destination,
877                              eventer_func_t handler,
878                              void *(*handler_alloc)(void), void *handler_ctx,
879                              void (*handler_free)(void *)) {
880   int i, cnt = 0;
881   noit_conf_section_t *noit_configs;
882   char path[256];
883
884   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
885   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
886   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
887   for(i=0; i<cnt; i++) {
888     char address[256];
889     unsigned short port;
890     int portint;
891     noit_hash_table *sslconfig, *config;
892
893     if(!noit_conf_get_stringbuf(noit_configs[i],
894                                 "ancestor-or-self::node()/@address",
895                                 address, sizeof(address))) {
896       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
897       continue;
898     }
899     /* if destination is specified, exact match it */
900     if(destination && strcmp(address, destination)) continue;
901
902     if(!noit_conf_get_int(noit_configs[i],
903                           "ancestor-or-self::node()/@port", &portint))
904       portint = 0;
905     port = (unsigned short) portint;
906     if(address[0] != '/' && (portint == 0 || (port != portint))) {
907       /* UNIX sockets don't require a port (they'll ignore it if specified */
908       noitL(noit_stderr,
909             "Invalid port [%d] specified in stanza %d\n", port, i+1);
910       continue;
911     }
912     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
913     config = noit_conf_get_hash(noit_configs[i], "config");
914
915     noitL(noit_error, "initiating to %s\n", address);
916     initiate_noit_connection(address, port, sslconfig, config,
917                              handler,
918                              handler_alloc ? handler_alloc() : handler_ctx,
919                              handler_free);
920     noit_hash_destroy(sslconfig,free,free);
921     free(sslconfig);
922     noit_hash_destroy(config,free,free);
923     free(config);
924   }
925   free(noit_configs);
926 }
927 int
928 stratcon_find_noit_ip_by_cn(const char *cn, char *ip, int len) {
929   int rv = -1;
930   void *vip;
931   pthread_mutex_lock(&noit_ip_by_cn_lock);
932   if(noit_hash_retrieve(&noit_ip_by_cn, cn, strlen(cn), &vip)) {
933     int new_len;
934     char *new_ip = (char *)vip;
935     new_len = strlen(new_ip);
936     strlcpy(ip, new_ip, len);
937     if(new_len >= len) rv = new_len+1;
938     else rv = 0;
939   }
940   pthread_mutex_unlock(&noit_ip_by_cn_lock);
941   return rv;
942 }
943 void
944 stratcon_jlog_streamer_recache_noit() {
945   int di, cnt;
946   noit_conf_section_t *noit_configs;
947   noit_configs = noit_conf_get_sections(NULL, "//noits//noit", &cnt);
948   pthread_mutex_lock(&noit_ip_by_cn_lock);
949   noit_hash_delete_all(&noit_ip_by_cn, free, free);
950   for(di=0; di<cnt; di++) {
951     char address[64];
952     if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
953                                  address, sizeof(address))) {
954       char expected_cn[256];
955       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
956                                  expected_cn, sizeof(expected_cn)))
957         noit_hash_store(&noit_ip_by_cn,
958                         strdup(expected_cn), strlen(expected_cn),
959                         strdup(address));
960     }
961   }
962   free(noit_configs);
963   pthread_mutex_unlock(&noit_ip_by_cn_lock);
964 }
965 void
966 stratcon_jlog_streamer_reload(const char *toplevel) {
967   /* flush and repopulate the cn cache */
968   stratcon_jlog_streamer_recache_noit();
969   if(!stratcon_datastore_get_enabled()) return;
970   stratcon_streamer_connection(toplevel, NULL,
971                                stratcon_jlog_recv_handler,
972                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
973                                NULL,
974                                jlog_streamer_ctx_free);
975 }
976
977 char *
978 stratcon_console_noit_opts(noit_console_closure_t ncct,
979                            noit_console_state_stack_t *stack,
980                            noit_console_state_t *dstate,
981                            int argc, char **argv, int idx) {
982   if(argc == 1) {
983     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
984     const char *key_id;
985     int klen, i = 0;
986     void *vconn, *vcn;
987     noit_connection_ctx_t *ctx;
988     noit_hash_table dedup = NOIT_HASH_EMPTY;
989
990     pthread_mutex_lock(&noits_lock);
991     while(noit_hash_next(&noits, &iter, &key_id, &klen, &vconn)) {
992       ctx = (noit_connection_ctx_t *)vconn;
993       vcn = NULL;
994       if(ctx->config && noit_hash_retrieve(ctx->config, "cn", 2, &vcn) &&
995          !noit_hash_store(&dedup, vcn, strlen(vcn), NULL)) {
996         if(!strncmp(vcn, argv[0], strlen(argv[0]))) {
997           if(idx == i) {
998             pthread_mutex_unlock(&noits_lock);
999             noit_hash_destroy(&dedup, NULL, NULL);
1000             return strdup(vcn);
1001           }
1002           i++;
1003         }
1004       }
1005       if(ctx->remote_str &&
1006          !noit_hash_store(&dedup, ctx->remote_str, strlen(ctx->remote_str), NULL)) {
1007         if(!strncmp(ctx->remote_str, argv[0], strlen(argv[0]))) {
1008           if(idx == i) {
1009             pthread_mutex_unlock(&noits_lock);
1010             noit_hash_destroy(&dedup, NULL, NULL);
1011             return strdup(ctx->remote_str);
1012           }
1013           i++;
1014         }
1015       }
1016     }
1017     pthread_mutex_unlock(&noits_lock);
1018     noit_hash_destroy(&dedup, NULL, NULL);
1019   }
1020   if(argc == 2)
1021     return noit_console_opt_delegate(ncct, stack, dstate, argc-1, argv+1, idx);
1022   return NULL;
1023 }
1024 static int
1025 stratcon_console_show_noits(noit_console_closure_t ncct,
1026                             int argc, char **argv,
1027                             noit_console_state_t *dstate,
1028                             void *closure) {
1029   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1030   const char *key_id, *ecn;
1031   int klen, n = 0, i;
1032   void *vconn;
1033   noit_connection_ctx_t **ctx;
1034
1035   if(closure != (void *)0 && argc == 0) {
1036     nc_printf(ncct, "takes an argument\n");
1037     return 0;
1038   }
1039   if(closure == (void *)0 && argc > 0) {
1040     nc_printf(ncct, "takes no arguments\n");
1041     return 0;
1042   }
1043   pthread_mutex_lock(&noits_lock);
1044   ctx = malloc(sizeof(*ctx) * noits.size);
1045   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1046                        &vconn)) {
1047     ctx[n] = (noit_connection_ctx_t *)vconn;
1048     if(argc == 0 ||
1049        !strcmp(ctx[n]->remote_str, argv[0]) ||
1050        (ctx[n]->config && noit_hash_retr_str(ctx[n]->config, "cn", 2, &ecn) &&
1051         !strcmp(ecn, argv[0]))) {
1052       noit_atomic_inc32(&ctx[n]->refcnt);
1053       n++;
1054     }
1055   }
1056   pthread_mutex_unlock(&noits_lock);
1057   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
1058   for(i=0; i<n; i++) {
1059     nc_print_noit_conn_brief(ncct, ctx[i]);
1060     noit_connection_ctx_deref(ctx[i]);
1061   }
1062   free(ctx);
1063   return 0;
1064 }
1065
1066 static void
1067 emit_noit_info_metrics(struct timeval *now, const char *uuid_str,
1068                        noit_connection_ctx_t *nctx) {
1069   struct timeval last, session_duration, diff;
1070   u_int64_t session_duration_ms, last_event_ms;
1071   jlog_streamer_ctx_t *jctx = nctx->consumer_ctx;
1072   char str[1024], *wr;
1073   int len;
1074   const char *cn_expected;
1075   const char *feedtype = "unknown";
1076
1077   GET_FEEDTYPE(nctx, feedtype);
1078   if(NULL != (wr = strchr(feedtype, '/'))) feedtype = wr+1;
1079
1080   GET_EXPECTED_CN(nctx, cn_expected);
1081   if(!cn_expected) return;
1082
1083   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\t%s`%s`",
1084            (long unsigned int)now->tv_sec,
1085            (long unsigned int)now->tv_usec/1000UL,
1086            uuid_str, cn_expected, feedtype);
1087   wr = str + strlen(str);
1088   len = sizeof(str) - (wr - str);
1089
1090   /* Now we write NAME TYPE VALUE into wr each time and push it */
1091 #define push_noit_m_str(name, value) do { \
1092   snprintf(wr, len, "%s\ts\t%s\n", name, value); \
1093   stratcon_datastore_push(DS_OP_INSERT, \
1094                           (struct sockaddr *)&self_stratcon_ip, \
1095                           self_stratcon_hostname, strdup(str), NULL); \
1096   stratcon_iep_line_processor(DS_OP_INSERT, \
1097                               (struct sockaddr *)&self_stratcon_ip, \
1098                               self_stratcon_hostname, strdup(str), NULL); \
1099 } while(0)
1100 #define push_noit_m_u64(name, value) do { \
1101   snprintf(wr, len, "%s\tL\t%llu\n", name, (long long unsigned int)value); \
1102   stratcon_datastore_push(DS_OP_INSERT, \
1103                           (struct sockaddr *)&self_stratcon_ip, \
1104                           self_stratcon_hostname, strdup(str), NULL); \
1105   stratcon_iep_line_processor(DS_OP_INSERT, \
1106                               (struct sockaddr *)&self_stratcon_ip, \
1107                               self_stratcon_hostname, strdup(str), NULL); \
1108 } while(0)
1109
1110   last.tv_sec = jctx->header.tv_sec;
1111   last.tv_usec = jctx->header.tv_usec;
1112   sub_timeval(*now, last, &diff);
1113   last_event_ms = diff.tv_sec * 1000 + diff.tv_usec / 1000;
1114   sub_timeval(*now, nctx->last_connect, &session_duration);
1115   session_duration_ms = session_duration.tv_sec * 1000 +
1116                         session_duration.tv_usec / 1000;
1117
1118   push_noit_m_str("state", nctx->remote_cn ? "connected" :
1119                              (nctx->retry_event ? "disconnected" :
1120                                                   "connecting"));
1121   push_noit_m_u64("last_event_age_ms", last_event_ms);
1122   push_noit_m_u64("session_length_ms", session_duration_ms);
1123 }
1124 static int
1125 periodic_noit_metrics(eventer_t e, int mask, void *closure,
1126                       struct timeval *now) {
1127   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
1128   noit_connection_ctx_t **ctxs;
1129   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1130   const char *key_id;
1131   void *vconn;
1132   int klen, n = 0, i;
1133   char str[1024];
1134   char uuid_str[UUID_STR_LEN+1];
1135   struct timeval epoch, diff;
1136   u_int64_t uptime = 0;
1137
1138   uuid_unparse_lower(self_stratcon_id, uuid_str);
1139
1140 #define PUSH_BOTH(type, str) do { \
1141   stratcon_datastore_push(type, \
1142                           (struct sockaddr *)&self_stratcon_ip, \
1143                           self_stratcon_hostname, str, NULL); \
1144   stratcon_iep_line_processor(type, \
1145                               (struct sockaddr *)&self_stratcon_ip, \
1146                               self_stratcon_hostname, str, NULL); \
1147 } while(0)
1148
1149   if(closure == NULL) {
1150     /* Only do this the first time we get called */
1151     char ip_str[128];
1152     inet_ntop(AF_INET, &self_stratcon_ip.sin_addr, ip_str,
1153               sizeof(ip_str));
1154     snprintf(str, sizeof(str), "C\t%lu.%03lu\t%s\t%s\tstratcon\t%s\n",
1155              (long unsigned int)now->tv_sec,
1156              (long unsigned int)now->tv_usec/1000UL, uuid_str, ip_str,
1157              self_stratcon_hostname);
1158     PUSH_BOTH(DS_OP_INSERT, strdup(str));
1159   }
1160
1161   pthread_mutex_lock(&noits_lock);
1162   ctxs = malloc(sizeof(*ctxs) * noits.size);
1163   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1164                        &vconn)) {
1165     ctxs[n] = (noit_connection_ctx_t *)vconn;
1166     noit_atomic_inc32(&ctxs[n]->refcnt);
1167     n++;
1168   }
1169   pthread_mutex_unlock(&noits_lock);
1170
1171   snprintf(str, sizeof(str), "S\t%lu.%03lu\t%s\tG\tA\t0\tok %d noits\n",
1172            (long unsigned int)now->tv_sec,
1173            (long unsigned int)now->tv_usec/1000UL, uuid_str, n);
1174   PUSH_BOTH(DS_OP_INSERT, strdup(str));
1175
1176   if(eventer_get_epoch(&epoch) != 0)
1177     memcpy(&epoch, now, sizeof(epoch));
1178   sub_timeval(*now, epoch, &diff);
1179   uptime = diff.tv_sec;
1180   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\tuptime\tL\t%llu\n",
1181            (long unsigned int)now->tv_sec,
1182            (long unsigned int)now->tv_usec/1000UL,
1183            uuid_str, uptime);
1184   PUSH_BOTH(DS_OP_INSERT, strdup(str));
1185
1186   for(i=0; i<n; i++) {
1187     emit_noit_info_metrics(now, uuid_str, ctxs[i]);
1188     noit_connection_ctx_deref(ctxs[i]);
1189   }
1190   free(ctxs);
1191   PUSH_BOTH(DS_OP_CHKPT, NULL);
1192
1193   add_timeval(e->whence, whence, &whence);
1194   eventer_add_at(periodic_noit_metrics, (void *)0x1, whence);
1195   return 0;
1196 }
1197
1198 static int
1199 rest_show_noits(noit_http_rest_closure_t *restc,
1200                 int npats, char **pats) {
1201   xmlDocPtr doc;
1202   xmlNodePtr root;
1203   noit_hash_table seen = NOIT_HASH_EMPTY;
1204   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1205   char path[256];
1206   const char *key_id;
1207   const char *type = NULL, *want_cn = NULL;
1208   int klen, n = 0, i, di, cnt;
1209   void *vconn;
1210   noit_connection_ctx_t **ctxs;
1211   noit_conf_section_t *noit_configs;
1212   struct timeval now, diff, last;
1213   xmlNodePtr node;
1214   noit_http_request *req = noit_http_session_request(restc->http_ctx);
1215
1216   noit_http_process_querystring(req);
1217   type = noit_http_request_querystring(req, "type");
1218   want_cn = noit_http_request_querystring(req, "cn");
1219
1220   gettimeofday(&now, NULL);
1221
1222   pthread_mutex_lock(&noits_lock);
1223   ctxs = malloc(sizeof(*ctxs) * noits.size);
1224   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1225                        &vconn)) {
1226     ctxs[n] = (noit_connection_ctx_t *)vconn;
1227     noit_atomic_inc32(&ctxs[n]->refcnt);
1228     n++;
1229   }
1230   pthread_mutex_unlock(&noits_lock);
1231   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
1232
1233   doc = xmlNewDoc((xmlChar *)"1.0");
1234   root = xmlNewDocNode(doc, NULL, (xmlChar *)"noits", NULL);
1235   xmlDocSetRootElement(doc, root);
1236
1237   for(i=0; i<n; i++) {
1238     char buff[256];
1239     const char *feedtype = "unknown", *state = "unknown";
1240     noit_connection_ctx_t *ctx = ctxs[i];
1241     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
1242
1243     feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
1244
1245     /* If the user requested a specific type and we're not it, skip. */
1246     if(type && strcmp(feedtype, type)) continue;
1247     /* If the user wants a specific CN... limit to that. */
1248     if(want_cn && (!ctx->remote_cn || strcmp(want_cn, ctx->remote_cn)))
1249       continue;
1250
1251     node = xmlNewNode(NULL, (xmlChar *)"noit");
1252     snprintf(buff, sizeof(buff), "%llu.%06d",
1253              (long long unsigned)ctx->last_connect.tv_sec,
1254              (int)ctx->last_connect.tv_usec);
1255     xmlSetProp(node, (xmlChar *)"last_connect", (xmlChar *)buff);
1256     xmlSetProp(node, (xmlChar *)"state", ctx->remote_cn ?
1257                (xmlChar *)"connected" :
1258                (ctx->retry_event ? (xmlChar *)"disconnected" :
1259                                     (xmlChar *)"connecting"));
1260     if(ctx->e) {
1261       char buff[128];
1262       const char *addrstr = NULL;
1263       struct sockaddr_in6 addr6;
1264       socklen_t len = sizeof(addr6);
1265       if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
1266         unsigned short port = 0;
1267         if(addr6.sin6_family == AF_INET) {
1268           addrstr = inet_ntop(addr6.sin6_family,
1269                               &((struct sockaddr_in *)&addr6)->sin_addr,
1270                               buff, sizeof(buff));
1271           memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
1272           port = ntohs(port);
1273         }
1274         else if(addr6.sin6_family == AF_INET6) {
1275           addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
1276                               buff, sizeof(buff));
1277           port = ntohs(addr6.sin6_port);
1278         }
1279         if(addrstr != NULL) {
1280           snprintf(buff + strlen(buff), sizeof(buff) - strlen(buff),
1281                    ":%u", port);
1282           xmlSetProp(node, (xmlChar *)"local", (xmlChar *)buff);
1283         }
1284       }
1285     }
1286     noit_hash_replace(&seen, strdup(ctx->remote_str), strlen(ctx->remote_str),
1287                       0, free, NULL);
1288     xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)ctx->remote_str);
1289     xmlSetProp(node, (xmlChar *)"type", (xmlChar *)feedtype);
1290     if(ctx->retry_event) {
1291       sub_timeval(ctx->retry_event->whence, now, &diff);
1292       snprintf(buff, sizeof(buff), "%llu.%06d",
1293                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
1294       xmlSetProp(node, (xmlChar *)"next_attempt", (xmlChar *)buff);
1295     }
1296     else if(ctx->remote_cn) {
1297       if(ctx->remote_cn)
1298         xmlSetProp(node, (xmlChar *)"remote_cn", (xmlChar *)ctx->remote_cn);
1299  
1300       switch(jctx->state) {
1301         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
1302         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
1303         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
1304         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
1305         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
1306         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
1307         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
1308       }
1309       xmlSetProp(node, (xmlChar *)"state", (xmlChar *)state);
1310       snprintf(buff, sizeof(buff), "%08x:%08x",
1311                jctx->header.chkpt.log, jctx->header.chkpt.marker);
1312       xmlSetProp(node, (xmlChar *)"checkpoint", (xmlChar *)buff);
1313       snprintf(buff, sizeof(buff), "%llu",
1314                (unsigned long long)jctx->total_events);
1315       xmlSetProp(node, (xmlChar *)"session_events", (xmlChar *)buff);
1316       snprintf(buff, sizeof(buff), "%llu",
1317                (unsigned long long)jctx->total_bytes_read);
1318       xmlSetProp(node, (xmlChar *)"session_bytes", (xmlChar *)buff);
1319  
1320       sub_timeval(now, ctx->last_connect, &diff);
1321       snprintf(buff, sizeof(buff), "%lld.%06d",
1322                (long long)diff.tv_sec, (int)diff.tv_usec);
1323       xmlSetProp(node, (xmlChar *)"session_duration", (xmlChar *)buff);
1324  
1325       if(jctx->header.tv_sec) {
1326         last.tv_sec = jctx->header.tv_sec;
1327         last.tv_usec = jctx->header.tv_usec;
1328         snprintf(buff, sizeof(buff), "%llu.%06d",
1329                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
1330         xmlSetProp(node, (xmlChar *)"last_event", (xmlChar *)buff);
1331         sub_timeval(now, last, &diff);
1332         snprintf(buff, sizeof(buff), "%lld.%06d",
1333                  (long long)diff.tv_sec, (int)diff.tv_usec);
1334         xmlSetProp(node, (xmlChar *)"last_event_age", (xmlChar *)buff);
1335       }
1336     }
1337
1338     xmlAddChild(root, node);
1339     noit_connection_ctx_deref(ctx);
1340   }
1341   free(ctxs);
1342
1343   if(!type || !strcmp(type, "configured")) {
1344     snprintf(path, sizeof(path), "//noits//noit");
1345     noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1346     for(di=0; di<cnt; di++) {
1347       char address[64], port_str[32], remote_str[98];
1348       char expected_cn_buff[256], *expected_cn = NULL;
1349       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
1350                                  expected_cn_buff, sizeof(expected_cn_buff)))
1351         expected_cn = expected_cn_buff;
1352       if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn))) continue;
1353       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
1354                                  address, sizeof(address))) {
1355         void *v;
1356         if(!noit_conf_get_stringbuf(noit_configs[di], "self::node()/@port",
1357                                    port_str, sizeof(port_str)))
1358           strlcpy(port_str, "43191", sizeof(port_str));
1359
1360         /* If the user wants a specific CN... limit to that. */
1361           if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn)))
1362             continue;
1363
1364         snprintf(remote_str, sizeof(remote_str), "%s:%s", address, port_str);
1365         if(!noit_hash_retrieve(&seen, remote_str, strlen(remote_str), &v)) {
1366           node = xmlNewNode(NULL, (xmlChar *)"noit");
1367           xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)remote_str);
1368           xmlSetProp(node, (xmlChar *)"type", (xmlChar *)"configured");
1369           if(expected_cn)
1370             xmlSetProp(node, (xmlChar *)"cn", (xmlChar *)expected_cn);
1371           xmlAddChild(root, node);
1372         }
1373       }
1374     }
1375     free(noit_configs);
1376   }
1377   noit_hash_destroy(&seen, free, NULL);
1378
1379   noit_http_response_ok(restc->http_ctx, "text/xml");
1380   noit_http_response_xml(restc->http_ctx, doc);
1381   noit_http_response_end(restc->http_ctx);
1382   xmlFreeDoc(doc);
1383   return 0;
1384 }
1385 static int
1386 stratcon_add_noit(const char *target, unsigned short port,
1387                   const char *cn) {
1388   int cnt;
1389   char path[256];
1390   char port_str[6];
1391   noit_conf_section_t *noit_configs, parent;
1392   xmlNodePtr newnoit, config, cnnode;
1393
1394   snprintf(path, sizeof(path),
1395            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1396   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1397   free(noit_configs);
1398   if(cnt != 0) return -1;
1399
1400   parent = noit_conf_get_section(NULL, "//noits");
1401   if(!parent) return -1;
1402   snprintf(port_str, sizeof(port_str), "%d", port);
1403   newnoit = xmlNewNode(NULL, (xmlChar *)"noit");
1404   xmlSetProp(newnoit, (xmlChar *)"address", (xmlChar *)target);
1405   xmlSetProp(newnoit, (xmlChar *)"port", (xmlChar *)port_str);
1406   xmlAddChild(parent, newnoit);
1407   if(cn) {
1408     config = xmlNewNode(NULL, (xmlChar *)"config");
1409     cnnode = xmlNewNode(NULL, (xmlChar *)"cn");
1410     xmlNodeAddContent(cnnode, (xmlChar *)cn);
1411     xmlAddChild(config, cnnode);
1412     xmlAddChild(newnoit, config);
1413     pthread_mutex_lock(&noit_ip_by_cn_lock);
1414     noit_hash_replace(&noit_ip_by_cn, strdup(cn), strlen(cn),
1415                       strdup(target), free, free);
1416     pthread_mutex_unlock(&noit_ip_by_cn_lock);
1417   }
1418   if(stratcon_datastore_get_enabled())
1419     stratcon_streamer_connection(NULL, target,
1420                                  stratcon_jlog_recv_handler,
1421                                  (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
1422                                  NULL,
1423                                  jlog_streamer_ctx_free);
1424   if(stratcon_iep_get_enabled())
1425     stratcon_streamer_connection(NULL, target,
1426                                  stratcon_jlog_recv_handler,
1427                                  (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
1428                                  NULL,
1429                                  jlog_streamer_ctx_free);
1430   return 1;
1431 }
1432 static int
1433 stratcon_remove_noit(const char *target, unsigned short port) {
1434   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1435   const char *key_id;
1436   int klen, n = -1, i, cnt = 0;
1437   void *vconn;
1438   noit_connection_ctx_t **ctx;
1439   noit_conf_section_t *noit_configs;
1440   char path[256];
1441   char remote_str[256];
1442
1443   snprintf(remote_str, sizeof(remote_str), "%s:%d", target, port);
1444
1445   snprintf(path, sizeof(path),
1446            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1447   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1448   for(i=0; i<cnt; i++) {
1449     char expected_cn[256];
1450     if(noit_conf_get_stringbuf(noit_configs[i], "self::node()/config/cn",
1451                                expected_cn, sizeof(expected_cn))) {
1452       pthread_mutex_lock(&noit_ip_by_cn_lock);
1453       noit_hash_delete(&noit_ip_by_cn, expected_cn, strlen(expected_cn),
1454                        free, free);
1455       pthread_mutex_unlock(&noit_ip_by_cn_lock);
1456     }
1457     xmlUnlinkNode(noit_configs[i]);
1458     xmlFreeNode(noit_configs[i]);
1459     n = 0;
1460   }
1461   free(noit_configs);
1462
1463   pthread_mutex_lock(&noits_lock);
1464   ctx = malloc(sizeof(*ctx) * noits.size);
1465   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1466                        &vconn)) {
1467     if(!strcmp(((noit_connection_ctx_t *)vconn)->remote_str, remote_str)) {
1468       ctx[n] = (noit_connection_ctx_t *)vconn;
1469       noit_atomic_inc32(&ctx[n]->refcnt);
1470       n++;
1471     }
1472   }
1473   pthread_mutex_unlock(&noits_lock);
1474   for(i=0; i<n; i++) {
1475     noit_connection_ctx_dealloc(ctx[i]); /* once for the record */
1476     noit_connection_ctx_deref(ctx[i]);   /* once for the aboce inc32 */
1477   }
1478   free(ctx);
1479   return n;
1480 }
1481 static int
1482 rest_set_noit(noit_http_rest_closure_t *restc,
1483               int npats, char **pats) {
1484   const char *cn = NULL;
1485   noit_http_session_ctx *ctx = restc->http_ctx;
1486   noit_http_request *req = noit_http_session_request(ctx);
1487   unsigned short port = 43191;
1488   if(npats < 1 || npats > 2)
1489     noit_http_response_server_error(ctx, "text/xml");
1490   if(npats == 2) port = atoi(pats[1]);
1491   noit_http_process_querystring(req);
1492   cn = noit_http_request_querystring(req, "cn");
1493   if(stratcon_add_noit(pats[0], port, cn) >= 0)
1494     noit_http_response_ok(ctx, "text/xml");
1495   else
1496     noit_http_response_standard(ctx, 409, "EXISTS", "text/xml");
1497   if(noit_conf_write_file(NULL) != 0)
1498     noitL(noit_error, "local config write failed\n");
1499   noit_conf_mark_changed();
1500   noit_http_response_end(ctx);
1501   return 0;
1502 }
1503 static int
1504 rest_delete_noit(noit_http_rest_closure_t *restc,
1505                  int npats, char **pats) {
1506   noit_http_session_ctx *ctx = restc->http_ctx;
1507   unsigned short port = 43191;
1508   if(npats < 1 || npats > 2)
1509     noit_http_response_server_error(ctx, "text/xml");
1510   if(npats == 2) port = atoi(pats[1]);
1511   if(stratcon_remove_noit(pats[0], port) >= 0)
1512     noit_http_response_ok(ctx, "text/xml");
1513   else
1514     noit_http_response_not_found(ctx, "text/xml");
1515   if(noit_conf_write_file(NULL) != 0)
1516     noitL(noit_error, "local config write failed\n");
1517   noit_conf_mark_changed();
1518   noit_http_response_end(ctx);
1519   return 0;
1520 }
1521 static int
1522 stratcon_console_conf_noits(noit_console_closure_t ncct,
1523                             int argc, char **argv,
1524                             noit_console_state_t *dstate,
1525                             void *closure) {
1526   char *cp, target[128];
1527   unsigned short port = 43191;
1528   int adding = (int)(vpsized_int)closure;
1529   if(argc != 1)
1530     return -1;
1531
1532   cp = strchr(argv[0], ':');
1533   if(cp) {
1534     strlcpy(target, argv[0], MIN(sizeof(target), cp-argv[0]+1));
1535     port = atoi(cp+1);
1536   }
1537   else strlcpy(target, argv[0], sizeof(target));
1538   if(adding) {
1539     if(stratcon_add_noit(target, port, NULL) >= 0) {
1540       nc_printf(ncct, "Added noit at %s:%d\n", target, port);
1541     }
1542     else {
1543       nc_printf(ncct, "Failed to add noit at %s:%d\n", target, port);
1544     }
1545   }
1546   else {
1547     if(stratcon_remove_noit(target, port) >= 0) {
1548       nc_printf(ncct, "Removed noit at %s:%d\n", target, port);
1549     }
1550     else {
1551       nc_printf(ncct, "Failed to remove noit at %s:%d\n", target, port);
1552     }
1553   }
1554   return 0;
1555 }
1556
1557 static void
1558 register_console_streamer_commands() {
1559   noit_console_state_t *tl;
1560   cmd_info_t *showcmd, *confcmd, *conftcmd, *conftnocmd;
1561
1562   tl = noit_console_state_initial();
1563   showcmd = noit_console_state_get_cmd(tl, "show");
1564   assert(showcmd && showcmd->dstate);
1565   confcmd = noit_console_state_get_cmd(tl, "configure");
1566   conftcmd = noit_console_state_get_cmd(confcmd->dstate, "terminal");
1567   conftnocmd = noit_console_state_get_cmd(conftcmd->dstate, "no");
1568
1569   noit_console_state_add_cmd(conftcmd->dstate,
1570     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)1));
1571   noit_console_state_add_cmd(conftnocmd->dstate,
1572     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)0));
1573
1574   noit_console_state_add_cmd(showcmd->dstate,
1575     NCSCMD("noit", stratcon_console_show_noits,
1576            stratcon_console_noit_opts, NULL, (void *)1));
1577   noit_console_state_add_cmd(showcmd->dstate,
1578     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
1579 }
1580
1581 void
1582 stratcon_jlog_streamer_init(const char *toplevel) {
1583   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
1584   struct in_addr remote;
1585   char uuid_str[UUID_STR_LEN + 1];
1586
1587   pthread_mutex_init(&noits_lock, NULL);
1588   pthread_mutex_init(&noit_ip_by_cn_lock, NULL);
1589   eventer_name_callback("noit_connection_reinitiate",
1590                         noit_connection_reinitiate);
1591   eventer_name_callback("stratcon_jlog_recv_handler",
1592                         stratcon_jlog_recv_handler);
1593   eventer_name_callback("noit_connection_ssl_upgrade",
1594                         noit_connection_ssl_upgrade);
1595   eventer_name_callback("noit_connection_complete_connect",
1596                         noit_connection_complete_connect);
1597   eventer_name_callback("noit_connection_session_timeout",
1598                         noit_connection_session_timeout);
1599   register_console_streamer_commands();
1600   stratcon_jlog_streamer_reload(toplevel);
1601   stratcon_streamer_connection(toplevel, "", NULL, NULL, NULL, NULL);
1602   assert(noit_http_rest_register_auth(
1603     "GET", "/noits/", "^show$", rest_show_noits,
1604              noit_http_rest_client_cert_auth
1605   ) == 0);
1606   assert(noit_http_rest_register_auth(
1607     "PUT", "/noits/", "^set/([^/:]+)$", rest_set_noit,
1608              noit_http_rest_client_cert_auth
1609   ) == 0);
1610   assert(noit_http_rest_register_auth(
1611     "PUT", "/noits/", "^set/([^/:]+):(\\d+)$", rest_set_noit,
1612              noit_http_rest_client_cert_auth
1613   ) == 0);
1614   assert(noit_http_rest_register_auth(
1615     "DELETE", "/noits/", "^delete/([^/:]+)$", rest_delete_noit,
1616              noit_http_rest_client_cert_auth
1617   ) == 0);
1618   assert(noit_http_rest_register_auth(
1619     "DELETE", "/noits/", "^delete/([^/:]+):(\\d+)$", rest_delete_noit,
1620              noit_http_rest_client_cert_auth
1621   ) == 0);
1622
1623   uuid_clear(self_stratcon_id);
1624
1625   if(noit_conf_get_stringbuf(NULL, "/stratcon/@id",
1626                              uuid_str, sizeof(uuid_str)) &&
1627      uuid_parse(uuid_str, self_stratcon_id) == 0) {
1628     int period;
1629     /* If a UUID was provided for stratcon itself, we will report metrics
1630      * on a large variety of things (including all noits).
1631      */
1632     if(noit_conf_get_int(NULL, "/stratcon/@metric_period", &period) &&
1633        period > 0) {
1634       DEFAULT_NOIT_PERIOD_TV.tv_sec = period / 1000;
1635       DEFAULT_NOIT_PERIOD_TV.tv_usec = (period % 1000) * 1000;
1636     }
1637     self_stratcon_ip.sin_family = AF_INET;
1638     remote.s_addr = 0xffffffff;
1639     noit_getip_ipv4(remote, &self_stratcon_ip.sin_addr);
1640     gethostname(self_stratcon_hostname, sizeof(self_stratcon_hostname));
1641     eventer_add_in(periodic_noit_metrics, NULL, whence);
1642   }
1643 }
1644
Note: See TracBrowser for help on using the browser.