root/src/stratcon_jlog_streamer.c

Revision 1921baa5ef388809bbacc4367cceb5f71ef0612c, 55.6 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

thanks clang. noted.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_conf.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "utils/noit_getip.h"
39 #include "noit_jlog_listener.h"
40 #include "noit_rest.h"
41 #include "stratcon_datastore.h"
42 #include "stratcon_jlog_streamer.h"
43 #include "stratcon_iep.h"
44
45 #include <unistd.h>
46 #include <assert.h>
47 #include <errno.h>
48 #include <sys/types.h>
49 #include <sys/socket.h>
50 #ifdef HAVE_SYS_FILIO_H
51 #include <sys/filio.h>
52 #endif
53 #include <netinet/in.h>
54 #include <sys/un.h>
55 #include <arpa/inet.h>
56
/* Registry of noit connection contexts.  noit_connection_ctx_alloc() and
 * noit_connection_ctx_dealloc() hold noits_lock around every store into and
 * delete from the noits table. */
pthread_mutex_t noits_lock;
noit_hash_table noits = NOIT_HASH_EMPTY;
/* Lookup table queried by stratcon_find_noit_ip_by_cn(), which holds
 * noit_ip_by_cn_lock for the duration of the lookup and copy. */
pthread_mutex_t noit_ip_by_cn_lock;
noit_hash_table noit_ip_by_cn = NOIT_HASH_EMPTY;
/* NOTE(review): presumably the identity (uuid, hostname, IPv4 address) of
 * this stratcon instance; their users are outside this part of the file --
 * confirm. */
static uuid_t self_stratcon_id;
static char self_stratcon_hostname[256] = "\0";
static struct sockaddr_in self_stratcon_ip;

/* A five second interval (tv_sec = 5, tv_usec = 0). */
static struct timeval DEFAULT_NOIT_PERIOD_TV = { 5UL, 0UL };

/* Forward declaration: used by noit_connection_reinitiate() before the
 * definition appears. */
static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
68
69 static const char *feed_type_to_str(int jlog_feed_cmd) {
70   switch(jlog_feed_cmd) {
71     case NOIT_JLOG_DATA_FEED: return "durable/storage";
72     case NOIT_JLOG_DATA_TEMP_FEED: return "transient/iep";
73   }
74   return "unknown";
75 }
76
77 static int
78 remote_str_sort(const void *a, const void *b) {
79   int rv;
80   noit_connection_ctx_t * const *actx = a;
81   noit_connection_ctx_t * const *bctx = b;
82   jlog_streamer_ctx_t *ajctx = (*actx)->consumer_ctx;
83   jlog_streamer_ctx_t *bjctx = (*bctx)->consumer_ctx;
84   rv = strcmp((*actx)->remote_str, (*bctx)->remote_str);
85   if(rv) return rv;
86   return (ajctx->jlog_feed_cmd < bjctx->jlog_feed_cmd) ? -1 :
87            ((ajctx->jlog_feed_cmd == bjctx->jlog_feed_cmd) ? 0 : 1);
88 }
89 static void
90 nc_print_noit_conn_brief(noit_console_closure_t ncct,
91                           noit_connection_ctx_t *ctx) {
92   jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
93   struct timeval now, diff, session_duration;
94   const char *feedtype = "unknown";
95   const char *lasttime = "never";
96   if(ctx->last_connect.tv_sec != 0) {
97     char cmdbuf[4096];
98     time_t r = ctx->last_connect.tv_sec;
99     struct tm tbuf, *tm;
100     tm = gmtime_r(&r, &tbuf);
101     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
102     lasttime = cmdbuf;
103   }
104   nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str,
105             ctx->remote_cn ? "connected" :
106                              (ctx->retry_event ? "disconnected" :
107                                                    "connecting"), lasttime);
108   if(ctx->e) {
109     char buff[128];
110     const char *addrstr = NULL;
111     struct sockaddr_in6 addr6;
112     socklen_t len = sizeof(addr6);
113     if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
114       unsigned short port = 0;
115       if(addr6.sin6_family == AF_INET) {
116         addrstr = inet_ntop(addr6.sin6_family,
117                             &((struct sockaddr_in *)&addr6)->sin_addr,
118                             buff, sizeof(buff));
119         memcpy(&port, &((struct sockaddr_in *)&addr6)->sin_port, sizeof(port));
120         port = ntohs(port);
121       }
122       else if(addr6.sin6_family == AF_INET6) {
123         addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
124                             buff, sizeof(buff));
125         port = ntohs(addr6.sin6_port);
126       }
127       if(addrstr != NULL)
128         nc_printf(ncct, "\tLocal address is %s:%u\n", buff, port);
129       else
130         nc_printf(ncct, "\tLocal address not interpretable\n");
131     }
132     else {
133       nc_printf(ncct, "\tLocal address error[%d]: %s\n",
134                 ctx->e->fd, strerror(errno));
135     }
136   }
137   feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
138   nc_printf(ncct, "\tJLog event streamer [%s]\n", feedtype);
139   gettimeofday(&now, NULL);
140   if(ctx->retry_event) {
141     sub_timeval(ctx->retry_event->whence, now, &diff);
142     nc_printf(ncct, "\tNext attempt in %lld.%06us\n",
143               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
144   }
145   else if(ctx->remote_cn) {
146     nc_printf(ncct, "\tRemote CN: '%s'\n",
147               ctx->remote_cn ? ctx->remote_cn : "???");
148     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
149       struct timeval last;
150       double session_duration_seconds;
151       const char *state = "unknown";
152
153       switch(jctx->state) {
154         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
155         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
156         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
157         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
158         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
159         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
160         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
161       }
162       last.tv_sec = jctx->header.tv_sec;
163       last.tv_usec = jctx->header.tv_usec;
164       sub_timeval(now, last, &diff);
165       sub_timeval(now, ctx->last_connect, &session_duration);
166       session_duration_seconds = session_duration.tv_sec +
167                                  (double)session_duration.tv_usec/1000000.0;
168       nc_printf(ncct, "\tState: %s\n"
169                       "\tNext checkpoint: [%08x:%08x]\n"
170                       "\tLast event: %lld.%06us ago\n"
171                       "\tEvents this session: %llu (%0.2f/s)\n"
172                       "\tOctets this session: %llu (%0.2f/s)\n",
173                 state,
174                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
175                 (long long)diff.tv_sec, (unsigned int)diff.tv_usec,
176                 jctx->total_events,
177                 (double)jctx->total_events/session_duration_seconds,
178                 jctx->total_bytes_read,
179                 (double)jctx->total_bytes_read/session_duration_seconds);
180     }
181     else {
182       nc_printf(ncct, "\tUnknown type.\n");
183     }
184   }
185 }
186
187 jlog_streamer_ctx_t *
188 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
189   jlog_streamer_ctx_t *ctx;
190   ctx = stratcon_jlog_streamer_ctx_alloc();
191   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
192   ctx->push = stratcon_datastore_push;
193   return ctx;
194 }
195 jlog_streamer_ctx_t *
196 stratcon_jlog_streamer_ctx_alloc(void) {
197   jlog_streamer_ctx_t *ctx;
198   ctx = calloc(1, sizeof(*ctx));
199   return ctx;
200 }
201 noit_connection_ctx_t *
202 noit_connection_ctx_alloc(void) {
203   noit_connection_ctx_t *ctx, **pctx;
204   ctx = calloc(1, sizeof(*ctx));
205   ctx->refcnt = 1;
206   pctx = malloc(sizeof(*pctx));
207   *pctx = ctx;
208   pthread_mutex_lock(&noits_lock);
209   noit_hash_store(&noits, (const char *)pctx, sizeof(*pctx), ctx);
210   pthread_mutex_unlock(&noits_lock);
211   return ctx;
212 }
213 int
214 noit_connection_reinitiate(eventer_t e, int mask, void *closure,
215                          struct timeval *now) {
216   noit_connection_ctx_t *ctx = closure;
217   ctx->retry_event = NULL;
218   noit_connection_initiate_connection(closure);
219   return 0;
220 }
221 void
222 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx,
223                                    struct timeval *now) {
224   struct timeval __now, interval;
225   const char *v;
226   u_int32_t min_interval = 1000, max_interval = 8000;
227
228   noit_connection_disable_timeout(ctx);
229   if(ctx->remote_cn) {
230     free(ctx->remote_cn);
231     ctx->remote_cn = NULL;
232   }
233   if(noit_hash_retr_str(ctx->config,
234                         "reconnect_initial_interval",
235                         strlen("reconnect_initial_interval"),
236                         &v)) {
237     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
238   }
239   if(noit_hash_retr_str(ctx->config,
240                         "reconnect_maximum_interval",
241                         strlen("reconnect_maximum_interval"),
242                         &v)) {
243     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
244   }
245   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
246   else {
247     ctx->current_backoff *= 2;
248     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
249     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
250   }
251   if(!now) {
252     gettimeofday(&__now, NULL);
253     now = &__now;
254   }
255   interval.tv_sec = ctx->current_backoff / 1000;
256   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
257   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
258         ctx->current_backoff);
259   if(ctx->retry_event)
260     eventer_remove(ctx->retry_event);
261   else
262     ctx->retry_event = eventer_alloc();
263   ctx->retry_event->callback = noit_connection_reinitiate;
264   ctx->retry_event->closure = ctx;
265   ctx->retry_event->mask = EVENTER_TIMER;
266   add_timeval(*now, interval, &ctx->retry_event->whence);
267   eventer_add(ctx->retry_event);
268 }
269 static void
270 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
271   if(ctx->remote_cn) free(ctx->remote_cn);
272   if(ctx->remote_str) free(ctx->remote_str);
273   if(ctx->retry_event) {
274     eventer_remove(ctx->retry_event);
275     eventer_free(ctx->retry_event);
276   }
277   if(ctx->timeout_event) {
278     eventer_remove(ctx->timeout_event);
279     eventer_free(ctx->timeout_event);
280   }
281   ctx->consumer_free(ctx->consumer_ctx);
282   free(ctx);
283 }
284 void
285 noit_connection_ctx_deref(noit_connection_ctx_t *ctx) {
286   if(noit_atomic_dec32(&ctx->refcnt) == 0)
287     noit_connection_ctx_free(ctx);
288 }
289 void
290 noit_connection_ctx_dealloc(noit_connection_ctx_t *ctx) {
291   noit_connection_ctx_t **pctx = &ctx;
292   pthread_mutex_lock(&noits_lock);
293   noit_hash_delete(&noits, (const char *)pctx, sizeof(*pctx),
294                    free, (void (*)(void *))noit_connection_ctx_deref);
295   pthread_mutex_unlock(&noits_lock);
296 }
297 void
298 jlog_streamer_ctx_free(void *cl) {
299   jlog_streamer_ctx_t *ctx = cl;
300   if(ctx->buffer) free(ctx->buffer);
301   free(ctx);
302 }
303
304 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
305 static int
306 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
307   int len, mask;
308   while(ctx->bytes_read < ctx->bytes_expected) {
309     len = Eread(ctx->buffer + ctx->bytes_read,
310                 ctx->bytes_expected - ctx->bytes_read);
311     if(len < 0) {
312       *newmask = mask;
313       return -1;
314     }
315     /* if we get 0 inside SSL, and there was a real error, we
316      * will actually get a -1 here.
317      * if(len == 0) return ctx->bytes_read;
318      */
319     ctx->total_bytes_read += len;
320     ctx->bytes_read += len;
321   }
322   assert(ctx->bytes_read == ctx->bytes_expected);
323   return ctx->bytes_read;
324 }
/*
 * FULLREAD(e, ctx, size): read exactly `size` bytes from event `e` into a
 * freshly allocated, NUL-terminated ctx->buffer.
 *
 * Must be expanded inside an eventer callback that has a `socket_error`
 * label and a local `nctx` (connection context) in scope.  When the read
 * would block (EAGAIN) the macro returns from the enclosing function with
 * the mask to wait on; the partially filled buffer is kept and the read
 * resumes on the next expansion.  Any other failure jumps to socket_error.
 *
 * `size` is evaluated once; the macro's own locals are prefixed with fr_ so
 * they cannot shadow the caller's `mask` and `len`.
 */
