root/src/stratcon_jlog_streamer.c

Revision 9e9846c7878c94bd85ae7f072eeddc316cc795a0, 58.6 kB (checked in by Sean OMeara <someara@opscode.com>, 3 years ago)

Fixing compile error on EL6: stratcon_jlog_streamer.c:1636: error: dereferencing pointer 'addr6.41' does break strict-aliasing

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "dtrace_probes.h"
35 #include "eventer/eventer.h"
36 #include "noit_conf.h"
37 #include "utils/noit_hash.h"
38 #include "utils/noit_log.h"
39 #include "utils/noit_getip.h"
40 #include "noit_jlog_listener.h"
41 #include "noit_rest.h"
42 #include "stratcon_datastore.h"
43 #include "stratcon_jlog_streamer.h"
44 #include "stratcon_iep.h"
45
46 #include <unistd.h>
47 #include <assert.h>
48 #include <errno.h>
49 #include <sys/types.h>
50 #include <sys/socket.h>
51 #ifdef HAVE_SYS_FILIO_H
52 #include <sys/filio.h>
53 #endif
54 #include <netinet/in.h>
55 #include <sys/un.h>
56 #include <arpa/inet.h>
57
58 pthread_mutex_t noits_lock;
59 noit_hash_table noits = NOIT_HASH_EMPTY;
60 pthread_mutex_t noit_ip_by_cn_lock;
61 noit_hash_table noit_ip_by_cn = NOIT_HASH_EMPTY;
62 static uuid_t self_stratcon_id;
63 static char self_stratcon_hostname[256] = "\0";
64 static struct sockaddr_in self_stratcon_ip;
65
66 static struct timeval DEFAULT_NOIT_PERIOD_TV = { 5UL, 0UL };
67
68 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
69
70 static const char *feed_type_to_str(int jlog_feed_cmd) {
71   switch(jlog_feed_cmd) {
72     case NOIT_JLOG_DATA_FEED: return "durable/storage";
73     case NOIT_JLOG_DATA_TEMP_FEED: return "transient/iep";
74   }
75   return "unknown";
76 }
77
78 #define GET_EXPECTED_CN(nctx, cn) do { \
79   void *vcn; \
80   cn = NULL; \
81   if(nctx->config && \
82      noit_hash_retrieve(nctx->config, "cn", 2, &vcn)) { \
83      cn = vcn; \
84   } \
85 } while(0)
86 #define GET_FEEDTYPE(nctx, feedtype) do { \
87   jlog_streamer_ctx_t *_jctx = nctx->consumer_ctx; \
88   feedtype = "unknown"; \
89   if(_jctx->push == stratcon_datastore_push) \
90     feedtype = "storage"; \
91   else if(_jctx->push == stratcon_iep_line_processor) \
92     feedtype = "iep"; \
93 } while(0)
94
95 static int
96 remote_str_sort(const void *a, const void *b) {
97   int rv;
98   noit_connection_ctx_t * const *actx = a;
99   noit_connection_ctx_t * const *bctx = b;
100   jlog_streamer_ctx_t *ajctx = (*actx)->consumer_ctx;
101   jlog_streamer_ctx_t *bjctx = (*bctx)->consumer_ctx;
102   rv = strcmp((*actx)->remote_str, (*bctx)->remote_str);
103   if(rv) return rv;
104   return (ajctx->jlog_feed_cmd < bjctx->jlog_feed_cmd) ? -1 :
105            ((ajctx->jlog_feed_cmd == bjctx->jlog_feed_cmd) ? 0 : 1);
106 }
107 static void
108 nc_print_noit_conn_brief(noit_console_closure_t ncct,
109                           noit_connection_ctx_t *ctx) {
110   jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
111   struct timeval now, diff, session_duration;
112   const char *feedtype = "unknown";
113   const char *lasttime = "never";
114   if(ctx->last_connect.tv_sec != 0) {
115     char cmdbuf[4096];
116     time_t r = ctx->last_connect.tv_sec;
117     struct tm tbuf, *tm;
118     tm = gmtime_r(&r, &tbuf);
119     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
120     lasttime = cmdbuf;
121   }
122   nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str,
123             ctx->remote_cn ? "connected" :
124                              (ctx->retry_event ? "disconnected" :
125                                                    "connecting"), lasttime);
126   if(ctx->e) {
127     char buff[128];
128     const char *addrstr = NULL;
129     struct sockaddr_in6 addr6;
130     socklen_t len = sizeof(addr6);
131     if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
132       unsigned short port = 0;
133       if(addr6.sin6_family == AF_INET) {
134         addrstr = inet_ntop(addr6.sin6_family,
135                             &((struct sockaddr_in *)&addr6)->sin_addr,
136                             buff, sizeof(buff));
137         memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
138         port = ntohs(port);
139       }
140       else if(addr6.sin6_family == AF_INET6) {
141         addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
142                             buff, sizeof(buff));
143         port = ntohs(addr6.sin6_port);
144       }
145       if(addrstr != NULL)
146         nc_printf(ncct, "\tLocal address is %s:%u\n", buff, port);
147       else
148         nc_printf(ncct, "\tLocal address not interpretable\n");
149     }
150     else {
151       nc_printf(ncct, "\tLocal address error[%d]: %s\n",
152                 ctx->e->fd, strerror(errno));
153     }
154   }
155   feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
156   nc_printf(ncct, "\tJLog event streamer [%s]\n", feedtype);
157   gettimeofday(&now, NULL);
158   if(ctx->retry_event) {
159     sub_timeval(ctx->retry_event->whence, now, &diff);
160     nc_printf(ncct, "\tNext attempt in %lld.%06us\n",
161               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
162   }
163   else if(ctx->remote_cn) {
164     nc_printf(ncct, "\tRemote CN: '%s'\n",
165               ctx->remote_cn ? ctx->remote_cn : "???");
166     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
167       struct timeval last;
168       double session_duration_seconds;
169       const char *state = "unknown";
170
171       switch(jctx->state) {
172         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
173         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
174         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
175         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
176         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
177         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
178         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
179       }
180       last.tv_sec = jctx->header.tv_sec;
181       last.tv_usec = jctx->header.tv_usec;
182       sub_timeval(now, last, &diff);
183       sub_timeval(now, ctx->last_connect, &session_duration);
184       session_duration_seconds = session_duration.tv_sec +
185                                  (double)session_duration.tv_usec/1000000.0;
186       nc_printf(ncct, "\tState: %s\n"
187                       "\tNext checkpoint: [%08x:%08x]\n"
188                       "\tLast event: %lld.%06us ago\n"
189                       "\tEvents this session: %llu (%0.2f/s)\n"
190                       "\tOctets this session: %llu (%0.2f/s)\n",
191                 state,
192                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
193                 (long long)diff.tv_sec, (unsigned int)diff.tv_usec,
194                 jctx->total_events,
195                 (double)jctx->total_events/session_duration_seconds,
196                 jctx->total_bytes_read,
197                 (double)jctx->total_bytes_read/session_duration_seconds);
198     }
199     else {
200       nc_printf(ncct, "\tUnknown type.\n");
201     }
202   }
203 }
204
205 jlog_streamer_ctx_t *
206 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
207   jlog_streamer_ctx_t *ctx;
208   ctx = stratcon_jlog_streamer_ctx_alloc();
209   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
210   ctx->push = stratcon_datastore_push;
211   return ctx;
212 }
213 jlog_streamer_ctx_t *
214 stratcon_jlog_streamer_ctx_alloc(void) {
215   jlog_streamer_ctx_t *ctx;
216   ctx = calloc(1, sizeof(*ctx));
217   return ctx;
218 }
219 noit_connection_ctx_t *
220 noit_connection_ctx_alloc(void) {
221   noit_connection_ctx_t *ctx, **pctx;
222   ctx = calloc(1, sizeof(*ctx));
223   ctx->refcnt = 1;
224   pctx = malloc(sizeof(*pctx));
225   *pctx = ctx;
226   pthread_mutex_lock(&noits_lock);
227   noit_hash_store(&noits, (const char *)pctx, sizeof(*pctx), ctx);
228   pthread_mutex_unlock(&noits_lock);
229   return ctx;
230 }
231 int
232 noit_connection_reinitiate(eventer_t e, int mask, void *closure,
233                          struct timeval *now) {
234   noit_connection_ctx_t *ctx = closure;
235   ctx->retry_event = NULL;
236   noit_connection_initiate_connection(closure);
237   return 0;
238 }
239 void
240 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx,
241                                    struct timeval *now) {
242   struct timeval __now, interval;
243   const char *v, *feedtype, *cn_expected;
244   u_int32_t min_interval = 1000, max_interval = 8000;
245
246   noit_connection_disable_timeout(ctx);
247   if(ctx->remote_cn) {
248     free(ctx->remote_cn);
249     ctx->remote_cn = NULL;
250   }
251   if(noit_hash_retr_str(ctx->config,
252                         "reconnect_initial_interval",
253                         strlen("reconnect_initial_interval"),
254                         &v)) {
255     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
256   }
257   if(noit_hash_retr_str(ctx->config,
258                         "reconnect_maximum_interval",
259                         strlen("reconnect_maximum_interval"),
260                         &v)) {
261     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
262   }
263   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
264   else {
265     ctx->current_backoff *= 2;
266     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
267     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
268   }
269   if(!now) {
270     gettimeofday(&__now, NULL);
271     now = &__now;
272   }
273   interval.tv_sec = ctx->current_backoff / 1000;
274   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
275   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
276         ctx->current_backoff);
277   if(ctx->retry_event)
278     eventer_remove(ctx->retry_event);
279   else
280     ctx->retry_event = eventer_alloc();
281   ctx->retry_event->callback = noit_connection_reinitiate;
282   ctx->retry_event->closure = ctx;
283   ctx->retry_event->mask = EVENTER_TIMER;
284   add_timeval(*now, interval, &ctx->retry_event->whence);
285   GET_EXPECTED_CN(ctx, cn_expected);
286   GET_FEEDTYPE(ctx, feedtype);
287   STRATCON_NOIT_RESCHEDULE(-1, (char *)feedtype, ctx->remote_str,
288                            (char *)cn_expected, ctx->current_backoff);
289   eventer_add(ctx->retry_event);
290 }
291 static void
292 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
293   if(ctx->remote_cn) free(ctx->remote_cn);
294   if(ctx->remote_str) free(ctx->remote_str);
295   if(ctx->retry_event) {
296     eventer_remove(ctx->retry_event);
297     eventer_free(ctx->retry_event);
298   }
299   if(ctx->timeout_event) {
300     eventer_remove(ctx->timeout_event);
301     eventer_free(ctx->timeout_event);
302   }
303   ctx->consumer_free(ctx->consumer_ctx);
304   free(ctx);
305 }
306 void
307 noit_connection_ctx_deref(noit_connection_ctx_t *ctx) {
308   if(noit_atomic_dec32(&ctx->refcnt) == 0)
309     noit_connection_ctx_free(ctx);
310 }
311 void
312 noit_connection_ctx_dealloc(noit_connection_ctx_t *ctx) {
313   noit_connection_ctx_t **pctx = &ctx;
314   pthread_mutex_lock(&noits_lock);
315   noit_hash_delete(&noits, (const char *)pctx, sizeof(*pctx),
316                    free, (void (*)(void *))noit_connection_ctx_deref);
317   pthread_mutex_unlock(&noits_lock);
318 }
319 void
320 jlog_streamer_ctx_free(void *cl) {
321   jlog_streamer_ctx_t *ctx = cl;
322   if(ctx->buffer) free(ctx->buffer);
323   free(ctx);
324 }
325
326 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
327 static int
328 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
329   int len, mask;
330   while(ctx->bytes_read < ctx->bytes_expected) {
331     len = Eread(ctx->buffer + ctx->bytes_read,
332                 ctx->bytes_expected - ctx->bytes_read);
333     if(len < 0) {
334       *newmask = mask;
335       return -1;
336     }
337     /* if we get 0 inside SSL, and there was a real error, we
338      * will actually get a -1 here.
339      * if(len == 0) return ctx->bytes_read;
340      */
341     ctx->total_bytes_read += len;
342     ctx->bytes_read += len;
343   }
344   assert(ctx->bytes_read == ctx->bytes_expected);
345   return ctx->bytes_read;
346 }
347 #define FULLREAD(e,ctx,size) do { \
348   int mask, len; \
349   if(!ctx->bytes_expected) { \
350     ctx->bytes_expected = size; \
351     if(ctx->buffer) free(ctx->buffer); \
352     ctx->buffer = malloc(size + 1); \
353     if(ctx->buffer == NULL) { \
354       noitL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)size + 1); \
355       goto socket_error; \
356     } \
357     ctx->buffer[size] = '\0'; \
358   } \
359   len = __read_on_ctx(e, ctx, &mask); \
360   if(len < 0) { \
361     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
362     noitL(noit_error, "[%s] SSL read error: %s\n", nctx->remote_str, strerror(errno)); \
363     goto socket_error; \
364   } \
365   ctx->bytes_read = 0; \
366   ctx->bytes_expected = 0; \
367   if(len != size) { \
368     noitL(noit_error, "[%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
369           nctx->remote_str, ctx->state, len, (long unsigned int)size); \
370     goto socket_error; \
371   } \
372 } while(0)
373
374 int
375 noit_connection_session_timeout(eventer_t e, int mask, void *closure,
376                                 struct timeval *now) {
377   noit_connection_ctx_t *nctx = closure;
378   eventer_t fde = nctx->e;
379   nctx->timeout_event = NULL;
380   noitL(noit_error, "Timing out jlog session: %s\n",
381         nctx->remote_cn ? nctx->remote_cn : "(null)");
382   if(fde)
383     eventer_trigger(fde, EVENTER_EXCEPTION);
384   return 0;
385 }
386 int
387 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
388                            struct timeval *now) {
389   noit_connection_ctx_t *nctx = closure;
390   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
391   jlog_streamer_ctx_t dummy;
392   int len;
393   jlog_id n_chkpt;
394   const char *cn_expected, *feedtype;
395   GET_EXPECTED_CN(nctx, cn_expected);
396   GET_FEEDTYPE(nctx, feedtype);
397
398   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
399     if(write(e->fd, e, 0) == -1)
400       noitL(noit_error, "socket error: %s\n", strerror(errno));
401  socket_error:
402     STRATCON_NOIT_CONNECT_CLOSE(e->fd, (char *)feedtype, nctx->remote_str,
403                                 (char *)cn_expected,
404                                 nctx->wants_shutdown, errno);
405     ctx->state = JLOG_STREAMER_WANT_INITIATE;
406     ctx->count = 0;
407     ctx->needs_chkpt = 0;
408     ctx->bytes_read = 0;
409     ctx->bytes_expected = 0;
410     if(ctx->buffer) free(ctx->buffer);
411     ctx->buffer = NULL;
412     noit_connection_schedule_reattempt(nctx, now);
413     eventer_remove_fd(e->fd);
414     nctx->e = NULL;
415     e->opset->close(e->fd, &mask, e);
416     return 0;
417   }
418
419   noit_connection_update_timeout(nctx);
420   while(1) {
421     switch(ctx->state) {
422       case JLOG_STREAMER_WANT_INITIATE:
423         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
424                               sizeof(ctx->jlog_feed_cmd),
425                               &mask, e);
426         if(len < 0) {
427           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
428           goto socket_error;
429         }
430         if(len != sizeof(ctx->jlog_feed_cmd)) {
431           noitL(noit_error, "short write [%d/%d] on initiating stream.\n",
432                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
433           goto socket_error;
434         }
435         ctx->state = JLOG_STREAMER_WANT_COUNT;
436         break;
437
438       case JLOG_STREAMER_WANT_ERROR:
439         FULLREAD(e, ctx, 0 - ctx->count);
440         noitL(noit_error, "[%s] %.*s\n", nctx->remote_str,
441               0 - ctx->count, ctx->buffer);
442         free(ctx->buffer); ctx->buffer = NULL;
443         goto socket_error;
444         break;
445
446       case JLOG_STREAMER_WANT_COUNT:
447         FULLREAD(e, ctx, sizeof(u_int32_t));
448         memcpy(&dummy.count, ctx->buffer, sizeof(u_int32_t));
449         ctx->count = ntohl(dummy.count);
450         ctx->needs_chkpt = 0;
451         free(ctx->buffer); ctx->buffer = NULL;
452         STRATCON_NOIT_STREAM_COUNT(e->fd, (char *)feedtype,
453                                    nctx->remote_str, (char *)cn_expected,
454                                    ctx->count);
455         if(ctx->count < 0)
456           ctx->state = JLOG_STREAMER_WANT_ERROR;
457         else
458           ctx->state = JLOG_STREAMER_WANT_HEADER;
459         break;
460
461       case JLOG_STREAMER_WANT_HEADER:
462         if(ctx->count == 0) {
463           ctx->state = JLOG_STREAMER_WANT_COUNT;
464           break;
465         }
466         FULLREAD(e, ctx, sizeof(ctx->header));
467         memcpy(&dummy.header, ctx->buffer, sizeof(ctx->header));
468         ctx->header.chkpt.log = ntohl(dummy.header.chkpt.log);
469         ctx->header.chkpt.marker = ntohl(dummy.header.chkpt.marker);
470         ctx->header.tv_sec = ntohl(dummy.header.tv_sec);
471         ctx->header.tv_usec = ntohl(dummy.header.tv_usec);
472         ctx->header.message_len = ntohl(dummy.header.message_len);
473         STRATCON_NOIT_STREAM_HEADER(e->fd, (char *)feedtype,
474                                     nctx->remote_str, (char *)cn_expected,
475                                     ctx->header.chkpt.log, ctx->header.chkpt.marker,
476                                     ctx->header.tv_sec, ctx->header.tv_usec,
477                                     ctx->header.message_len);
478         free(ctx->buffer); ctx->buffer = NULL;
479         ctx->state = JLOG_STREAMER_WANT_BODY;
480         break;
481
482       case JLOG_STREAMER_WANT_BODY:
483         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
484         STRATCON_NOIT_STREAM_BODY(e->fd, (char *)feedtype,
485                                   nctx->remote_str, (char *)cn_expected,
486                                   ctx->header.chkpt.log, ctx->header.chkpt.marker,
487                                   ctx->header.tv_sec, ctx->header.tv_usec,
488                                   ctx->buffer);
489         if(ctx->header.message_len > 0) {
490           ctx->needs_chkpt = 1;
491           ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn,
492                     ctx->buffer, NULL);
493         }
494         else if(ctx->buffer)
495           free(ctx->buffer);
496         /* Don't free the buffer, it's used by the datastore process. */
497         ctx->buffer = NULL;
498         ctx->count--;
499         ctx->total_events++;
500         if(ctx->count == 0 && ctx->needs_chkpt) {
501           eventer_t completion_e;
502           eventer_remove_fd(e->fd);
503           completion_e = eventer_alloc();
504           memcpy(completion_e, e, sizeof(*e));
505           nctx->e = completion_e;
506           completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
507           ctx->state = JLOG_STREAMER_IS_ASYNC;
508           ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn,
509                     NULL, completion_e);
510           noitL(noit_debug, "Pushing %s batch async [%s]: [%u/%u]\n",
511                 feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
512                 nctx->remote_cn ? nctx->remote_cn : "(null)",
513                 ctx->header.chkpt.log, ctx->header.chkpt.marker);
514           noit_connection_disable_timeout(nctx);
515           return 0;
516         }
517         else if(ctx->count == 0)
518           ctx->state = JLOG_STREAMER_WANT_CHKPT;
519         else
520           ctx->state = JLOG_STREAMER_WANT_HEADER;
521         break;
522
523       case JLOG_STREAMER_IS_ASYNC:
524         ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
525       case JLOG_STREAMER_WANT_CHKPT:
526         noitL(noit_debug, "Pushing %s checkpoint [%s]: [%u/%u]\n",
527               feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
528               nctx->remote_cn ? nctx->remote_cn : "(null)",
529               ctx->header.chkpt.log, ctx->header.chkpt.marker);
530         n_chkpt.log = htonl(ctx->header.chkpt.log);
531         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
532
533         /* screw short writes.  I'd rather die than not write my data! */
534         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
535                               &mask, e);
536         if(len < 0) {
537           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
538           goto socket_error;
539         }
540         if(len != sizeof(jlog_id)) {
541           noitL(noit_error, "short write on checkpointing stream.\n");
542           goto socket_error;
543         }
544         STRATCON_NOIT_STREAM_CHECKPOINT(e->fd, (char *)feedtype,
545                                         nctx->remote_str, (char *)cn_expected,
546                                         ctx->header.chkpt.log, ctx->header.chkpt.marker);
547         ctx->state = JLOG_STREAMER_WANT_COUNT;
548         break;
549     }
550   }
551   /* never get here */
552 }
553
554 int
555 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure,
556                             struct timeval *now) {
557   noit_connection_ctx_t *nctx = closure;
558   int rv;
559   const char *error = NULL, *cn_expected, *feedtype;
560
561   GET_EXPECTED_CN(nctx, cn_expected);
562   GET_FEEDTYPE(nctx, feedtype);
563   STRATCON_NOIT_CONNECT_SSL(e->fd, (char *)feedtype, nctx->remote_str,
564                             (char *)cn_expected);
565   rv = eventer_SSL_connect(e, &mask);
566   if(rv > 0) {
567     eventer_ssl_ctx_t *sslctx;
568     e->callback = nctx->consumer_callback;
569     /* We must make a copy of the acceptor_closure_t for each new
570      * connection.
571      */
572     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
573       const char *cn, *end;
574       cn = eventer_ssl_get_peer_subject(sslctx);
575       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
576         cn += 3;
577         end = cn;
578         while(*end && *end != '/') end++;
579         nctx->remote_cn = malloc(end - cn + 1);
580         memcpy(nctx->remote_cn, cn, end - cn);
581         nctx->remote_cn[end-cn] = '\0';
582       }
583       if(cn_expected && (!nctx->remote_cn ||
584                          strcmp(nctx->remote_cn, cn_expected))) {
585         error = "jlog connect CN mismatch\n";
586         goto error;
587       }
588     }
589     STRATCON_NOIT_CONNECT_SSL_SUCCESS(e->fd, (char *)feedtype,
590                                       nctx->remote_str, (char *)cn_expected);
591     return e->callback(e, mask, e->closure, now);
592   }
593   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
594   noitL(noit_debug, "jlog streamer SSL upgrade failed.\n");
595
596  error:
597   STRATCON_NOIT_CONNECT_SSL_FAILED(e->fd, (char *)feedtype,
598                                    nctx->remote_str, (char *)cn_expected,
599                                    (char *)error, errno);
600   if(error) noitL(noit_error, "%s", error);
601   eventer_remove_fd(e->fd);
602   nctx->e = NULL;
603   e->opset->close(e->fd, &mask, e);
604   noit_connection_schedule_reattempt(nctx, now);
605   return 0;
606 }
607 int
608 noit_connection_complete_connect(eventer_t e, int mask, void *closure,
609                                  struct timeval *now) {
610   noit_connection_ctx_t *nctx = closure;
611   const char *cert, *key, *ca, *ciphers, *crl = NULL, *cn_expected, *feedtype;
612   char remote_str[128], tmp_str[128];
613   eventer_ssl_ctx_t *sslctx;
614   int aerrno, len;
615   socklen_t aerrno_len = sizeof(aerrno);
616
617   GET_EXPECTED_CN(nctx, cn_expected);
618   GET_FEEDTYPE(nctx, feedtype);
619   if(getsockopt(e->fd,SOL_SOCKET,SO_ERROR, &aerrno, &aerrno_len) == 0)
620     if(aerrno != 0) goto connect_error;
621   aerrno = 0;
622
623   if(mask & EVENTER_EXCEPTION) {
624     if(aerrno == 0 && (write(e->fd, e, 0) == -1))
625       aerrno = errno;
626  connect_error:
627     switch(nctx->r.remote.sa_family) {
628       case AF_INET:
629         len = sizeof(struct sockaddr_in);
630         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in.sin_addr,
631                   tmp_str, len);
632         snprintf(remote_str, sizeof(remote_str), "%s:%d",
633                  tmp_str, ntohs(nctx->r.remote_in.sin_port));
634         break;
635       case AF_INET6:
636         len = sizeof(struct sockaddr_in6);
637         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in6.sin6_addr,
638                   tmp_str, len);
639         snprintf(remote_str, sizeof(remote_str), "%s:%d",
640                  tmp_str, ntohs(nctx->r.remote_in6.sin6_port));
641        break;
642       case AF_UNIX:
643         snprintf(remote_str, sizeof(remote_str), "%s", nctx->r.remote_un.sun_path);
644         break;
645       default:
646         snprintf(remote_str, sizeof(remote_str), "(unknown)");
647     }
648     noitL(noit_error, "Error connecting to %s: %s\n",
649           remote_str, strerror(aerrno));
650     STRATCON_NOIT_CONNECT_FAILED(e->fd, (char *)feedtype, remote_str,
651                                  (char *)cn_expected, aerrno);
652     eventer_remove_fd(e->fd);
653     nctx->e = NULL;
654     e->opset->close(e->fd, &mask, e);
655     noit_connection_schedule_reattempt(nctx, now);
656     return 0;
657   }
658
659 #define SSLCONFGET(var,name) do { \
660   if(!noit_hash_retr_str(nctx->sslconfig, name, strlen(name), \
661                          &var)) var = NULL; } while(0)
662   SSLCONFGET(cert, "certificate_file");
663   SSLCONFGET(key, "key_file");
664   SSLCONFGET(ca, "ca_chain");
665   SSLCONFGET(ciphers, "ciphers");
666   SSLCONFGET(crl, "crl");
667   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
668   if(!sslctx) goto connect_error;
669   if(crl) {
670     if(!eventer_ssl_use_crl(sslctx, crl)) {
671       noitL(noit_error, "Failed to load CRL from %s\n", crl);
672       eventer_ssl_ctx_free(sslctx);
673       goto connect_error;
674     }
675   }
676
677   memcpy(&nctx->last_connect, now, sizeof(*now));
678   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
679                              nctx->sslconfig);
680   EVENTER_ATTACH_SSL(e, sslctx);
681   e->callback = noit_connection_ssl_upgrade;
682   STRATCON_NOIT_CONNECT_SUCCESS(e->fd, (char *)feedtype, nctx->remote_str,
683                                 (char *)cn_expected);
684   return e->callback(e, mask, closure, now);
685 }
686 static void
687 noit_connection_initiate_connection(noit_connection_ctx_t *nctx) {
688   struct timeval __now;
689   const char *cn_expected, *feedtype;
690   eventer_t e;
691   int rv, fd = -1;
692 #ifdef SO_KEEPALIVE
693   int optval;
694   socklen_t optlen = sizeof(optval);
695 #endif
696
697   GET_EXPECTED_CN(nctx, cn_expected);
698   GET_FEEDTYPE(nctx, feedtype);
699   nctx->e = NULL;
700   if(nctx->wants_permanent_shutdown) {
701     STRATCON_NOIT_SHUTDOWN_PERMANENT(-1, (char *)feedtype,
702                                      nctx->remote_str, (char *)cn_expected);
703     noit_connection_ctx_dealloc(nctx);
704     return;
705   }
706   /* Open a socket */
707   fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0);
708   if(fd < 0) goto reschedule;
709
710   /* Make it non-blocking */
711   if(eventer_set_fd_nonblocking(fd)) goto reschedule;
712 #define set_or_bail(type, opt, val) do { \
713   optval = val; \
714   optlen = sizeof(optval); \
715   if(setsockopt(fd, type, opt, &optval, optlen) < 0) { \
716     noitL(noit_error, "Cannot set " #type "/" #opt " on jlog socket: %s\n", \
717           strerror(errno)); \
718     goto reschedule; \
719   } \
720 } while(0)
721 #ifdef SO_KEEPALIVE
722   set_or_bail(SOL_SOCKET, SO_KEEPALIVE, 1);
723 #endif
724 #ifdef TCP_KEEPALIVE_THRESHOLD
725   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_THRESHOLD, 10 * 1000);
726 #endif
727 #ifdef TCP_KEEPALIVE_ABORT_THRESHOLD
728   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_ABORT_THRESHOLD, 30 * 1000);
729 #endif
730 #ifdef TCP_CONN_NOTIFY_THRESHOLD
731   set_or_bail(IPPROTO_TCP, TCP_CONN_NOTIFY_THRESHOLD, 10 * 1000);
732 #endif
733 #ifdef TCP_CONN_ABORT_THRESHOLD
734   set_or_bail(IPPROTO_TCP, TCP_CONN_ABORT_THRESHOLD, 30 * 1000);
735 #endif
736
737   /* Initiate a connection */
738   rv = connect(fd, &nctx->r.remote, nctx->remote_len);
739   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
740
741   /* Register a handler for connection completion */
742   e = eventer_alloc();
743   e->fd = fd;
744   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
745   e->callback = noit_connection_complete_connect;
746   e->closure = nctx;
747   nctx->e = e;
748   eventer_add(e);
749
750   STRATCON_NOIT_CONNECT(e->fd, (char *)feedtype, nctx->remote_str,
751                         (char *)cn_expected);
752   noit_connection_update_timeout(nctx);
753   return;
754
755  reschedule:
756   if(fd >= 0) close(fd);
757   gettimeofday(&__now, NULL);
758   noit_connection_schedule_reattempt(nctx, &__now);
759   return;
760 }
761
762 int
763 noit_connection_update_timeout(noit_connection_ctx_t *nctx) {
764   struct timeval now, diff;
765   if(nctx->max_silence == 0) return 0;
766
767   diff.tv_sec = nctx->max_silence / 1000;
768   diff.tv_usec = (nctx->max_silence % 1000) * 1000;
769   gettimeofday(&now, NULL);
770
771   if(!nctx->timeout_event) {
772     nctx->timeout_event = eventer_alloc();
773     nctx->timeout_event->mask = EVENTER_TIMER;
774     nctx->timeout_event->closure = nctx;
775     nctx->timeout_event->callback = noit_connection_session_timeout;
776     add_timeval(now, diff, &nctx->timeout_event->whence);
777     eventer_add(nctx->timeout_event);
778   }
779   else {
780     add_timeval(now, diff, &nctx->timeout_event->whence);
781     eventer_update(nctx->timeout_event, EVENTER_TIMER);
782   }
783   return 0;
784 }
785
786 int
787 noit_connection_disable_timeout(noit_connection_ctx_t *nctx) {
788   if(nctx->timeout_event) {
789     eventer_remove(nctx->timeout_event);
790     eventer_free(nctx->timeout_event);
791     nctx->timeout_event = NULL;
792   }
793   return 0;
794 }
795
796 int
797 initiate_noit_connection(const char *host, unsigned short port,
798                          noit_hash_table *sslconfig, noit_hash_table *config,
799                          eventer_func_t handler, void *closure,
800                          void (*freefunc)(void *)) {
801   noit_connection_ctx_t *ctx;
802   const char *stimeout;
803   int8_t family;
804   int rv;
805   union {
806     struct in_addr addr4;
807     struct in6_addr addr6;
808   } a;
809
810   if(host[0] == '/') {
811     family = AF_UNIX;
812   }
813   else {
814     family = AF_INET;
815     rv = inet_pton(family, host, &a);
816     if(rv != 1) {
817       family = AF_INET6;
818       rv = inet_pton(family, host, &a);
819       if(rv != 1) {
820         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
821         return -1;
822       }
823     }
824   }
825
826   ctx = noit_connection_ctx_alloc();
827   ctx->remote_str = calloc(1, strlen(host) + 7);
828   snprintf(ctx->remote_str, strlen(host) + 7,
829            "%s:%d", host, port);
830  
831   memset(&ctx->r, 0, sizeof(ctx->r));
832   if(family == AF_UNIX) {
833     struct sockaddr_un *s = &ctx->r.remote_un;
834     s->sun_family = AF_UNIX;
835     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
836     ctx->remote_len = sizeof(*s);
837   }
838   else if(family == AF_INET) {
839     struct sockaddr_in *s = &ctx->r.remote_in;
840     s->sin_family = family;
841     s->sin_port = htons(port);
842     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
843     ctx->remote_len = sizeof(*s);
844   }
845   else {
846     struct sockaddr_in6 *s = &ctx->r.remote_in6;
847     s->sin6_family = family;
848     s->sin6_port = htons(port);
849     memcpy(&s->sin6_addr, &a, sizeof(a));
850     ctx->remote_len = sizeof(*s);
851   }
852
853   if(ctx->sslconfig)
854     noit_hash_delete_all(ctx->sslconfig, free, free);
855   else
856     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
857   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
858   if(ctx->config)
859     noit_hash_delete_all(ctx->config, free, free);
860   else
861     ctx->config = calloc(1, sizeof(noit_hash_table));
862   noit_hash_merge_as_dict(ctx->config, config);
863
864   if(noit_hash_retr_str(ctx->config, "timeout", strlen("timeout"), &stimeout))
865     ctx->max_silence = atoi(stimeout);
866   else
867     ctx->max_silence = DEFAULT_NOIT_CONNECTION_TIMEOUT;
868   ctx->consumer_callback = handler;
869   ctx->consumer_free = freefunc;
870   ctx->consumer_ctx = closure;
871   noit_connection_initiate_connection(ctx);
872   return 0;
873 }
874
875 void
876 stratcon_streamer_connection(const char *toplevel, const char *destination,
877                              eventer_func_t handler,
878                              void *(*handler_alloc)(void), void *handler_ctx,
879                              void (*handler_free)(void *)) {
880   int i, cnt = 0;
881   noit_conf_section_t *noit_configs;
882   char path[256];
883
884   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
885   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
886   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
887   for(i=0; i<cnt; i++) {
888     char address[256];
889     unsigned short port;
890     int portint;
891     noit_hash_table *sslconfig, *config;
892
893     if(!noit_conf_get_stringbuf(noit_configs[i],
894                                 "ancestor-or-self::node()/@address",
895                                 address, sizeof(address))) {
896       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
897       continue;
898     }
899     /* if destination is specified, exact match it */
900     if(destination && strcmp(address, destination)) continue;
901
902     if(!noit_conf_get_int(noit_configs[i],
903                           "ancestor-or-self::node()/@port", &portint))
904       portint = 0;
905     port = (unsigned short) portint;
906     if(address[0] != '/' && (portint == 0 || (port != portint))) {
907       /* UNIX sockets don't require a port (they'll ignore it if specified */
908       noitL(noit_stderr,
909             "Invalid port [%d] specified in stanza %d\n", port, i+1);
910       continue;
911     }
912     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
913     config = noit_conf_get_hash(noit_configs[i], "config");
914
915     noitL(noit_error, "initiating to %s\n", address);
916     initiate_noit_connection(address, port, sslconfig, config,
917                              handler,
918                              handler_alloc ? handler_alloc() : handler_ctx,
919                              handler_free);
920     noit_hash_destroy(sslconfig,free,free);
921     free(sslconfig);
922     noit_hash_destroy(config,free,free);
923     free(config);
924   }
925   free(noit_configs);
926 }
927 int
928 stratcon_find_noit_ip_by_cn(const char *cn, char *ip, int len) {
929   int rv = -1;
930   void *vip;
931   pthread_mutex_lock(&noit_ip_by_cn_lock);
932   if(noit_hash_retrieve(&noit_ip_by_cn, cn, strlen(cn), &vip)) {
933     int new_len;
934     char *new_ip = (char *)vip;
935     new_len = strlen(new_ip);
936     strlcpy(ip, new_ip, len);
937     if(new_len >= len) rv = new_len+1;
938     else rv = 0;
939   }
940   pthread_mutex_unlock(&noit_ip_by_cn_lock);
941   return rv;
942 }
943 void
944 stratcon_jlog_streamer_recache_noit() {
945   int di, cnt;
946   noit_conf_section_t *noit_configs;
947   noit_configs = noit_conf_get_sections(NULL, "//noits//noit", &cnt);
948   pthread_mutex_lock(&noit_ip_by_cn_lock);
949   noit_hash_delete_all(&noit_ip_by_cn, free, free);
950   for(di=0; di<cnt; di++) {
951     char address[64];
952     if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
953                                  address, sizeof(address))) {
954       char expected_cn[256];
955       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
956                                  expected_cn, sizeof(expected_cn)))
957         noit_hash_store(&noit_ip_by_cn,
958                         strdup(expected_cn), strlen(expected_cn),
959                         strdup(address));
960     }
961   }
962   free(noit_configs);
963   pthread_mutex_unlock(&noit_ip_by_cn_lock);
964 }
965 void
966 stratcon_jlog_streamer_reload(const char *toplevel) {
967   /* flush and repopulate the cn cache */
968   stratcon_jlog_streamer_recache_noit();
969   if(!stratcon_datastore_get_enabled()) return;
970   stratcon_streamer_connection(toplevel, NULL,
971                                stratcon_jlog_recv_handler,
972                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
973                                NULL,
974                                jlog_streamer_ctx_free);
975 }
976
977 char *
978 stratcon_console_noit_opts(noit_console_closure_t ncct,
979                            noit_console_state_stack_t *stack,
980                            noit_console_state_t *dstate,
981                            int argc, char **argv, int idx) {
982   if(argc == 1) {
983     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
984     const char *key_id;
985     int klen, i = 0;
986     void *vconn, *vcn;
987     noit_connection_ctx_t *ctx;
988     noit_hash_table dedup = NOIT_HASH_EMPTY;
989
990     pthread_mutex_lock(&noits_lock);
991     while(noit_hash_next(&noits, &iter, &key_id, &klen, &vconn)) {
992       ctx = (noit_connection_ctx_t *)vconn;
993       vcn = NULL;
994       if(ctx->config && noit_hash_retrieve(ctx->config, "cn", 2, &vcn) &&
995          !noit_hash_store(&dedup, vcn, strlen(vcn), NULL)) {
996         if(!strncmp(vcn, argv[0], strlen(argv[0]))) {
997           if(idx == i) {
998             pthread_mutex_unlock(&noits_lock);
999             noit_hash_destroy(&dedup, NULL, NULL);
1000             return strdup(vcn);
1001           }
1002           i++;
1003         }
1004       }
1005       if(ctx->remote_str &&
1006          !noit_hash_store(&dedup, ctx->remote_str, strlen(ctx->remote_str), NULL)) {
1007         if(!strncmp(ctx->remote_str, argv[0], strlen(argv[0]))) {
1008           if(idx == i) {
1009             pthread_mutex_unlock(&noits_lock);
1010             noit_hash_destroy(&dedup, NULL, NULL);
1011             return strdup(ctx->remote_str);
1012           }
1013           i++;
1014         }
1015       }
1016     }
1017     pthread_mutex_unlock(&noits_lock);
1018     noit_hash_destroy(&dedup, NULL, NULL);
1019   }
1020   if(argc == 2)
1021     return noit_console_opt_delegate(ncct, stack, dstate, argc-1, argv+1, idx);
1022   return NULL;
1023 }
1024 static int
1025 stratcon_console_show_noits(noit_console_closure_t ncct,
1026                             int argc, char **argv,
1027                             noit_console_state_t *dstate,
1028                             void *closure) {
1029   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1030   const char *key_id, *ecn;
1031   int klen, n = 0, i;
1032   void *vconn;
1033   noit_connection_ctx_t **ctx;
1034
1035   if(closure != (void *)0 && argc == 0) {
1036     nc_printf(ncct, "takes an argument\n");
1037     return 0;
1038   }
1039   if(closure == (void *)0 && argc > 0) {
1040     nc_printf(ncct, "takes no arguments\n");
1041     return 0;
1042   }
1043   pthread_mutex_lock(&noits_lock);
1044   ctx = malloc(sizeof(*ctx) * noits.size);
1045   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1046                        &vconn)) {
1047     ctx[n] = (noit_connection_ctx_t *)vconn;
1048     if(argc == 0 ||
1049        !strcmp(ctx[n]->remote_str, argv[0]) ||
1050        (ctx[n]->config && noit_hash_retr_str(ctx[n]->config, "cn", 2, &ecn) &&
1051         !strcmp(ecn, argv[0]))) {
1052       noit_atomic_inc32(&ctx[n]->refcnt);
1053       n++;
1054     }
1055   }
1056   pthread_mutex_unlock(&noits_lock);
1057   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
1058   for(i=0; i<n; i++) {
1059     nc_print_noit_conn_brief(ncct, ctx[i]);
1060     noit_connection_ctx_deref(ctx[i]);
1061   }
1062   free(ctx);
1063   return 0;
1064 }
1065
1066 static void
1067 emit_noit_info_metrics(struct timeval *now, const char *uuid_str,
1068                        noit_connection_ctx_t *nctx) {
1069   struct timeval last, session_duration, diff;
1070   u_int64_t session_duration_ms, last_event_ms;
1071   jlog_streamer_ctx_t *jctx = nctx->consumer_ctx;
1072   char str[1024], *wr;
1073   int len;
1074   const char *cn_expected;
1075   const char *feedtype = "unknown";
1076
1077   GET_FEEDTYPE(nctx, feedtype);
1078   if(NULL != (wr = strchr(feedtype, '/'))) feedtype = wr+1;
1079
1080   GET_EXPECTED_CN(nctx, cn_expected);
1081   if(!cn_expected) return;
1082
1083   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\t%s`%s`",
1084            (long unsigned int)now->tv_sec,
1085            (long unsigned int)now->tv_usec/1000UL,
1086            uuid_str, cn_expected, feedtype);
1087   wr = str + strlen(str);
1088   len = sizeof(str) - (wr - str);
1089
1090   /* Now we write NAME TYPE VALUE into wr each time and push it */
1091 #define push_noit_m_str(name, value) do { \
1092   snprintf(wr, len, "%s\ts\t%s\n", name, value); \
1093   stratcon_datastore_push(DS_OP_INSERT, \
1094                           (struct sockaddr *)&self_stratcon_ip, \
1095                           self_stratcon_hostname, strdup(str), NULL); \
1096   stratcon_iep_line_processor(DS_OP_INSERT, \
1097                               (struct sockaddr *)&self_stratcon_ip, \
1098                               self_stratcon_hostname, strdup(str), NULL); \
1099 } while(0)
1100 #define push_noit_m_u64(name, value) do { \
1101   snprintf(wr, len, "%s\tL\t%llu\n", name, (long long unsigned int)value); \
1102   stratcon_datastore_push(DS_OP_INSERT, \
1103                           (struct sockaddr *)&self_stratcon_ip, \
1104                           self_stratcon_hostname, strdup(str), NULL); \
1105   stratcon_iep_line_processor(DS_OP_INSERT, \
1106                               (struct sockaddr *)&self_stratcon_ip, \
1107                               self_stratcon_hostname, strdup(str), NULL); \
1108 } while(0)
1109
1110   last.tv_sec = jctx->header.tv_sec;
1111   last.tv_usec = jctx->header.tv_usec;
1112   sub_timeval(*now, last, &diff);
1113   last_event_ms = diff.tv_sec * 1000 + diff.tv_usec / 1000;
1114   sub_timeval(*now, nctx->last_connect, &session_duration);
1115   session_duration_ms = session_duration.tv_sec * 1000 +
1116                         session_duration.tv_usec / 1000;
1117
1118   push_noit_m_str("state", nctx->remote_cn ? "connected" :
1119                              (nctx->retry_event ? "disconnected" :
1120                                                   "connecting"));
1121   push_noit_m_u64("last_event_age_ms", last_event_ms);
1122   push_noit_m_u64("session_length_ms", session_duration_ms);
1123 }
1124 static int
1125 periodic_noit_metrics(eventer_t e, int mask, void *closure,
1126                       struct timeval *now) {
1127   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
1128   noit_connection_ctx_t **ctxs;
1129   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1130   const char *key_id;
1131   void *vconn;
1132   int klen, n = 0, i;
1133   char str[1024];
1134   char uuid_str[UUID_STR_LEN+1];
1135
1136   uuid_unparse_lower(self_stratcon_id, uuid_str);
1137
1138   if(closure == NULL) {
1139     /* Only do this the first time we get called */
1140     char ip_str[128];
1141     inet_ntop(AF_INET, &self_stratcon_ip.sin_addr, ip_str,
1142               sizeof(ip_str));
1143     snprintf(str, sizeof(str), "C\t%lu.%03lu\t%s\t%s\tstratcon\t%s\n",
1144              (long unsigned int)now->tv_sec,
1145              (long unsigned int)now->tv_usec/1000UL, uuid_str, ip_str,
1146              self_stratcon_hostname);
1147     stratcon_datastore_push(DS_OP_INSERT,
1148                             (struct sockaddr *)&self_stratcon_ip,
1149                             self_stratcon_hostname, strdup(str), NULL);
1150     stratcon_iep_line_processor(DS_OP_INSERT,
1151                                 (struct sockaddr *)&self_stratcon_ip,
1152                                 self_stratcon_hostname, strdup(str), NULL);
1153   }
1154
1155   pthread_mutex_lock(&noits_lock);
1156   ctxs = malloc(sizeof(*ctxs) * noits.size);
1157   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1158                        &vconn)) {
1159     ctxs[n] = (noit_connection_ctx_t *)vconn;
1160     noit_atomic_inc32(&ctxs[n]->refcnt);
1161     n++;
1162   }
1163   pthread_mutex_unlock(&noits_lock);
1164
1165   snprintf(str, sizeof(str), "S\t%lu.%03lu\t%s\tG\tA\t0\tok\n",
1166            (long unsigned int)now->tv_sec,
1167            (long unsigned int)now->tv_usec/1000UL, uuid_str);
1168   stratcon_datastore_push(DS_OP_INSERT,
1169                           (struct sockaddr *)&self_stratcon_ip,
1170                           self_stratcon_hostname, strdup(str), NULL);
1171   stratcon_iep_line_processor(DS_OP_INSERT, \
1172                               (struct sockaddr *)&self_stratcon_ip, \
1173                               self_stratcon_hostname, strdup(str), NULL); \
1174   for(i=0; i<n; i++) {
1175     emit_noit_info_metrics(now, uuid_str, ctxs[i]);
1176     noit_connection_ctx_deref(ctxs[i]);
1177   }
1178   free(ctxs);
1179   stratcon_datastore_push(DS_OP_CHKPT,
1180                           (struct sockaddr *)&self_stratcon_ip,
1181                           self_stratcon_hostname, NULL, NULL);
1182   stratcon_iep_line_processor(DS_OP_CHKPT, \
1183                               (struct sockaddr *)&self_stratcon_ip, \
1184                               self_stratcon_hostname, NULL, NULL); \
1185
1186   add_timeval(e->whence, whence, &whence);
1187   eventer_add_at(periodic_noit_metrics, (void *)0x1, whence);
1188   return 0;
1189 }
1190
1191 static int
1192 rest_show_noits(noit_http_rest_closure_t *restc,
1193                 int npats, char **pats) {
1194   xmlDocPtr doc;
1195   xmlNodePtr root;
1196   noit_hash_table seen = NOIT_HASH_EMPTY;
1197   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1198   char path[256];
1199   const char *key_id;
1200   const char *type = NULL, *want_cn = NULL;
1201   int klen, n = 0, i, di, cnt;
1202   void *vconn;
1203   noit_connection_ctx_t **ctxs;
1204   noit_conf_section_t *noit_configs;
1205   struct timeval now, diff, last;
1206   xmlNodePtr node;
1207   noit_http_request *req = noit_http_session_request(restc->http_ctx);
1208
1209   noit_http_process_querystring(req);
1210   type = noit_http_request_querystring(req, "type");
1211   want_cn = noit_http_request_querystring(req, "cn");
1212
1213   gettimeofday(&now, NULL);
1214
1215   pthread_mutex_lock(&noits_lock);
1216   ctxs = malloc(sizeof(*ctxs) * noits.size);
1217   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1218                        &vconn)) {
1219     ctxs[n] = (noit_connection_ctx_t *)vconn;
1220     noit_atomic_inc32(&ctxs[n]->refcnt);
1221     n++;
1222   }
1223   pthread_mutex_unlock(&noits_lock);
1224   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
1225
1226   doc = xmlNewDoc((xmlChar *)"1.0");
1227   root = xmlNewDocNode(doc, NULL, (xmlChar *)"noits", NULL);
1228   xmlDocSetRootElement(doc, root);
1229
1230   for(i=0; i<n; i++) {
1231     char buff[256];
1232     const char *feedtype = "unknown", *state = "unknown";
1233     noit_connection_ctx_t *ctx = ctxs[i];
1234     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
1235
1236     feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
1237
1238     /* If the user requested a specific type and we're not it, skip. */
1239     if(type && strcmp(feedtype, type)) continue;
1240     /* If the user wants a specific CN... limit to that. */
1241     if(want_cn && (!ctx->remote_cn || strcmp(want_cn, ctx->remote_cn)))
1242       continue;
1243
1244     node = xmlNewNode(NULL, (xmlChar *)"noit");
1245     snprintf(buff, sizeof(buff), "%llu.%06d",
1246              (long long unsigned)ctx->last_connect.tv_sec,
1247              (int)ctx->last_connect.tv_usec);
1248     xmlSetProp(node, (xmlChar *)"last_connect", (xmlChar *)buff);
1249     xmlSetProp(node, (xmlChar *)"state", ctx->remote_cn ?
1250                (xmlChar *)"connected" :
1251                (ctx->retry_event ? (xmlChar *)"disconnected" :
1252                                     (xmlChar *)"connecting"));
1253     if(ctx->e) {
1254       char buff[128];
1255       const char *addrstr = NULL;
1256       struct sockaddr_in6 addr6;
1257       socklen_t len = sizeof(addr6);
1258       if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
1259         unsigned short port = 0;
1260         if(addr6.sin6_family == AF_INET) {
1261           addrstr = inet_ntop(addr6.sin6_family,
1262                               &((struct sockaddr_in *)&addr6)->sin_addr,
1263                               buff, sizeof(buff));
1264           memcpy(&port, &(&addr6)->sin6_port, sizeof(port));
1265           port = ntohs(port);
1266         }
1267         else if(addr6.sin6_family == AF_INET6) {
1268           addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
1269                               buff, sizeof(buff));
1270           port = ntohs(addr6.sin6_port);
1271         }
1272         if(addrstr != NULL) {
1273           snprintf(buff + strlen(buff), sizeof(buff) - strlen(buff),
1274                    ":%u", port);
1275           xmlSetProp(node, (xmlChar *)"local", (xmlChar *)buff);
1276         }
1277       }
1278     }
1279     noit_hash_replace(&seen, strdup(ctx->remote_str), strlen(ctx->remote_str),
1280                       0, free, NULL);
1281     xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)ctx->remote_str);
1282     xmlSetProp(node, (xmlChar *)"type", (xmlChar *)feedtype);
1283     if(ctx->retry_event) {
1284       sub_timeval(ctx->retry_event->whence, now, &diff);
1285       snprintf(buff, sizeof(buff), "%llu.%06d",
1286                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
1287       xmlSetProp(node, (xmlChar *)"next_attempt", (xmlChar *)buff);
1288     }
1289     else if(ctx->remote_cn) {
1290       if(ctx->remote_cn)
1291         xmlSetProp(node, (xmlChar *)"remote_cn", (xmlChar *)ctx->remote_cn);
1292  
1293       switch(jctx->state) {
1294         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
1295         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
1296         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
1297         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
1298         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
1299         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
1300         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
1301       }
1302       xmlSetProp(node, (xmlChar *)"state", (xmlChar *)state);
1303       snprintf(buff, sizeof(buff), "%08x:%08x",
1304                jctx->header.chkpt.log, jctx->header.chkpt.marker);
1305       xmlSetProp(node, (xmlChar *)"checkpoint", (xmlChar *)buff);
1306       snprintf(buff, sizeof(buff), "%llu",
1307                (unsigned long long)jctx->total_events);
1308       xmlSetProp(node, (xmlChar *)"session_events", (xmlChar *)buff);
1309       snprintf(buff, sizeof(buff), "%llu",
1310                (unsigned long long)jctx->total_bytes_read);
1311       xmlSetProp(node, (xmlChar *)"session_bytes", (xmlChar *)buff);
1312  
1313       sub_timeval(now, ctx->last_connect, &diff);
1314       snprintf(buff, sizeof(buff), "%lld.%06d",
1315                (long long)diff.tv_sec, (int)diff.tv_usec);
1316       xmlSetProp(node, (xmlChar *)"session_duration", (xmlChar *)buff);
1317  
1318       if(jctx->header.tv_sec) {
1319         last.tv_sec = jctx->header.tv_sec;
1320         last.tv_usec = jctx->header.tv_usec;
1321         snprintf(buff, sizeof(buff), "%llu.%06d",
1322                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
1323         xmlSetProp(node, (xmlChar *)"last_event", (xmlChar *)buff);
1324         sub_timeval(now, last, &diff);
1325         snprintf(buff, sizeof(buff), "%lld.%06d",
1326                  (long long)diff.tv_sec, (int)diff.tv_usec);
1327         xmlSetProp(node, (xmlChar *)"last_event_age", (xmlChar *)buff);
1328       }
1329     }
1330
1331     xmlAddChild(root, node);
1332     noit_connection_ctx_deref(ctx);
1333   }
1334   free(ctxs);
1335
1336   if(!type || !strcmp(type, "configured")) {
1337     snprintf(path, sizeof(path), "//noits//noit");
1338     noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1339     for(di=0; di<cnt; di++) {
1340       char address[64], port_str[32], remote_str[98];
1341       char expected_cn_buff[256], *expected_cn = NULL;
1342       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
1343                                  expected_cn_buff, sizeof(expected_cn_buff)))
1344         expected_cn = expected_cn_buff;
1345       if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn))) continue;
1346       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
1347                                  address, sizeof(address))) {
1348         void *v;
1349         if(!noit_conf_get_stringbuf(noit_configs[di], "self::node()/@port",
1350                                    port_str, sizeof(port_str)))
1351           strlcpy(port_str, "43191", sizeof(port_str));
1352
1353         /* If the user wants a specific CN... limit to that. */
1354           if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn)))
1355             continue;
1356
1357         snprintf(remote_str, sizeof(remote_str), "%s:%s", address, port_str);
1358         if(!noit_hash_retrieve(&seen, remote_str, strlen(remote_str), &v)) {
1359           node = xmlNewNode(NULL, (xmlChar *)"noit");
1360           xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)remote_str);
1361           xmlSetProp(node, (xmlChar *)"type", (xmlChar *)"configured");
1362           if(expected_cn)
1363             xmlSetProp(node, (xmlChar *)"cn", (xmlChar *)expected_cn);
1364           xmlAddChild(root, node);
1365         }
1366       }
1367     }
1368     free(noit_configs);
1369   }
1370   noit_hash_destroy(&seen, free, NULL);
1371
1372   noit_http_response_ok(restc->http_ctx, "text/xml");
1373   noit_http_response_xml(restc->http_ctx, doc);
1374   noit_http_response_end(restc->http_ctx);
1375   xmlFreeDoc(doc);
1376   return 0;
1377 }
1378 static int
1379 stratcon_add_noit(const char *target, unsigned short port,
1380                   const char *cn) {
1381   int cnt;
1382   char path[256];
1383   char port_str[6];
1384   noit_conf_section_t *noit_configs, parent;
1385   xmlNodePtr newnoit, config, cnnode;
1386
1387   snprintf(path, sizeof(path),
1388            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1389   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1390   free(noit_configs);
1391   if(cnt != 0) return -1;
1392
1393   parent = noit_conf_get_section(NULL, "//noits");
1394   if(!parent) return -1;
1395   snprintf(port_str, sizeof(port_str), "%d", port);
1396   newnoit = xmlNewNode(NULL, (xmlChar *)"noit");
1397   xmlSetProp(newnoit, (xmlChar *)"address", (xmlChar *)target);
1398   xmlSetProp(newnoit, (xmlChar *)"port", (xmlChar *)port_str);
1399   xmlAddChild(parent, newnoit);
1400   if(cn) {
1401     config = xmlNewNode(NULL, (xmlChar *)"config");
1402     cnnode = xmlNewNode(NULL, (xmlChar *)"cn");
1403     xmlNodeAddContent(cnnode, (xmlChar *)cn);
1404     xmlAddChild(config, cnnode);
1405     xmlAddChild(newnoit, config);
1406     pthread_mutex_lock(&noit_ip_by_cn_lock);
1407     noit_hash_replace(&noit_ip_by_cn, strdup(cn), strlen(cn),
1408                       strdup(target), free, free);
1409     pthread_mutex_unlock(&noit_ip_by_cn_lock);
1410   }
1411   if(stratcon_datastore_get_enabled())
1412     stratcon_streamer_connection(NULL, target,
1413                                  stratcon_jlog_recv_handler,
1414                                  (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
1415                                  NULL,
1416                                  jlog_streamer_ctx_free);
1417   if(stratcon_iep_get_enabled())
1418     stratcon_streamer_connection(NULL, target,
1419                                  stratcon_jlog_recv_handler,
1420                                  (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
1421                                  NULL,
1422                                  jlog_streamer_ctx_free);
1423   return 1;
1424 }
1425 static int
1426 stratcon_remove_noit(const char *target, unsigned short port) {
1427   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1428   const char *key_id;
1429   int klen, n = -1, i, cnt = 0;
1430   void *vconn;
1431   noit_connection_ctx_t **ctx;
1432   noit_conf_section_t *noit_configs;
1433   char path[256];
1434   char remote_str[256];
1435
1436   snprintf(remote_str, sizeof(remote_str), "%s:%d", target, port);
1437
1438   snprintf(path, sizeof(path),
1439            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1440   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1441   for(i=0; i<cnt; i++) {
1442     char expected_cn[256];
1443     if(noit_conf_get_stringbuf(noit_configs[i], "self::node()/config/cn",
1444                                expected_cn, sizeof(expected_cn))) {
1445       pthread_mutex_lock(&noit_ip_by_cn_lock);
1446       noit_hash_delete(&noit_ip_by_cn, expected_cn, strlen(expected_cn),
1447                        free, free);
1448       pthread_mutex_unlock(&noit_ip_by_cn_lock);
1449     }
1450     xmlUnlinkNode(noit_configs[i]);
1451     xmlFreeNode(noit_configs[i]);
1452     n = 0;
1453   }
1454   free(noit_configs);
1455
1456   pthread_mutex_lock(&noits_lock);
1457   ctx = malloc(sizeof(*ctx) * noits.size);
1458   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1459                        &vconn)) {
1460     if(!strcmp(((noit_connection_ctx_t *)vconn)->remote_str, remote_str)) {
1461       ctx[n] = (noit_connection_ctx_t *)vconn;
1462       noit_atomic_inc32(&ctx[n]->refcnt);
1463       n++;
1464     }
1465   }
1466   pthread_mutex_unlock(&noits_lock);
1467   for(i=0; i<n; i++) {
1468     noit_connection_ctx_dealloc(ctx[i]); /* once for the record */
1469     noit_connection_ctx_deref(ctx[i]);   /* once for the aboce inc32 */
1470   }
1471   free(ctx);
1472   return n;
1473 }
1474 static int
1475 rest_set_noit(noit_http_rest_closure_t *restc,
1476               int npats, char **pats) {
1477   const char *cn = NULL;
1478   noit_http_session_ctx *ctx = restc->http_ctx;
1479   noit_http_request *req = noit_http_session_request(ctx);
1480   unsigned short port = 43191;
1481   if(npats < 1 || npats > 2)
1482     noit_http_response_server_error(ctx, "text/xml");
1483   if(npats == 2) port = atoi(pats[1]);
1484   noit_http_process_querystring(req);
1485   cn = noit_http_request_querystring(req, "cn");
1486   if(stratcon_add_noit(pats[0], port, cn) >= 0)
1487     noit_http_response_ok(ctx, "text/xml");
1488   else
1489     noit_http_response_standard(ctx, 409, "EXISTS", "text/xml");
1490   if(noit_conf_write_file(NULL) != 0)
1491     noitL(noit_error, "local config write failed\n");
1492   noit_conf_mark_changed();
1493   noit_http_response_end(ctx);
1494   return 0;
1495 }
1496 static int
1497 rest_delete_noit(noit_http_rest_closure_t *restc,
1498                  int npats, char **pats) {
1499   noit_http_session_ctx *ctx = restc->http_ctx;
1500   unsigned short port = 43191;
1501   if(npats < 1 || npats > 2)
1502     noit_http_response_server_error(ctx, "text/xml");
1503   if(npats == 2) port = atoi(pats[1]);
1504   if(stratcon_remove_noit(pats[0], port) >= 0)
1505     noit_http_response_ok(ctx, "text/xml");
1506   else
1507     noit_http_response_not_found(ctx, "text/xml");
1508   if(noit_conf_write_file(NULL) != 0)
1509     noitL(noit_error, "local config write failed\n");
1510   noit_conf_mark_changed();
1511   noit_http_response_end(ctx);
1512   return 0;
1513 }
1514 static int
1515 stratcon_console_conf_noits(noit_console_closure_t ncct,
1516                             int argc, char **argv,
1517                             noit_console_state_t *dstate,
1518                             void *closure) {
1519   char *cp, target[128];
1520   unsigned short port = 43191;
1521   int adding = (int)(vpsized_int)closure;
1522   if(argc != 1)
1523     return -1;
1524
1525   cp = strchr(argv[0], ':');
1526   if(cp) {
1527     strlcpy(target, argv[0], MIN(sizeof(target), cp-argv[0]+1));
1528     port = atoi(cp+1);
1529   }
1530   else strlcpy(target, argv[0], sizeof(target));
1531   if(adding) {
1532     if(stratcon_add_noit(target, port, NULL) >= 0) {
1533       nc_printf(ncct, "Added noit at %s:%d\n", target, port);
1534     }
1535     else {
1536       nc_printf(ncct, "Failed to add noit at %s:%d\n", target, port);
1537     }
1538   }
1539   else {
1540     if(stratcon_remove_noit(target, port) >= 0) {
1541       nc_printf(ncct, "Removed noit at %s:%d\n", target, port);
1542     }
1543     else {
1544       nc_printf(ncct, "Failed to remove noit at %s:%d\n", target, port);
1545     }
1546   }
1547   return 0;
1548 }
1549
1550 static void
1551 register_console_streamer_commands() {
1552   noit_console_state_t *tl;
1553   cmd_info_t *showcmd, *confcmd, *conftcmd, *conftnocmd;
1554
1555   tl = noit_console_state_initial();
1556   showcmd = noit_console_state_get_cmd(tl, "show");
1557   assert(showcmd && showcmd->dstate);
1558   confcmd = noit_console_state_get_cmd(tl, "configure");
1559   conftcmd = noit_console_state_get_cmd(confcmd->dstate, "terminal");
1560   conftnocmd = noit_console_state_get_cmd(conftcmd->dstate, "no");
1561
1562   noit_console_state_add_cmd(conftcmd->dstate,
1563     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)1));
1564   noit_console_state_add_cmd(conftnocmd->dstate,
1565     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)0));
1566
1567   noit_console_state_add_cmd(showcmd->dstate,
1568     NCSCMD("noit", stratcon_console_show_noits,
1569            stratcon_console_noit_opts, NULL, (void *)1));
1570   noit_console_state_add_cmd(showcmd->dstate,
1571     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
1572 }
1573
1574 void
1575 stratcon_jlog_streamer_init(const char *toplevel) {
1576   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
1577   struct in_addr remote;
1578   char uuid_str[UUID_STR_LEN + 1];
1579
1580   pthread_mutex_init(&noits_lock, NULL);
1581   pthread_mutex_init(&noit_ip_by_cn_lock, NULL);
1582   eventer_name_callback("noit_connection_reinitiate",
1583                         noit_connection_reinitiate);
1584   eventer_name_callback("stratcon_jlog_recv_handler",
1585                         stratcon_jlog_recv_handler);
1586   eventer_name_callback("noit_connection_ssl_upgrade",
1587                         noit_connection_ssl_upgrade);
1588   eventer_name_callback("noit_connection_complete_connect",
1589                         noit_connection_complete_connect);
1590   eventer_name_callback("noit_connection_session_timeout",
1591                         noit_connection_session_timeout);
1592   register_console_streamer_commands();
1593   stratcon_jlog_streamer_reload(toplevel);
1594   stratcon_streamer_connection(toplevel, "", NULL, NULL, NULL, NULL);
1595   assert(noit_http_rest_register_auth(
1596     "GET", "/noits/", "^show$", rest_show_noits,
1597              noit_http_rest_client_cert_auth
1598   ) == 0);
1599   assert(noit_http_rest_register_auth(
1600     "PUT", "/noits/", "^set/([^/:]+)$", rest_set_noit,
1601              noit_http_rest_client_cert_auth
1602   ) == 0);
1603   assert(noit_http_rest_register_auth(
1604     "PUT", "/noits/", "^set/([^/:]+):(\\d+)$", rest_set_noit,
1605              noit_http_rest_client_cert_auth
1606   ) == 0);
1607   assert(noit_http_rest_register_auth(
1608     "DELETE", "/noits/", "^delete/([^/:]+)$", rest_delete_noit,
1609              noit_http_rest_client_cert_auth
1610   ) == 0);
1611   assert(noit_http_rest_register_auth(
1612     "DELETE", "/noits/", "^delete/([^/:]+):(\\d+)$", rest_delete_noit,
1613              noit_http_rest_client_cert_auth
1614   ) == 0);
1615
1616   uuid_clear(self_stratcon_id);
1617
1618   if(noit_conf_get_stringbuf(NULL, "/stratcon/@id",
1619                              uuid_str, sizeof(uuid_str)) &&
1620      uuid_parse(uuid_str, self_stratcon_id) == 0) {
1621     int period;
1622     /* If a UUID was provided for stratcon itself, we will report metrics
1623      * on a large variety of things (including all noits).
1624      */
1625     if(noit_conf_get_int(NULL, "/stratcon/@metric_period", &period) &&
1626        period > 0) {
1627       DEFAULT_NOIT_PERIOD_TV.tv_sec = period / 1000;
1628       DEFAULT_NOIT_PERIOD_TV.tv_usec = (period % 1000) * 1000;
1629     }
1630     self_stratcon_ip.sin_family = AF_INET;
1631     remote.s_addr = 0xffffffff;
1632     noit_getip_ipv4(remote, &self_stratcon_ip.sin_addr);
1633     gethostname(self_stratcon_hostname, sizeof(self_stratcon_hostname));
1634     eventer_add_in(periodic_noit_metrics, NULL, whence);
1635   }
1636 }
1637
Note: See TracBrowser for help on using the browser.