#define FULLREAD(e,ctx,size) do { \
  int fr_mask, fr_len; \
  const unsigned long fr_size = (unsigned long)(size); \
  if(!(ctx)->bytes_expected) { \
    (ctx)->bytes_expected = fr_size; \
    if((ctx)->buffer) free((ctx)->buffer); \
    (ctx)->buffer = malloc(fr_size + 1); \
    if((ctx)->buffer == NULL) { \
      noitL(noit_error, "malloc(%lu) failed.\n", fr_size + 1); \
      goto socket_error; \
    } \
    (ctx)->buffer[fr_size] = '\0'; \
  } \
  fr_len = __read_on_ctx((e), (ctx), &fr_mask); \
  if(fr_len < 0) { \
    if(errno == EAGAIN) return fr_mask | EVENTER_EXCEPTION; \
    noitL(noit_error, "[%s] SSL read error: %s\n", nctx->remote_str, strerror(errno)); \
    goto socket_error; \
  } \
  (ctx)->bytes_read = 0; \
  (ctx)->bytes_expected = 0; \
  if((unsigned long)fr_len != fr_size) { \
    noitL(noit_error, "[%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
          nctx->remote_str, (ctx)->state, fr_len, fr_size); \
    goto socket_error; \
  } \
} while(0)
351
352 int
353 noit_connection_session_timeout(eventer_t e, int mask, void *closure,
354                                 struct timeval *now) {
355   noit_connection_ctx_t *nctx = closure;
356   eventer_t fde = nctx->e;
357   nctx->timeout_event = NULL;
358   noitL(noit_error, "Timing out jlog session: %s\n",
359         nctx->remote_cn ? nctx->remote_cn : "(null)");
360   if(fde)
361     eventer_trigger(fde, EVENTER_EXCEPTION);
362   return 0;
363 }
364 int
365 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
366                            struct timeval *now) {
367   noit_connection_ctx_t *nctx = closure;
368   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
369   jlog_streamer_ctx_t dummy;
370   int len;
371   jlog_id n_chkpt;
372
373   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
374     if(write(e->fd, e, 0) == -1)
375       noitL(noit_error, "socket error: %s\n", strerror(errno));
376  socket_error:
377     ctx->state = JLOG_STREAMER_WANT_INITIATE;
378     ctx->count = 0;
379     ctx->needs_chkpt = 0;
380     ctx->bytes_read = 0;
381     ctx->bytes_expected = 0;
382     if(ctx->buffer) free(ctx->buffer);
383     ctx->buffer = NULL;
384     noit_connection_schedule_reattempt(nctx, now);
385     eventer_remove_fd(e->fd);
386     nctx->e = NULL;
387     e->opset->close(e->fd, &mask, e);
388     return 0;
389   }
390
391   noit_connection_update_timeout(nctx);
392   while(1) {
393     switch(ctx->state) {
394       case JLOG_STREAMER_WANT_INITIATE:
395         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
396                               sizeof(ctx->jlog_feed_cmd),
397                               &mask, e);
398         if(len < 0) {
399           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
400           goto socket_error;
401         }
402         if(len != sizeof(ctx->jlog_feed_cmd)) {
403           noitL(noit_error, "short write [%d/%d] on initiating stream.\n",
404                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
405           goto socket_error;
406         }
407         ctx->state = JLOG_STREAMER_WANT_COUNT;
408         break;
409
410       case JLOG_STREAMER_WANT_ERROR:
411         FULLREAD(e, ctx, 0 - ctx->count);
412         noitL(noit_error, "[%s] %.*s\n", nctx->remote_str,
413               0 - ctx->count, ctx->buffer);
414         free(ctx->buffer); ctx->buffer = NULL;
415         goto socket_error;
416         break;
417
418       case JLOG_STREAMER_WANT_COUNT:
419         FULLREAD(e, ctx, sizeof(u_int32_t));
420         memcpy(&dummy.count, ctx->buffer, sizeof(u_int32_t));
421         ctx->count = ntohl(dummy.count);
422         ctx->needs_chkpt = 0;
423         free(ctx->buffer); ctx->buffer = NULL;
424         if(ctx->count < 0)
425           ctx->state = JLOG_STREAMER_WANT_ERROR;
426         else
427           ctx->state = JLOG_STREAMER_WANT_HEADER;
428         break;
429
430       case JLOG_STREAMER_WANT_HEADER:
431         if(ctx->count == 0) {
432           ctx->state = JLOG_STREAMER_WANT_COUNT;
433           break;
434         }
435         FULLREAD(e, ctx, sizeof(ctx->header));
436         memcpy(&dummy.header, ctx->buffer, sizeof(ctx->header));
437         ctx->header.chkpt.log = ntohl(dummy.header.chkpt.log);
438         ctx->header.chkpt.marker = ntohl(dummy.header.chkpt.marker);
439         ctx->header.tv_sec = ntohl(dummy.header.tv_sec);
440         ctx->header.tv_usec = ntohl(dummy.header.tv_usec);
441         ctx->header.message_len = ntohl(dummy.header.message_len);
442         free(ctx->buffer); ctx->buffer = NULL;
443         ctx->state = JLOG_STREAMER_WANT_BODY;
444         break;
445
446       case JLOG_STREAMER_WANT_BODY:
447         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
448         if(ctx->header.message_len > 0) {
449           ctx->needs_chkpt = 1;
450           ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn,
451                     ctx->buffer, NULL);
452         }
453         else if(ctx->buffer)
454           free(ctx->buffer);
455         /* Don't free the buffer, it's used by the datastore process. */
456         ctx->buffer = NULL;
457         ctx->count--;
458         ctx->total_events++;
459         if(ctx->count == 0 && ctx->needs_chkpt) {
460           eventer_t completion_e;
461           eventer_remove_fd(e->fd);
462           completion_e = eventer_alloc();
463           memcpy(completion_e, e, sizeof(*e));
464           nctx->e = completion_e;
465           completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
466           ctx->state = JLOG_STREAMER_IS_ASYNC;
467           ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn,
468                     NULL, completion_e);
469           noitL(noit_debug, "Pushing %s batch async [%s]: [%u/%u]\n",
470                 feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
471                 nctx->remote_cn ? nctx->remote_cn : "(null)",
472                 ctx->header.chkpt.log, ctx->header.chkpt.marker);
473           noit_connection_disable_timeout(nctx);
474           return 0;
475         }
476         else if(ctx->count == 0)
477           ctx->state = JLOG_STREAMER_WANT_CHKPT;
478         else
479           ctx->state = JLOG_STREAMER_WANT_HEADER;
480         break;
481
482       case JLOG_STREAMER_IS_ASYNC:
483         ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
484       case JLOG_STREAMER_WANT_CHKPT:
485         noitL(noit_debug, "Pushing %s checkpoint [%s]: [%u/%u]\n",
486               feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
487               nctx->remote_cn ? nctx->remote_cn : "(null)",
488               ctx->header.chkpt.log, ctx->header.chkpt.marker);
489         n_chkpt.log = htonl(ctx->header.chkpt.log);
490         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
491
492         /* screw short writes.  I'd rather die than not write my data! */
493         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
494                               &mask, e);
495         if(len < 0) {
496           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
497           goto socket_error;
498         }
499         if(len != sizeof(jlog_id)) {
500           noitL(noit_error, "short write on checkpointing stream.\n");
501           goto socket_error;
502         }
503         ctx->state = JLOG_STREAMER_WANT_COUNT;
504         break;
505     }
506   }
507   /* never get here */
508 }
509
510 int
511 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure,
512                             struct timeval *now) {
513   noit_connection_ctx_t *nctx = closure;
514   int rv;
515   const char *error = NULL;
516
517   rv = eventer_SSL_connect(e, &mask);
518   if(rv > 0) {
519     eventer_ssl_ctx_t *sslctx;
520     e->callback = nctx->consumer_callback;
521     /* We must make a copy of the acceptor_closure_t for each new
522      * connection.
523      */
524     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
525       const char *cn, *end;
526       void *vcn;
527       cn = eventer_ssl_get_peer_subject(sslctx);
528       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
529         cn += 3;
530         end = cn;
531         while(*end && *end != '/') end++;
532         nctx->remote_cn = malloc(end - cn + 1);
533         memcpy(nctx->remote_cn, cn, end - cn);
534         nctx->remote_cn[end-cn] = '\0';
535       }
536       if(nctx->config &&
537          noit_hash_retrieve(nctx->config, "cn", 2, &vcn)) {
538         const char *cn_expected = vcn;
539         if(!nctx->remote_cn ||
540            strcmp(nctx->remote_cn, cn_expected)) {
541           error = "jlog connect CN mismatch\n";
542           goto error;
543         }
544       }
545     }
546     return e->callback(e, mask, e->closure, now);
547   }
548   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
549   noitL(noit_debug, "jlog streamer SSL upgrade failed.\n");
550
551  error:
552   if(error) noitL(noit_error, "%s", error);
553   eventer_remove_fd(e->fd);
554   nctx->e = NULL;
555   e->opset->close(e->fd, &mask, e);
556   noit_connection_schedule_reattempt(nctx, now);
557   return 0;
558 }
559 int
560 noit_connection_complete_connect(eventer_t e, int mask, void *closure,
561                                  struct timeval *now) {
562   noit_connection_ctx_t *nctx = closure;
563   const char *cert, *key, *ca, *ciphers, *crl = NULL;
564   char remote_str[128], tmp_str[128];
565   eventer_ssl_ctx_t *sslctx;
566   int aerrno, len;
567   socklen_t aerrno_len = sizeof(aerrno);
568
569   if(getsockopt(e->fd,SOL_SOCKET,SO_ERROR, &aerrno, &aerrno_len) == 0)
570     if(aerrno != 0) goto connect_error;
571   aerrno = 0;
572
573   if(mask & EVENTER_EXCEPTION) {
574     if(aerrno == 0 && (write(e->fd, e, 0) == -1))
575       aerrno = errno;
576  connect_error:
577     switch(nctx->r.remote.sa_family) {
578       case AF_INET:
579         len = sizeof(struct sockaddr_in);
580         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in.sin_addr,
581                   tmp_str, len);
582         snprintf(remote_str, sizeof(remote_str), "%s:%d",
583                  tmp_str, ntohs(nctx->r.remote_in.sin_port));
584         break;
585       case AF_INET6:
586         len = sizeof(struct sockaddr_in6);
587         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in6.sin6_addr,
588                   tmp_str, len);
589         snprintf(remote_str, sizeof(remote_str), "%s:%d",
590                  tmp_str, ntohs(nctx->r.remote_in6.sin6_port));
591        break;
592       case AF_UNIX:
593         snprintf(remote_str, sizeof(remote_str), "%s", nctx->r.remote_un.sun_path);
594         break;
595       default:
596         snprintf(remote_str, sizeof(remote_str), "(unknown)");
597     }
598     noitL(noit_error, "Error connecting to %s: %s\n",
599           remote_str, strerror(aerrno));
600     eventer_remove_fd(e->fd);
601     nctx->e = NULL;
602     e->opset->close(e->fd, &mask, e);
603     noit_connection_schedule_reattempt(nctx, now);
604     return 0;
605   }
606
607 #define SSLCONFGET(var,name) do { \
608   if(!noit_hash_retr_str(nctx->sslconfig, name, strlen(name), \
609                          &var)) var = NULL; } while(0)
610   SSLCONFGET(cert, "certificate_file");
611   SSLCONFGET(key, "key_file");
612   SSLCONFGET(ca, "ca_chain");
613   SSLCONFGET(ciphers, "ciphers");
614   SSLCONFGET(crl, "crl");
615   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
616   if(!sslctx) goto connect_error;
617   if(crl) {
618     if(!eventer_ssl_use_crl(sslctx, crl)) {
619       noitL(noit_error, "Failed to load CRL from %s\n", crl);
620       eventer_ssl_ctx_free(sslctx);
621       goto connect_error;
622     }
623   }
624
625   memcpy(&nctx->last_connect, now, sizeof(*now));
626   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
627                              nctx->sslconfig);
628   EVENTER_ATTACH_SSL(e, sslctx);
629   e->callback = noit_connection_ssl_upgrade;
630   return e->callback(e, mask, closure, now);
631 }
632 static void
633 noit_connection_initiate_connection(noit_connection_ctx_t *nctx) {
634   struct timeval __now;
635   eventer_t e;
636   int rv, fd = -1;
637 #ifdef SO_KEEPALIVE
638   int optval;
639   socklen_t optlen = sizeof(optval);
640 #endif
641
642   nctx->e = NULL;
643   if(nctx->wants_permanent_shutdown) {
644     noit_connection_ctx_dealloc(nctx);
645     return;
646   }
647   /* Open a socket */
648   fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0);
649   if(fd < 0) goto reschedule;
650
651   /* Make it non-blocking */
652   if(eventer_set_fd_nonblocking(fd)) goto reschedule;
653 #define set_or_bail(type, opt, val) do { \
654   optval = val; \
655   optlen = sizeof(optval); \
656   if(setsockopt(fd, type, opt, &optval, optlen) < 0) { \
657     noitL(noit_error, "Cannot set " #type "/" #opt " on jlog socket: %s\n", \
658           strerror(errno)); \
659     goto reschedule; \
660   } \
661 } while(0)
662 #ifdef SO_KEEPALIVE
663   set_or_bail(SOL_SOCKET, SO_KEEPALIVE, 1);
664 #endif
665 #ifdef TCP_KEEPALIVE_THRESHOLD
666   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_THRESHOLD, 10 * 1000);
667 #endif
668 #ifdef TCP_KEEPALIVE_ABORT_THRESHOLD
669   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_ABORT_THRESHOLD, 30 * 1000);
670 #endif
671 #ifdef TCP_CONN_NOTIFY_THRESHOLD
672   set_or_bail(IPPROTO_TCP, TCP_CONN_NOTIFY_THRESHOLD, 10 * 1000);
673 #endif
674 #ifdef TCP_CONN_ABORT_THRESHOLD
675   set_or_bail(IPPROTO_TCP, TCP_CONN_ABORT_THRESHOLD, 30 * 1000);
676 #endif
677
678   /* Initiate a connection */
679   rv = connect(fd, &nctx->r.remote, nctx->remote_len);
680   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
681
682   /* Register a handler for connection completion */
683   e = eventer_alloc();
684   e->fd = fd;
685   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
686   e->callback = noit_connection_complete_connect;
687   e->closure = nctx;
688   nctx->e = e;
689   eventer_add(e);
690
691   noit_connection_update_timeout(nctx);
692   return;
693
694  reschedule:
695   if(fd >= 0) close(fd);
696   gettimeofday(&__now, NULL);
697   noit_connection_schedule_reattempt(nctx, &__now);
698   return;
699 }
700
701 int
702 noit_connection_update_timeout(noit_connection_ctx_t *nctx) {
703   struct timeval now, diff;
704   if(nctx->max_silence == 0) return 0;
705
706   diff.tv_sec = nctx->max_silence / 1000;
707   diff.tv_usec = (nctx->max_silence % 1000) * 1000;
708   gettimeofday(&now, NULL);
709
710   if(!nctx->timeout_event) {
711     nctx->timeout_event = eventer_alloc();
712     nctx->timeout_event->mask = EVENTER_TIMER;
713     nctx->timeout_event->closure = nctx;
714     nctx->timeout_event->callback = noit_connection_session_timeout;
715     add_timeval(now, diff, &nctx->timeout_event->whence);
716     eventer_add(nctx->timeout_event);
717   }
718   else {
719     add_timeval(now, diff, &nctx->timeout_event->whence);
720     eventer_update(nctx->timeout_event, EVENTER_TIMER);
721   }
722   return 0;
723 }
724
725 int
726 noit_connection_disable_timeout(noit_connection_ctx_t *nctx) {
727   if(nctx->timeout_event) {
728     eventer_remove(nctx->timeout_event);
729     eventer_free(nctx->timeout_event);
730     nctx->timeout_event = NULL;
731   }
732   return 0;
733 }
734
735 int
736 initiate_noit_connection(const char *host, unsigned short port,
737                          noit_hash_table *sslconfig, noit_hash_table *config,
738                          eventer_func_t handler, void *closure,
739                          void (*freefunc)(void *)) {
740   noit_connection_ctx_t *ctx;
741   const char *stimeout;
742   int8_t family;
743   int rv;
744   union {
745     struct in_addr addr4;
746     struct in6_addr addr6;
747   } a;
748
749   if(host[0] == '/') {
750     family = AF_UNIX;
751   }
752   else {
753     family = AF_INET;
754     rv = inet_pton(family, host, &a);
755     if(rv != 1) {
756       family = AF_INET6;
757       rv = inet_pton(family, host, &a);
758       if(rv != 1) {
759         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
760         return -1;
761       }
762     }
763   }
764
765   ctx = noit_connection_ctx_alloc();
766   ctx->remote_str = calloc(1, strlen(host) + 7);
767   snprintf(ctx->remote_str, strlen(host) + 7,
768            "%s:%d", host, port);
769  
770   memset(&ctx->r, 0, sizeof(ctx->r));
771   if(family == AF_UNIX) {
772     struct sockaddr_un *s = &ctx->r.remote_un;
773     s->sun_family = AF_UNIX;
774     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
775     ctx->remote_len = sizeof(*s);
776   }
777   else if(family == AF_INET) {
778     struct sockaddr_in *s = &ctx->r.remote_in;
779     s->sin_family = family;
780     s->sin_port = htons(port);
781     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
782     ctx->remote_len = sizeof(*s);
783   }
784   else {
785     struct sockaddr_in6 *s = &ctx->r.remote_in6;
786     s->sin6_family = family;
787     s->sin6_port = htons(port);
788     memcpy(&s->sin6_addr, &a, sizeof(a));
789     ctx->remote_len = sizeof(*s);
790   }
791
792   if(ctx->sslconfig)
793     noit_hash_delete_all(ctx->sslconfig, free, free);
794   else
795     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
796   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
797   if(ctx->config)
798     noit_hash_delete_all(ctx->config, free, free);
799   else
800     ctx->config = calloc(1, sizeof(noit_hash_table));
801   noit_hash_merge_as_dict(ctx->config, config);
802
803   if(noit_hash_retr_str(ctx->config, "timeout", strlen("timeout"), &stimeout))
804     ctx->max_silence = atoi(stimeout);
805   else
806     ctx->max_silence = DEFAULT_NOIT_CONNECTION_TIMEOUT;
807   ctx->consumer_callback = handler;
808   ctx->consumer_free = freefunc;
809   ctx->consumer_ctx = closure;
810   noit_connection_initiate_connection(ctx);
811   return 0;
812 }
813
814 void
815 stratcon_streamer_connection(const char *toplevel, const char *destination,
816                              eventer_func_t handler,
817                              void *(*handler_alloc)(void), void *handler_ctx,
818                              void (*handler_free)(void *)) {
819   int i, cnt = 0;
820   noit_conf_section_t *noit_configs;
821   char path[256];
822
823   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
824   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
825   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
826   for(i=0; i<cnt; i++) {
827     char address[256];
828     unsigned short port;
829     int portint;
830     noit_hash_table *sslconfig, *config;
831
832     if(!noit_conf_get_stringbuf(noit_configs[i],
833                                 "ancestor-or-self::node()/@address",
834                                 address, sizeof(address))) {
835       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
836       continue;
837     }
838     /* if destination is specified, exact match it */
839     if(destination && strcmp(address, destination)) continue;
840
841     if(!noit_conf_get_int(noit_configs[i],
842                           "ancestor-or-self::node()/@port", &portint))
843       portint = 0;
844     port = (unsigned short) portint;
845     if(address[0] != '/' && (portint == 0 || (port != portint))) {
846       /* UNIX sockets don't require a port (they'll ignore it if specified */
847       noitL(noit_stderr,
848             "Invalid port [%d] specified in stanza %d\n", port, i+1);
849       continue;
850     }
851     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
852     config = noit_conf_get_hash(noit_configs[i], "config");
853
854     noitL(noit_error, "initiating to %s\n", address);
855     initiate_noit_connection(address, port, sslconfig, config,
856                              handler,
857                              handler_alloc ? handler_alloc() : handler_ctx,
858                              handler_free);
859     noit_hash_destroy(sslconfig,free,free);
860     free(sslconfig);
861     noit_hash_destroy(config,free,free);
862     free(config);
863   }
864   free(noit_configs);
865 }
866 int
867 stratcon_find_noit_ip_by_cn(const char *cn, char *ip, int len) {
868   int rv = -1;
869   void *vip;
870   pthread_mutex_lock(&noit_ip_by_cn_lock);
871   if(noit_hash_retrieve(&noit_ip_by_cn, cn, strlen(cn), &vip)) {
872     int new_len;
873     char *new_ip = (char *)vip;
874     new_len = strlen(new_ip);
875     strlcpy(ip, new_ip, len);
876     if(new_len >= len) rv = new_len+1;
877     else rv = 0;
878   }
879   pthread_mutex_unlock(&noit_ip_by_cn_lock);
880   return rv;
881 }
882 void
883 stratcon_jlog_streamer_recache_noit() {
884   int di, cnt;
885   noit_conf_section_t *noit_configs;
886   noit_configs = noit_conf_get_sections(NULL, "//noits//noit", &cnt);
887   pthread_mutex_lock(&noit_ip_by_cn_lock);
888   noit_hash_delete_all(&noit_ip_by_cn, free, free);
889   for(di=0; di<cnt; di++) {
890     char address[64];
891     if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
892                                  address, sizeof(address))) {
893       char expected_cn[256];
894       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
895                                  expected_cn, sizeof(expected_cn)))
896         noit_hash_store(&noit_ip_by_cn,
897                         strdup(expected_cn), strlen(expected_cn),
898                         strdup(address));
899     }
900   }
901   free(noit_configs);
902   pthread_mutex_unlock(&noit_ip_by_cn_lock);
903 }
904 void
905 stratcon_jlog_streamer_reload(const char *toplevel) {
906   /* flush and repopulate the cn cache */
907   stratcon_jlog_streamer_recache_noit();
908   if(!stratcon_datastore_get_enabled()) return;
909   stratcon_streamer_connection(toplevel, NULL,
910                                stratcon_jlog_recv_handler,
911                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
912                                NULL,
913                                jlog_streamer_ctx_free);
914 }
915
916 char *
917 stratcon_console_noit_opts(noit_console_closure_t ncct,
918                            noit_console_state_stack_t *stack,
919                            noit_console_state_t *dstate,
920                            int argc, char **argv, int idx) {
921   if(argc == 1) {
922     noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
923     const char *key_id;
924     int klen, i = 0;
925     void *vconn, *vcn;
926     noit_connection_ctx_t *ctx;
927     noit_hash_table dedup = NOIT_HASH_EMPTY;
928
929     pthread_mutex_lock(&noits_lock);
930     while(noit_hash_next(&noits, &iter, &key_id, &klen, &vconn)) {
931       ctx = (noit_connection_ctx_t *)vconn;
932       vcn = NULL;
933       if(ctx->config && noit_hash_retrieve(ctx->config, "cn", 2, &vcn) &&
934          !noit_hash_store(&dedup, vcn, strlen(vcn), NULL)) {
935         if(!strncmp(vcn, argv[0], strlen(argv[0]))) {
936           if(idx == i) {
937             pthread_mutex_unlock(&noits_lock);
938             noit_hash_destroy(&dedup, NULL, NULL);
939             return strdup(vcn);
940           }
941           i++;
942         }
943       }
944       if(ctx->remote_str &&
945          !noit_hash_store(&dedup, ctx->remote_str, strlen(ctx->remote_str), NULL)) {
946         if(!strncmp(ctx->remote_str, argv[0], strlen(argv[0]))) {
947           if(idx == i) {
948             pthread_mutex_unlock(&noits_lock);
949             noit_hash_destroy(&dedup, NULL, NULL);
950             return strdup(ctx->remote_str);
951           }
952           i++;
953         }
954       }
955     }
956     pthread_mutex_unlock(&noits_lock);
957     noit_hash_destroy(&dedup, NULL, NULL);
958   }
959   if(argc == 2)
960     return noit_console_opt_delegate(ncct, stack, dstate, argc-1, argv+1, idx);
961   return NULL;
962 }
963 static int
964 stratcon_console_show_noits(noit_console_closure_t ncct,
965                             int argc, char **argv,
966                             noit_console_state_t *dstate,
967                             void *closure) {
968   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
969   const char *key_id, *ecn;
970   int klen, n = 0, i;
971   void *vconn;
972   noit_connection_ctx_t **ctx;
973
974   if(closure != (void *)0 && argc == 0) {
975     nc_printf(ncct, "takes an argument\n");
976     return 0;
977   }
978   if(closure == (void *)0 && argc > 0) {
979     nc_printf(ncct, "takes no arguments\n");
980     return 0;
981   }
982   pthread_mutex_lock(&noits_lock);
983   ctx = malloc(sizeof(*ctx) * noits.size);
984   while(noit_hash_next(&noits, &iter, &key_id, &klen,
985                        &vconn)) {
986     ctx[n] = (noit_connection_ctx_t *)vconn;
987     if(argc == 0 ||
988        !strcmp(ctx[n]->remote_str, argv[0]) ||
989        (ctx[n]->config && noit_hash_retr_str(ctx[n]->config, "cn", 2, &ecn) &&
990         !strcmp(ecn, argv[0]))) {
991       noit_atomic_inc32(&ctx[n]->refcnt);
992       n++;
993     }
994   }
995   pthread_mutex_unlock(&noits_lock);
996   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
997   for(i=0; i<n; i++) {
998     nc_print_noit_conn_brief(ncct, ctx[i]);
999     noit_connection_ctx_deref(ctx[i]);
1000   }
1001   free(ctx);
1002   return 0;
1003 }
1004
1005 static void
1006 emit_noit_info_metrics(struct timeval *now, const char *uuid_str,
1007                        noit_connection_ctx_t *nctx) {
1008   struct timeval last, session_duration, diff;
1009   u_int64_t session_duration_ms, last_event_ms;
1010   jlog_streamer_ctx_t *jctx = nctx->consumer_ctx;
1011   char str[1024], *wr;
1012   int len;
1013   void *vcn;
1014   const char *cn_expected;
1015   const char *feedtype = "unknown";
1016
1017   if(jctx->push == stratcon_datastore_push)
1018     feedtype = "storage";
1019   else if(jctx->push == stratcon_iep_line_processor)
1020     feedtype = "iep";
1021   if(NULL != (wr = strchr(feedtype, '/'))) feedtype = wr+1;
1022
1023   noit_hash_retrieve(nctx->config, "cn", 2, &vcn);
1024   if(!vcn) return;
1025   cn_expected = vcn;
1026
1027   snprintf(str, sizeof(str), "M\t%lu.%03lu\t%s\t%s`%s`",
1028            (long unsigned int)now->tv_sec,
1029            (long unsigned int)now->tv_usec/1000UL,
1030            uuid_str, cn_expected, feedtype);
1031   wr = str + strlen(str);
1032   len = sizeof(str) - (wr - str);
1033
1034   /* Now we write NAME TYPE VALUE into wr each time and push it */
1035 #define push_noit_m_str(name, value) do { \
1036   snprintf(wr, len, "%s\ts\t%s\n", name, value); \
1037   stratcon_datastore_push(DS_OP_INSERT, \
1038                           (struct sockaddr *)&self_stratcon_ip, \
1039                           self_stratcon_hostname, strdup(str), NULL); \
1040   stratcon_iep_line_processor(DS_OP_INSERT, \
1041                               (struct sockaddr *)&self_stratcon_ip, \
1042                               self_stratcon_hostname, strdup(str), NULL); \
1043 } while(0)
1044 #define push_noit_m_u64(name, value) do { \
1045   snprintf(wr, len, "%s\tL\t%llu\n", name, (long long unsigned int)value); \
1046   stratcon_datastore_push(DS_OP_INSERT, \
1047                           (struct sockaddr *)&self_stratcon_ip, \
1048                           self_stratcon_hostname, strdup(str), NULL); \
1049   stratcon_iep_line_processor(DS_OP_INSERT, \
1050                               (struct sockaddr *)&self_stratcon_ip, \
1051                               self_stratcon_hostname, strdup(str), NULL); \
1052 } while(0)
1053
1054   last.tv_sec = jctx->header.tv_sec;
1055   last.tv_usec = jctx->header.tv_usec;
1056   sub_timeval(*now, last, &diff);
1057   last_event_ms = diff.tv_sec * 1000 + diff.tv_usec / 1000;
1058   sub_timeval(*now, nctx->last_connect, &session_duration);
1059   session_duration_ms = session_duration.tv_sec * 1000 +
1060                         session_duration.tv_usec / 1000;
1061
1062   push_noit_m_str("state", nctx->remote_cn ? "connected" :
1063                              (nctx->retry_event ? "disconnected" :
1064                                                   "connecting"));
1065   push_noit_m_u64("last_event_age_ms", last_event_ms);
1066   push_noit_m_u64("session_length_ms", session_duration_ms);
1067 }
1068 static int
1069 periodic_noit_metrics(eventer_t e, int mask, void *closure,
1070                       struct timeval *now) {
1071   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
1072   noit_connection_ctx_t **ctxs;
1073   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1074   const char *key_id;
1075   void *vconn;
1076   int klen, n = 0, i;
1077   char str[1024];
1078   char uuid_str[UUID_STR_LEN+1];
1079
1080   uuid_unparse_lower(self_stratcon_id, uuid_str);
1081
1082   if(closure == NULL) {
1083     /* Only do this the first time we get called */
1084     char ip_str[128];
1085     inet_ntop(AF_INET, &self_stratcon_ip.sin_addr, ip_str,
1086               sizeof(ip_str));
1087     snprintf(str, sizeof(str), "C\t%lu.%03lu\t%s\t%s\tstratcon\t%s\n",
1088              (long unsigned int)now->tv_sec,
1089              (long unsigned int)now->tv_usec/1000UL, uuid_str, ip_str,
1090              self_stratcon_hostname);
1091     stratcon_datastore_push(DS_OP_INSERT,
1092                             (struct sockaddr *)&self_stratcon_ip,
1093                             self_stratcon_hostname, strdup(str), NULL);
1094     stratcon_iep_line_processor(DS_OP_INSERT,
1095                                 (struct sockaddr *)&self_stratcon_ip,
1096                                 self_stratcon_hostname, strdup(str), NULL);
1097   }
1098
1099   pthread_mutex_lock(&noits_lock);
1100   ctxs = malloc(sizeof(*ctxs) * noits.size);
1101   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1102                        &vconn)) {
1103     ctxs[n] = (noit_connection_ctx_t *)vconn;
1104     noit_atomic_inc32(&ctxs[n]->refcnt);
1105     n++;
1106   }
1107   pthread_mutex_unlock(&noits_lock);
1108
1109   snprintf(str, sizeof(str), "S\t%lu.%03lu\t%s\tG\tA\t0\tok\n",
1110            (long unsigned int)now->tv_sec,
1111            (long unsigned int)now->tv_usec/1000UL, uuid_str);
1112   stratcon_datastore_push(DS_OP_INSERT,
1113                           (struct sockaddr *)&self_stratcon_ip,
1114                           self_stratcon_hostname, strdup(str), NULL);
1115   stratcon_iep_line_processor(DS_OP_INSERT, \
1116                               (struct sockaddr *)&self_stratcon_ip, \
1117                               self_stratcon_hostname, strdup(str), NULL); \
1118   for(i=0; i<n; i++) {
1119     emit_noit_info_metrics(now, uuid_str, ctxs[i]);
1120     noit_connection_ctx_deref(ctxs[i]);
1121   }
1122   free(ctxs);
1123   stratcon_datastore_push(DS_OP_CHKPT,
1124                           (struct sockaddr *)&self_stratcon_ip,
1125                           self_stratcon_hostname, NULL, NULL);
1126   stratcon_iep_line_processor(DS_OP_CHKPT, \
1127                               (struct sockaddr *)&self_stratcon_ip, \
1128                               self_stratcon_hostname, NULL, NULL); \
1129
1130   add_timeval(e->whence, whence, &whence);
1131   eventer_add_at(periodic_noit_metrics, (void *)0x1, whence);
1132   return 0;
1133 }
1134
1135 static int
1136 rest_show_noits(noit_http_rest_closure_t *restc,
1137                 int npats, char **pats) {
1138   xmlDocPtr doc;
1139   xmlNodePtr root;
1140   noit_hash_table seen = NOIT_HASH_EMPTY;
1141   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1142   char path[256];
1143   const char *key_id;
1144   const char *type = NULL, *want_cn = NULL;
1145   int klen, n = 0, i, di, cnt;
1146   void *vconn;
1147   noit_connection_ctx_t **ctxs;
1148   noit_conf_section_t *noit_configs;
1149   struct timeval now, diff, last;
1150   xmlNodePtr node;
1151   noit_http_request *req = noit_http_session_request(restc->http_ctx);
1152
1153   noit_http_process_querystring(req);
1154   type = noit_http_request_querystring(req, "type");
1155   want_cn = noit_http_request_querystring(req, "cn");
1156
1157   gettimeofday(&now, NULL);
1158
1159   pthread_mutex_lock(&noits_lock);
1160   ctxs = malloc(sizeof(*ctxs) * noits.size);
1161   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1162                        &vconn)) {
1163     ctxs[n] = (noit_connection_ctx_t *)vconn;
1164     noit_atomic_inc32(&ctxs[n]->refcnt);
1165     n++;
1166   }
1167   pthread_mutex_unlock(&noits_lock);
1168   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
1169
1170   doc = xmlNewDoc((xmlChar *)"1.0");
1171   root = xmlNewDocNode(doc, NULL, (xmlChar *)"noits", NULL);
1172   xmlDocSetRootElement(doc, root);
1173
1174   for(i=0; i<n; i++) {
1175     char buff[256];
1176     const char *feedtype = "unknown", *state = "unknown";
1177     noit_connection_ctx_t *ctx = ctxs[i];
1178     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
1179
1180     feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
1181
1182     /* If the user requested a specific type and we're not it, skip. */
1183     if(type && strcmp(feedtype, type)) continue;
1184     /* If the user wants a specific CN... limit to that. */
1185     if(want_cn && (!ctx->remote_cn || strcmp(want_cn, ctx->remote_cn)))
1186       continue;
1187
1188     node = xmlNewNode(NULL, (xmlChar *)"noit");
1189     snprintf(buff, sizeof(buff), "%llu.%06d",
1190              (long long unsigned)ctx->last_connect.tv_sec,
1191              (int)ctx->last_connect.tv_usec);
1192     xmlSetProp(node, (xmlChar *)"last_connect", (xmlChar *)buff);
1193     xmlSetProp(node, (xmlChar *)"state", ctx->remote_cn ?
1194                (xmlChar *)"connected" :
1195                (ctx->retry_event ? (xmlChar *)"disconnected" :
1196                                     (xmlChar *)"connecting"));
1197     if(ctx->e) {
1198       char buff[128];
1199       const char *addrstr = NULL;
1200       struct sockaddr_in6 addr6;
1201       socklen_t len = sizeof(addr6);
1202       if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
1203         unsigned short port = 0;
1204         if(addr6.sin6_family == AF_INET) {
1205           addrstr = inet_ntop(addr6.sin6_family,
1206                               &((struct sockaddr_in *)&addr6)->sin_addr,
1207                               buff, sizeof(buff));
1208           memcpy(&port, &((struct sockaddr_in *)&addr6)->sin_port, sizeof(port));
1209           port = ntohs(port);
1210         }
1211         else if(addr6.sin6_family == AF_INET6) {
1212           addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
1213                               buff, sizeof(buff));
1214           port = ntohs(addr6.sin6_port);
1215         }
1216         if(addrstr != NULL) {
1217           snprintf(buff + strlen(buff), sizeof(buff) - strlen(buff),
1218                    ":%u", port);
1219           xmlSetProp(node, (xmlChar *)"local", (xmlChar *)buff);
1220         }
1221       }
1222     }
1223     noit_hash_replace(&seen, strdup(ctx->remote_str), strlen(ctx->remote_str),
1224                       0, free, NULL);
1225     xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)ctx->remote_str);
1226     xmlSetProp(node, (xmlChar *)"type", (xmlChar *)feedtype);
1227     if(ctx->retry_event) {
1228       sub_timeval(ctx->retry_event->whence, now, &diff);
1229       snprintf(buff, sizeof(buff), "%llu.%06d",
1230                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
1231       xmlSetProp(node, (xmlChar *)"next_attempt", (xmlChar *)buff);
1232     }
1233     else if(ctx->remote_cn) {
1234       if(ctx->remote_cn)
1235         xmlSetProp(node, (xmlChar *)"remote_cn", (xmlChar *)ctx->remote_cn);
1236  
1237       switch(jctx->state) {
1238         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
1239         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
1240         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
1241         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
1242         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
1243         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
1244         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
1245       }
1246       xmlSetProp(node, (xmlChar *)"state", (xmlChar *)state);
1247       snprintf(buff, sizeof(buff), "%08x:%08x",
1248                jctx->header.chkpt.log, jctx->header.chkpt.marker);
1249       xmlSetProp(node, (xmlChar *)"checkpoint", (xmlChar *)buff);
1250       snprintf(buff, sizeof(buff), "%llu",
1251                (unsigned long long)jctx->total_events);
1252       xmlSetProp(node, (xmlChar *)"session_events", (xmlChar *)buff);
1253       snprintf(buff, sizeof(buff), "%llu",
1254                (unsigned long long)jctx->total_bytes_read);
1255       xmlSetProp(node, (xmlChar *)"session_bytes", (xmlChar *)buff);
1256  
1257       sub_timeval(now, ctx->last_connect, &diff);
1258       snprintf(buff, sizeof(buff), "%lld.%06d",
1259                (long long)diff.tv_sec, (int)diff.tv_usec);
1260       xmlSetProp(node, (xmlChar *)"session_duration", (xmlChar *)buff);
1261  
1262       if(jctx->header.tv_sec) {
1263         last.tv_sec = jctx->header.tv_sec;
1264         last.tv_usec = jctx->header.tv_usec;
1265         snprintf(buff, sizeof(buff), "%llu.%06d",
1266                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
1267         xmlSetProp(node, (xmlChar *)"last_event", (xmlChar *)buff);
1268         sub_timeval(now, last, &diff);
1269         snprintf(buff, sizeof(buff), "%lld.%06d",
1270                  (long long)diff.tv_sec, (int)diff.tv_usec);
1271         xmlSetProp(node, (xmlChar *)"last_event_age", (xmlChar *)buff);
1272       }
1273     }
1274
1275     xmlAddChild(root, node);
1276     noit_connection_ctx_deref(ctx);
1277   }
1278   free(ctxs);
1279
1280   if(!type || !strcmp(type, "configured")) {
1281     snprintf(path, sizeof(path), "//noits//noit");
1282     noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1283     for(di=0; di<cnt; di++) {
1284       char address[64], port_str[32], remote_str[98];
1285       char expected_cn_buff[256], *expected_cn = NULL;
1286       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
1287                                  expected_cn_buff, sizeof(expected_cn_buff)))
1288         expected_cn = expected_cn_buff;
1289       if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn))) continue;
1290       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
1291                                  address, sizeof(address))) {
1292         void *v;
1293         if(!noit_conf_get_stringbuf(noit_configs[di], "self::node()/@port",
1294                                    port_str, sizeof(port_str)))
1295           strlcpy(port_str, "43191", sizeof(port_str));
1296
1297         /* If the user wants a specific CN... limit to that. */
1298           if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn)))
1299             continue;
1300
1301         snprintf(remote_str, sizeof(remote_str), "%s:%s", address, port_str);
1302         if(!noit_hash_retrieve(&seen, remote_str, strlen(remote_str), &v)) {
1303           node = xmlNewNode(NULL, (xmlChar *)"noit");
1304           xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)remote_str);
1305           xmlSetProp(node, (xmlChar *)"type", (xmlChar *)"configured");
1306           if(expected_cn)
1307             xmlSetProp(node, (xmlChar *)"cn", (xmlChar *)expected_cn);
1308           xmlAddChild(root, node);
1309         }
1310       }
1311     }
1312     free(noit_configs);
1313   }
1314   noit_hash_destroy(&seen, free, NULL);
1315
1316   noit_http_response_ok(restc->http_ctx, "text/xml");
1317   noit_http_response_xml(restc->http_ctx, doc);
1318   noit_http_response_end(restc->http_ctx);
1319   xmlFreeDoc(doc);
1320   return 0;
1321 }
1322 static int
1323 stratcon_add_noit(const char *target, unsigned short port,
1324                   const char *cn) {
1325   int cnt;
1326   char path[256];
1327   char port_str[6];
1328   noit_conf_section_t *noit_configs, parent;
1329   xmlNodePtr newnoit, config, cnnode;
1330
1331   snprintf(path, sizeof(path),
1332            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1333   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1334   free(noit_configs);
1335   if(cnt != 0) return -1;
1336
1337   parent = noit_conf_get_section(NULL, "//noits");
1338   if(!parent) return -1;
1339   snprintf(port_str, sizeof(port_str), "%d", port);
1340   newnoit = xmlNewNode(NULL, (xmlChar *)"noit");
1341   xmlSetProp(newnoit, (xmlChar *)"address", (xmlChar *)target);
1342   xmlSetProp(newnoit, (xmlChar *)"port", (xmlChar *)port_str);
1343   xmlAddChild(parent, newnoit);
1344   if(cn) {
1345     config = xmlNewNode(NULL, (xmlChar *)"config");
1346     cnnode = xmlNewNode(NULL, (xmlChar *)"cn");
1347     xmlNodeAddContent(cnnode, (xmlChar *)cn);
1348     xmlAddChild(config, cnnode);
1349     xmlAddChild(newnoit, config);
1350     pthread_mutex_lock(&noit_ip_by_cn_lock);
1351     noit_hash_replace(&noit_ip_by_cn, strdup(cn), strlen(cn),
1352                       strdup(target), free, free);
1353     pthread_mutex_unlock(&noit_ip_by_cn_lock);
1354   }
1355   if(stratcon_datastore_get_enabled())
1356     stratcon_streamer_connection(NULL, target,
1357                                  stratcon_jlog_recv_handler,
1358                                  (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
1359                                  NULL,
1360                                  jlog_streamer_ctx_free);
1361   if(stratcon_iep_get_enabled())
1362     stratcon_streamer_connection(NULL, target,
1363                                  stratcon_jlog_recv_handler,
1364                                  (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
1365                                  NULL,
1366                                  jlog_streamer_ctx_free);
1367   return 1;
1368 }
1369 static int
1370 stratcon_remove_noit(const char *target, unsigned short port) {
1371   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1372   const char *key_id;
1373   int klen, n = -1, i, cnt = 0;
1374   void *vconn;
1375   noit_connection_ctx_t **ctx;
1376   noit_conf_section_t *noit_configs;
1377   char path[256];
1378   char remote_str[256];
1379
1380   snprintf(remote_str, sizeof(remote_str), "%s:%d", target, port);
1381
1382   snprintf(path, sizeof(path),
1383            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1384   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1385   for(i=0; i<cnt; i++) {
1386     char expected_cn[256];
1387     if(noit_conf_get_stringbuf(noit_configs[i], "self::node()/config/cn",
1388                                expected_cn, sizeof(expected_cn))) {
1389       pthread_mutex_lock(&noit_ip_by_cn_lock);
1390       noit_hash_delete(&noit_ip_by_cn, expected_cn, strlen(expected_cn),
1391                        free, free);
1392       pthread_mutex_unlock(&noit_ip_by_cn_lock);
1393     }
1394     xmlUnlinkNode(noit_configs[i]);
1395     xmlFreeNode(noit_configs[i]);
1396     n = 0;
1397   }
1398   free(noit_configs);
1399
1400   pthread_mutex_lock(&noits_lock);
1401   ctx = malloc(sizeof(*ctx) * noits.size);
1402   while(noit_hash_next(&noits, &iter, &key_id, &klen,
1403                        &vconn)) {
1404     if(!strcmp(((noit_connection_ctx_t *)vconn)->remote_str, remote_str)) {
1405       ctx[n] = (noit_connection_ctx_t *)vconn;
1406       noit_atomic_inc32(&ctx[n]->refcnt);
1407       n++;
1408     }
1409   }
1410   pthread_mutex_unlock(&noits_lock);
1411   for(i=0; i<n; i++) {
1412     noit_connection_ctx_dealloc(ctx[i]); /* once for the record */
1413     noit_connection_ctx_deref(ctx[i]);   /* once for the aboce inc32 */
1414   }
1415   free(ctx);
1416   return n;
1417 }
1418 static int
1419 rest_set_noit(noit_http_rest_closure_t *restc,
1420               int npats, char **pats) {
1421   const char *cn = NULL;
1422   noit_http_session_ctx *ctx = restc->http_ctx;
1423   noit_http_request *req = noit_http_session_request(ctx);
1424   unsigned short port = 43191;
1425   if(npats < 1 || npats > 2)
1426     noit_http_response_server_error(ctx, "text/xml");
1427   if(npats == 2) port = atoi(pats[1]);
1428   noit_http_process_querystring(req);
1429   cn = noit_http_request_querystring(req, "cn");
1430   if(stratcon_add_noit(pats[0], port, cn) >= 0)
1431     noit_http_response_ok(ctx, "text/xml");
1432   else
1433     noit_http_response_standard(ctx, 409, "EXISTS", "text/xml");
1434   if(noit_conf_write_file(NULL) != 0)
1435     noitL(noit_error, "local config write failed\n");
1436   noit_conf_mark_changed();
1437   noit_http_response_end(ctx);
1438   return 0;
1439 }
1440 static int
1441 rest_delete_noit(noit_http_rest_closure_t *restc,
1442                  int npats, char **pats) {
1443   noit_http_session_ctx *ctx = restc->http_ctx;
1444   unsigned short port = 43191;
1445   if(npats < 1 || npats > 2)
1446     noit_http_response_server_error(ctx, "text/xml");
1447   if(npats == 2) port = atoi(pats[1]);
1448   if(stratcon_remove_noit(pats[0], port) >= 0)
1449     noit_http_response_ok(ctx, "text/xml");
1450   else
1451     noit_http_response_not_found(ctx, "text/xml");
1452   if(noit_conf_write_file(NULL) != 0)
1453     noitL(noit_error, "local config write failed\n");
1454   noit_conf_mark_changed();
1455   noit_http_response_end(ctx);
1456   return 0;
1457 }
1458 static int
1459 stratcon_console_conf_noits(noit_console_closure_t ncct,
1460                             int argc, char **argv,
1461                             noit_console_state_t *dstate,
1462                             void *closure) {
1463   char *cp, target[128];
1464   unsigned short port = 43191;
1465   int adding = (int)(vpsized_int)closure;
1466   if(argc != 1)
1467     return -1;
1468
1469   cp = strchr(argv[0], ':');
1470   if(cp) {
1471     strlcpy(target, argv[0], MIN(sizeof(target), cp-argv[0]+1));
1472     port = atoi(cp+1);
1473   }
1474   else strlcpy(target, argv[0], sizeof(target));
1475   if(adding) {
1476     if(stratcon_add_noit(target, port, NULL) >= 0) {
1477       nc_printf(ncct, "Added noit at %s:%d\n", target, port);
1478     }
1479     else {
1480       nc_printf(ncct, "Failed to add noit at %s:%d\n", target, port);
1481     }
1482   }
1483   else {
1484     if(stratcon_remove_noit(target, port) >= 0) {
1485       nc_printf(ncct, "Removed noit at %s:%d\n", target, port);
1486     }
1487     else {
1488       nc_printf(ncct, "Failed to remove noit at %s:%d\n", target, port);
1489     }
1490   }
1491   return 0;
1492 }
1493
1494 static void
1495 register_console_streamer_commands() {
1496   noit_console_state_t *tl;
1497   cmd_info_t *showcmd, *confcmd, *conftcmd, *conftnocmd;
1498
1499   tl = noit_console_state_initial();
1500   showcmd = noit_console_state_get_cmd(tl, "show");
1501   assert(showcmd && showcmd->dstate);
1502   confcmd = noit_console_state_get_cmd(tl, "configure");
1503   conftcmd = noit_console_state_get_cmd(confcmd->dstate, "terminal");
1504   conftnocmd = noit_console_state_get_cmd(conftcmd->dstate, "no");
1505
1506   noit_console_state_add_cmd(conftcmd->dstate,
1507     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)1));
1508   noit_console_state_add_cmd(conftnocmd->dstate,
1509     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)0));
1510
1511   noit_console_state_add_cmd(showcmd->dstate,
1512     NCSCMD("noit", stratcon_console_show_noits,
1513            stratcon_console_noit_opts, NULL, (void *)1));
1514   noit_console_state_add_cmd(showcmd->dstate,
1515     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
1516 }
1517
1518 void
1519 stratcon_jlog_streamer_init(const char *toplevel) {
1520   struct timeval whence = DEFAULT_NOIT_PERIOD_TV;
1521   struct in_addr remote;
1522   char uuid_str[UUID_STR_LEN + 1];
1523
1524   pthread_mutex_init(&noits_lock, NULL);
1525   pthread_mutex_init(&noit_ip_by_cn_lock, NULL);
1526   eventer_name_callback("noit_connection_reinitiate",
1527                         noit_connection_reinitiate);
1528   eventer_name_callback("stratcon_jlog_recv_handler",
1529                         stratcon_jlog_recv_handler);
1530   eventer_name_callback("noit_connection_ssl_upgrade",
1531                         noit_connection_ssl_upgrade);
1532   eventer_name_callback("noit_connection_complete_connect",
1533                         noit_connection_complete_connect);
1534   eventer_name_callback("noit_connection_session_timeout",
1535                         noit_connection_session_timeout);
1536   register_console_streamer_commands();
1537   stratcon_jlog_streamer_reload(toplevel);
1538   stratcon_streamer_connection(toplevel, "", NULL, NULL, NULL, NULL);
1539   assert(noit_http_rest_register_auth(
1540     "GET", "/noits/", "^show$", rest_show_noits,
1541              noit_http_rest_client_cert_auth
1542   ) == 0);
1543   assert(noit_http_rest_register_auth(
1544     "PUT", "/noits/", "^set/([^/:]+)$", rest_set_noit,
1545              noit_http_rest_client_cert_auth
1546   ) == 0);
1547   assert(noit_http_rest_register_auth(
1548     "PUT", "/noits/", "^set/([^/:]+):(\\d+)$", rest_set_noit,
1549              noit_http_rest_client_cert_auth
1550   ) == 0);
1551   assert(noit_http_rest_register_auth(
1552     "DELETE", "/noits/", "^delete/([^/:]+)$", rest_delete_noit,
1553              noit_http_rest_client_cert_auth
1554   ) == 0);
1555   assert(noit_http_rest_register_auth(
1556     "DELETE", "/noits/", "^delete/([^/:]+):(\\d+)$", rest_delete_noit,
1557              noit_http_rest_client_cert_auth
1558   ) == 0);
1559
1560   uuid_clear(self_stratcon_id);
1561
1562   if(noit_conf_get_stringbuf(NULL, "/stratcon/@id",
1563                              uuid_str, sizeof(uuid_str)) &&
1564      uuid_parse(uuid_str, self_stratcon_id) == 0) {
1565     int period;
1566     /* If a UUID was provided for stratcon itself, we will report metrics
1567      * on a large variety of things (including all noits).
1568      */
1569     if(noit_conf_get_int(NULL, "/stratcon/@metric_period", &period) &&
1570        period > 0) {
1571       DEFAULT_NOIT_PERIOD_TV.tv_sec = period / 1000;
1572       DEFAULT_NOIT_PERIOD_TV.tv_usec = (period % 1000) * 1000;
1573     }
1574     self_stratcon_ip.sin_family = AF_INET;
1575     remote.s_addr = 0xffffffff;
1576     noit_getip_ipv4(remote, &self_stratcon_ip.sin_addr);
1577     gethostname(self_stratcon_hostname, sizeof(self_stratcon_hostname));
1578     eventer_add_in(periodic_noit_metrics, NULL, whence);
1579   }
1580 }
1581
Note: See TracBrowser for help on using the browser.