root/src/stratcon_jlog_streamer.c

Revision ae8bf798b9af68dc0cd9559a4e9c8076b47575f1, 39.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 years ago)

Fix readdir on solaris and a bug in fetching the fqdn for storage nodes

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_conf.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "noit_jlog_listener.h"
39 #include "noit_rest.h"
40 #include "stratcon_datastore.h"
41 #include "stratcon_jlog_streamer.h"
42 #include "stratcon_iep.h"
43
44 #include <unistd.h>
45 #include <assert.h>
46 #include <errno.h>
47 #include <sys/types.h>
48 #include <sys/socket.h>
49 #ifdef HAVE_SYS_FILIO_H
50 #include <sys/filio.h>
51 #endif
52 #include <netinet/in.h>
53 #include <sys/un.h>
54 #include <arpa/inet.h>
55
/* Mutex held around the stores, deletes and iterations of the noits table
 * performed in this file. */
pthread_mutex_t noits_lock;
/* Hash table of noit connection contexts.  Entries are added by
 * noit_connection_ctx_alloc() and removed by noit_connection_ctx_dealloc();
 * the key is a heap copy of the context pointer itself. */
noit_hash_table noits = NOIT_HASH_EMPTY;
58
59 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
60
61 static int
62 remote_str_sort(const void *a, const void *b) {
63   int rv;
64   noit_connection_ctx_t * const *actx = a;
65   noit_connection_ctx_t * const *bctx = b;
66   jlog_streamer_ctx_t *ajctx = (*actx)->consumer_ctx;
67   jlog_streamer_ctx_t *bjctx = (*bctx)->consumer_ctx;
68   rv = strcmp((*actx)->remote_str, (*bctx)->remote_str);
69   if(rv) return rv;
70   return (ajctx->jlog_feed_cmd < bjctx->jlog_feed_cmd) ? -1 :
71            ((ajctx->jlog_feed_cmd == bjctx->jlog_feed_cmd) ? 0 : 1);
72 }
73 static void
74 nc_print_noit_conn_brief(noit_console_closure_t ncct,
75                           noit_connection_ctx_t *ctx) {
76   jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
77   struct timeval now, diff, session_duration;
78   const char *feedtype = "unknown";
79   const char *lasttime = "never";
80   if(ctx->last_connect.tv_sec != 0) {
81     char cmdbuf[4096];
82     time_t r = ctx->last_connect.tv_sec;
83     struct tm tbuf, *tm;
84     tm = gmtime_r(&r, &tbuf);
85     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
86     lasttime = cmdbuf;
87   }
88   nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str,
89             ctx->remote_cn ? "connected" :
90                              (ctx->timeout_event ? "disconnected" :
91                                                    "connecting"), lasttime);
92   if(ctx->e) {
93     char buff[128];
94     const char *addrstr = NULL;
95     struct sockaddr_in6 addr6;
96     socklen_t len = sizeof(addr6);
97     if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
98       unsigned short port = 0;
99       if(addr6.sin6_family == AF_INET) {
100         addrstr = inet_ntop(addr6.sin6_family,
101                             &((struct sockaddr_in *)&addr6)->sin_addr,
102                             buff, sizeof(buff));
103         port = ntohs(((struct sockaddr_in *)&addr6)->sin_port);
104       }
105       else if(addr6.sin6_family == AF_INET6) {
106         addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
107                             buff, sizeof(buff));
108         port = ntohs(addr6.sin6_port);
109       }
110       if(addrstr != NULL)
111         nc_printf(ncct, "\tLocal address is %s:%u\n", buff, port);
112       else
113         nc_printf(ncct, "\tLocal address not interpretable\n");
114     }
115     else {
116       nc_printf(ncct, "\tLocal address error[%d]: %s\n",
117                 ctx->e->fd, strerror(errno));
118     }
119   }
120   switch(ntohl(jctx->jlog_feed_cmd)) {
121     case NOIT_JLOG_DATA_FEED: feedtype = "durable/storage"; break;
122     case NOIT_JLOG_DATA_TEMP_FEED: feedtype = "transient/iep"; break;
123   }
124   nc_printf(ncct, "\tJLog event streamer [%s]\n", feedtype);
125   gettimeofday(&now, NULL);
126   if(ctx->timeout_event) {
127     sub_timeval(ctx->timeout_event->whence, now, &diff);
128     nc_printf(ncct, "\tNext attempt in %lld.%06us\n",
129               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
130   }
131   else if(ctx->remote_cn) {
132     nc_printf(ncct, "\tRemote CN: '%s'\n",
133               ctx->remote_cn ? ctx->remote_cn : "???");
134     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
135       struct timeval last;
136       double session_duration_seconds;
137       const char *state = "unknown";
138
139       switch(jctx->state) {
140         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
141         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
142         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
143         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
144         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
145         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
146         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
147       }
148       last.tv_sec = jctx->header.tv_sec;
149       last.tv_usec = jctx->header.tv_usec;
150       sub_timeval(now, last, &diff);
151       sub_timeval(now, ctx->last_connect, &session_duration);
152       session_duration_seconds = session_duration.tv_sec +
153                                  (double)session_duration.tv_usec/1000000.0;
154       nc_printf(ncct, "\tState: %s\n"
155                       "\tNext checkpoint: [%08x:%08x]\n"
156                       "\tLast event: %lld.%06us ago\n"
157                       "\tEvents this session: %llu (%0.2f/s)\n"
158                       "\tOctets this session: %llu (%0.2f/s)\n",
159                 state,
160                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
161                 (long long)diff.tv_sec, (unsigned int)diff.tv_usec,
162                 jctx->total_events,
163                 (double)jctx->total_events/session_duration_seconds,
164                 jctx->total_bytes_read,
165                 (double)jctx->total_bytes_read/session_duration_seconds);
166     }
167     else {
168       nc_printf(ncct, "\tUnknown type.\n");
169     }
170   }
171 }
172
173 jlog_streamer_ctx_t *
174 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
175   jlog_streamer_ctx_t *ctx;
176   ctx = stratcon_jlog_streamer_ctx_alloc();
177   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
178   ctx->push = stratcon_datastore_push;
179   return ctx;
180 }
181 jlog_streamer_ctx_t *
182 stratcon_jlog_streamer_ctx_alloc(void) {
183   jlog_streamer_ctx_t *ctx;
184   ctx = calloc(1, sizeof(*ctx));
185   return ctx;
186 }
187 noit_connection_ctx_t *
188 noit_connection_ctx_alloc(void) {
189   noit_connection_ctx_t *ctx, **pctx;
190   ctx = calloc(1, sizeof(*ctx));
191   ctx->refcnt = 1;
192   pctx = malloc(sizeof(*pctx));
193   *pctx = ctx;
194   pthread_mutex_lock(&noits_lock);
195   noit_hash_store(&noits, (const char *)pctx, sizeof(*pctx), ctx);
196   pthread_mutex_unlock(&noits_lock);
197   return ctx;
198 }
199 int
200 noit_connection_reinitiate(eventer_t e, int mask, void *closure,
201                          struct timeval *now) {
202   noit_connection_ctx_t *ctx = closure;
203   ctx->timeout_event = NULL;
204   noit_connection_initiate_connection(closure);
205   return 0;
206 }
207 void
208 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx,
209                                    struct timeval *now) {
210   struct timeval __now, interval;
211   const char *v;
212   u_int32_t min_interval = 1000, max_interval = 8000;
213   if(ctx->remote_cn) {
214     free(ctx->remote_cn);
215     ctx->remote_cn = NULL;
216   }
217   if(noit_hash_retr_str(ctx->config,
218                         "reconnect_initial_interval",
219                         strlen("reconnect_initial_interval"),
220                         &v)) {
221     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
222   }
223   if(noit_hash_retr_str(ctx->config,
224                         "reconnect_maximum_interval",
225                         strlen("reconnect_maximum_interval"),
226                         &v)) {
227     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
228   }
229   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
230   else {
231     ctx->current_backoff *= 2;
232     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
233     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
234   }
235   if(!now) {
236     gettimeofday(&__now, NULL);
237     now = &__now;
238   }
239   interval.tv_sec = ctx->current_backoff / 1000;
240   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
241   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
242         ctx->current_backoff);
243   if(ctx->timeout_event)
244     eventer_remove(ctx->timeout_event);
245   else
246     ctx->timeout_event = eventer_alloc();
247   ctx->timeout_event->callback = noit_connection_reinitiate;
248   ctx->timeout_event->closure = ctx;
249   ctx->timeout_event->mask = EVENTER_TIMER;
250   add_timeval(*now, interval, &ctx->timeout_event->whence);
251   eventer_add(ctx->timeout_event);
252 }
253 static void
254 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
255   if(ctx->remote_cn) free(ctx->remote_cn);
256   if(ctx->remote_str) free(ctx->remote_str);
257   if(ctx->timeout_event) {
258     eventer_remove(ctx->timeout_event);
259     eventer_free(ctx->timeout_event);
260   }
261   ctx->consumer_free(ctx->consumer_ctx);
262   free(ctx);
263 }
264 void
265 noit_connection_ctx_deref(noit_connection_ctx_t *ctx) {
266   if(noit_atomic_dec32(&ctx->refcnt) == 0)
267     noit_connection_ctx_free(ctx);
268 }
269 void
270 noit_connection_ctx_dealloc(noit_connection_ctx_t *ctx) {
271   noit_connection_ctx_t **pctx = &ctx;
272   pthread_mutex_lock(&noits_lock);
273   noit_hash_delete(&noits, (const char *)pctx, sizeof(*pctx),
274                    free, (void (*)(void *))noit_connection_ctx_deref);
275   pthread_mutex_unlock(&noits_lock);
276 }
277 void
278 jlog_streamer_ctx_free(void *cl) {
279   jlog_streamer_ctx_t *ctx = cl;
280   if(ctx->buffer) free(ctx->buffer);
281   free(ctx);
282 }
283
284 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
285 static int
286 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
287   int len, mask;
288   while(ctx->bytes_read < ctx->bytes_expected) {
289     len = Eread(ctx->buffer + ctx->bytes_read,
290                 ctx->bytes_expected - ctx->bytes_read);
291     if(len < 0) {
292       *newmask = mask;
293       return -1;
294     }
295     /* if we get 0 inside SSL, and there was a real error, we
296      * will actually get a -1 here.
297      * if(len == 0) return ctx->bytes_read;
298      */
299     ctx->total_bytes_read += len;
300     ctx->bytes_read += len;
301   }
302   assert(ctx->bytes_read == ctx->bytes_expected);
303   return ctx->bytes_read;
304 }
/*
 * FULLREAD(e, ctx, size): read exactly `size` bytes from event `e` into a
 * freshly allocated, NUL-terminated ctx->buffer.
 *
 * Must be expanded inside a function that has `nctx` in scope, a
 * `socket_error` label, and an int return type:
 *   - on EAGAIN it returns (mask | EVENTER_EXCEPTION) from the enclosing
 *     function, keeping bytes_expected/bytes_read so the read resumes on
 *     the next invocation;
 *   - on allocation failure, read error or short read it jumps to
 *     socket_error.
 * `size` is evaluated more than once, so it must be free of side effects.
 * Every parameter is parenthesized so that expression arguments such as
 * `0 - ctx->count` expand safely.
 */
#define FULLREAD(e,ctx,size) do { \
  int mask, len; \
  if(!(ctx)->bytes_expected) { \
    (ctx)->bytes_expected = (size); \
    if((ctx)->buffer) free((ctx)->buffer); \
    (ctx)->buffer = malloc((size) + 1); \
    if((ctx)->buffer == NULL) { \
      noitL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)(size) + 1); \
      goto socket_error; \
    } \
    (ctx)->buffer[(size)] = '\0'; \
  } \
  len = __read_on_ctx((e), (ctx), &mask); \
  if(len < 0) { \
    if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
    noitL(noit_error, "[%s] SSL read error: %s\n", nctx->remote_str, strerror(errno)); \
    goto socket_error; \
  } \
  (ctx)->bytes_read = 0; \
  (ctx)->bytes_expected = 0; \
  if(len != (size)) { \
    noitL(noit_error, "[%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
          nctx->remote_str, (ctx)->state, len, (long unsigned int)(size)); \
    goto socket_error; \
  } \
} while(0)
331
/*
 * Event callback that drives the jlog streaming protocol for one noit
 * connection.  closure is the noit_connection_ctx_t; its consumer_ctx is
 * the jlog_streamer_ctx_t holding the protocol state.
 *
 * The handler loops over a state machine:
 *   WANT_INITIATE - write the feed command, then wait for a count.
 *   WANT_COUNT    - read a 32-bit count; a negative count switches to
 *                   WANT_ERROR, otherwise to WANT_HEADER.
 *   WANT_ERROR    - read -count bytes, log them and reset the connection.
 *   WANT_HEADER   - read and byte-swap one message header (a zero count
 *                   goes straight back to WANT_COUNT).
 *   WANT_BODY     - read message_len bytes and push them with DS_OP_INSERT;
 *                   after the last message of a batch the event is removed
 *                   from the eventer, copied, and passed with DS_OP_CHKPT.
 *   IS_ASYNC /
 *   WANT_CHKPT    - write the checkpoint back to the peer and wait for the
 *                   next count.
 *
 * Returns mask | EVENTER_EXCEPTION when an operation would block (EAGAIN),
 * and 0 after the connection was torn down or the batch was handed off
 * asynchronously.  Any error jumps to socket_error, which resets the
 * streamer state, schedules a reconnect and closes the socket.
 */
int
stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
                           struct timeval *now) {
  noit_connection_ctx_t *nctx = closure;
  jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
  jlog_streamer_ctx_t dummy; /* scratch space for network-order fields */
  int len;
  jlog_id n_chkpt;

  if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
    /* Zero-length write; only its failure is reported. */
    if(write(e->fd, e, 0) == -1)
      noitL(noit_error, "socket error: %s\n", strerror(errno));
 socket_error:
    /* Reset the protocol state, schedule a reconnect and drop the socket. */
    ctx->state = JLOG_STREAMER_WANT_INITIATE;
    ctx->count = 0;
    ctx->bytes_read = 0;
    ctx->bytes_expected = 0;
    if(ctx->buffer) free(ctx->buffer);
    ctx->buffer = NULL;
    noit_connection_schedule_reattempt(nctx, now);
    eventer_remove_fd(e->fd);
    nctx->e = NULL;
    e->opset->close(e->fd, &mask, e);
    return 0;
  }

  while(1) {
    switch(ctx->state) {
      case JLOG_STREAMER_WANT_INITIATE:
        /* Send the feed command (already in network byte order). */
        len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
                              sizeof(ctx->jlog_feed_cmd),
                              &mask, e);
        if(len < 0) {
          if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
          goto socket_error;
        }
        if(len != sizeof(ctx->jlog_feed_cmd)) {
          noitL(noit_error, "short write [%d/%d] on initiating stream.\n",
                (int)len, (int)sizeof(ctx->jlog_feed_cmd));
          goto socket_error;
        }
        ctx->state = JLOG_STREAMER_WANT_COUNT;
        break;

      case JLOG_STREAMER_WANT_ERROR:
        /* count is negative here; its magnitude is the message length. */
        FULLREAD(e, ctx, 0 - ctx->count);
        noitL(noit_error, "[%s] %.*s\n", nctx->remote_str,
              0 - ctx->count, ctx->buffer);
        free(ctx->buffer); ctx->buffer = NULL;
        goto socket_error;
        break;

      case JLOG_STREAMER_WANT_COUNT:
        FULLREAD(e, ctx, sizeof(u_int32_t));
        memcpy(&dummy.count, ctx->buffer, sizeof(u_int32_t));
        ctx->count = ntohl(dummy.count);
        free(ctx->buffer); ctx->buffer = NULL;
        if(ctx->count < 0)
          ctx->state = JLOG_STREAMER_WANT_ERROR;
        else
          ctx->state = JLOG_STREAMER_WANT_HEADER;
        break;

      case JLOG_STREAMER_WANT_HEADER:
        if(ctx->count == 0) {
          /* Empty batch: nothing to read, wait for the next count. */
          ctx->state = JLOG_STREAMER_WANT_COUNT;
          break;
        }
        FULLREAD(e, ctx, sizeof(ctx->header));
        memcpy(&dummy.header, ctx->buffer, sizeof(ctx->header));
        ctx->header.chkpt.log = ntohl(dummy.header.chkpt.log);
        ctx->header.chkpt.marker = ntohl(dummy.header.chkpt.marker);
        ctx->header.tv_sec = ntohl(dummy.header.tv_sec);
        ctx->header.tv_usec = ntohl(dummy.header.tv_usec);
        ctx->header.message_len = ntohl(dummy.header.message_len);
        free(ctx->buffer); ctx->buffer = NULL;
        ctx->state = JLOG_STREAMER_WANT_BODY;
        break;

      case JLOG_STREAMER_WANT_BODY:
        FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
        if(ctx->header.message_len > 0)
          ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn,
                    ctx->buffer, NULL);
        else if(ctx->buffer)
          free(ctx->buffer);
        /* Don't free the buffer, it's used by the datastore process. */
        ctx->buffer = NULL;
        ctx->count--;
        ctx->total_events++;
        if(ctx->count == 0) {
          /* Last message of the batch: take the event out of the eventer
           * and hand a copy of it to the push hook with the checkpoint
           * operation; this handler is not run again until that copy is
           * re-registered (not visible in this function). */
          eventer_t completion_e;
          eventer_remove_fd(e->fd);
          completion_e = eventer_alloc();
          memcpy(completion_e, e, sizeof(*e));
          nctx->e = completion_e;
          completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
          ctx->state = JLOG_STREAMER_IS_ASYNC;
          ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn,
                    NULL, completion_e);
          noitL(noit_debug, "Pushing batch asynch...\n");
          return 0;
        } else
          ctx->state = JLOG_STREAMER_WANT_HEADER;
        break;

      case JLOG_STREAMER_IS_ASYNC:
        ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
      case JLOG_STREAMER_WANT_CHKPT:
        noitL(noit_debug, "Pushing checkpoint [%s]: [%u/%u]\n",
              nctx->remote_cn ? nctx->remote_cn : "(null)",
              ctx->header.chkpt.log, ctx->header.chkpt.marker);
        /* The checkpoint goes back on the wire in network byte order. */
        n_chkpt.log = htonl(ctx->header.chkpt.log);
        n_chkpt.marker = htonl(ctx->header.chkpt.marker);

        /* screw short writes.  I'd rather die than not write my data! */
        len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
                              &mask, e);
        if(len < 0) {
          if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
          goto socket_error;
        }
        if(len != sizeof(jlog_id)) {
          noitL(noit_error, "short write on checkpointing stream.\n");
          goto socket_error;
        }
        ctx->state = JLOG_STREAMER_WANT_COUNT;
        break;
    }
  }
  /* never get here */
}
464
465 int
466 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure,
467                             struct timeval *now) {
468   noit_connection_ctx_t *nctx = closure;
469   int rv;
470
471   rv = eventer_SSL_connect(e, &mask);
472   if(rv > 0) {
473     eventer_ssl_ctx_t *sslctx;
474     e->callback = nctx->consumer_callback;
475     /* We must make a copy of the acceptor_closure_t for each new
476      * connection.
477      */
478     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
479       const char *cn, *end;
480       cn = eventer_ssl_get_peer_subject(sslctx);
481       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
482         cn += 3;
483         end = cn;
484         while(*end && *end != '/') end++;
485         nctx->remote_cn = malloc(end - cn + 1);
486         memcpy(nctx->remote_cn, cn, end - cn);
487         nctx->remote_cn[end-cn] = '\0';
488       }
489     }
490     return e->callback(e, mask, e->closure, now);
491   }
492   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
493
494   noitL(noit_error, "jlog streamer SSL upgrade failed.\n");
495   eventer_remove_fd(e->fd);
496   nctx->e = NULL;
497   e->opset->close(e->fd, &mask, e);
498   noit_connection_schedule_reattempt(nctx, now);
499   return 0;
500 }
501 int
502 noit_connection_complete_connect(eventer_t e, int mask, void *closure,
503                                  struct timeval *now) {
504   noit_connection_ctx_t *nctx = closure;
505   const char *cert, *key, *ca, *ciphers;
506   char remote_str[128], tmp_str[128];
507   eventer_ssl_ctx_t *sslctx;
508   int aerrno, len;
509   socklen_t aerrno_len = sizeof(aerrno);
510
511   if(getsockopt(e->fd,SOL_SOCKET,SO_ERROR, &aerrno, &aerrno_len) == 0)
512     if(aerrno != 0) goto connect_error;
513   aerrno = 0;
514
515   if(mask & EVENTER_EXCEPTION) {
516     if(aerrno == 0 && (write(e->fd, e, 0) == -1))
517       aerrno = errno;
518  connect_error:
519     switch(nctx->r.remote.sa_family) {
520       case AF_INET:
521         len = sizeof(struct sockaddr_in);
522         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in.sin_addr,
523                   tmp_str, len);
524         snprintf(remote_str, sizeof(remote_str), "%s:%d",
525                  tmp_str, ntohs(nctx->r.remote_in.sin_port));
526         break;
527       case AF_INET6:
528         len = sizeof(struct sockaddr_in6);
529         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in6.sin6_addr,
530                   tmp_str, len);
531         snprintf(remote_str, sizeof(remote_str), "%s:%d",
532                  tmp_str, ntohs(nctx->r.remote_in6.sin6_port));
533        break;
534       case AF_UNIX:
535         len = SUN_LEN(&(nctx->r.remote_un));
536         snprintf(remote_str, sizeof(remote_str), "%s", nctx->r.remote_un.sun_path);
537         break;
538       default:
539         snprintf(remote_str, sizeof(remote_str), "(unknown)");
540     }
541     noitL(noit_error, "Error connecting to %s: %s\n",
542           remote_str, strerror(aerrno));
543     eventer_remove_fd(e->fd);
544     nctx->e = NULL;
545     e->opset->close(e->fd, &mask, e);
546     noit_connection_schedule_reattempt(nctx, now);
547     return 0;
548   }
549
550 #define SSLCONFGET(var,name) do { \
551   if(!noit_hash_retr_str(nctx->sslconfig, name, strlen(name), \
552                          &var)) var = NULL; } while(0)
553   SSLCONFGET(cert, "certificate_file");
554   SSLCONFGET(key, "key_file");
555   SSLCONFGET(ca, "ca_chain");
556   SSLCONFGET(ciphers, "ciphers");
557   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
558   if(!sslctx) goto connect_error;
559
560   memcpy(&nctx->last_connect, now, sizeof(*now));
561   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
562                              nctx->sslconfig);
563   EVENTER_ATTACH_SSL(e, sslctx);
564   e->callback = noit_connection_ssl_upgrade;
565   return e->callback(e, mask, closure, now);
566 }
/*
 * Start a non-blocking connection attempt for nctx.
 *
 * If the context is flagged wants_permanent_shutdown it is removed from the
 * noits table instead and nothing is connected.  Otherwise a stream socket
 * is created for the remote's address family, made non-blocking, given the
 * keepalive/threshold options the platform defines, and connect() is
 * issued.  An event calling noit_connection_complete_connect() is then
 * registered for the socket.
 *
 * Any failure along the way closes the socket (if open) and schedules a
 * new attempt through noit_connection_schedule_reattempt().
 */
static void
noit_connection_initiate_connection(noit_connection_ctx_t *nctx) {
  struct timeval __now;
  eventer_t e;
  int rv, fd = -1;
#ifdef SO_KEEPALIVE
  int optval;
  socklen_t optlen = sizeof(optval);
#endif

  nctx->e = NULL;
  if(nctx->wants_permanent_shutdown) {
    noit_connection_ctx_dealloc(nctx);
    return;
  }
  /* Open a socket */
  fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0);
  if(fd < 0) goto reschedule;

  /* Make it non-blocking */
  if(eventer_set_fd_nonblocking(fd)) goto reschedule;
/* set_or_bail: set one socket option on fd; on failure log the option name
 * and jump to reschedule.  Uses the optval/optlen locals declared above,
 * which exist only when SO_KEEPALIVE is defined. */
#define set_or_bail(type, opt, val) do { \
  optval = val; \
  optlen = sizeof(optval); \
  if(setsockopt(fd, type, opt, &optval, optlen) < 0) { \
    noitL(noit_error, "Cannot set " #type "/" #opt " on jlog socket: %s\n", \
          strerror(errno)); \
    goto reschedule; \
  } \
} while(0)
#ifdef SO_KEEPALIVE
  set_or_bail(SOL_SOCKET, SO_KEEPALIVE, 1);
#endif
#ifdef TCP_KEEPALIVE_THRESHOLD
  set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_THRESHOLD, 10 * 1000);
#endif
#ifdef TCP_KEEPALIVE_ABORT_THRESHOLD
  set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_ABORT_THRESHOLD, 30 * 1000);
#endif
#ifdef TCP_CONN_NOTIFY_THRESHOLD
  set_or_bail(IPPROTO_TCP, TCP_CONN_NOTIFY_THRESHOLD, 10 * 1000);
#endif
#ifdef TCP_CONN_ABORT_THRESHOLD
  set_or_bail(IPPROTO_TCP, TCP_CONN_ABORT_THRESHOLD, 30 * 1000);
#endif

  /* Initiate a connection */
  rv = connect(fd, &nctx->r.remote, nctx->remote_len);
  /* EINPROGRESS is the expected result for a non-blocking connect. */
  if(rv == -1 && errno != EINPROGRESS) goto reschedule;

  /* Register a handler for connection completion */
  e = eventer_alloc();
  e->fd = fd;
  e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
  e->callback = noit_connection_complete_connect;
  e->closure = nctx;
  nctx->e = e;
  eventer_add(e);
  return;

 reschedule:
  if(fd >= 0) close(fd);
  gettimeofday(&__now, NULL);
  noit_connection_schedule_reattempt(nctx, &__now);
  return;
}
633
634 int
635 initiate_noit_connection(const char *host, unsigned short port,
636                          noit_hash_table *sslconfig, noit_hash_table *config,
637                          eventer_func_t handler, void *closure,
638                          void (*freefunc)(void *)) {
639   noit_connection_ctx_t *ctx;
640
641   int8_t family;
642   int rv;
643   union {
644     struct in_addr addr4;
645     struct in6_addr addr6;
646   } a;
647
648   if(host[0] == '/') {
649     family = AF_UNIX;
650   }
651   else {
652     family = AF_INET;
653     rv = inet_pton(family, host, &a);
654     if(rv != 1) {
655       family = AF_INET6;
656       rv = inet_pton(family, host, &a);
657       if(rv != 1) {
658         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
659         return -1;
660       }
661     }
662   }
663
664   ctx = noit_connection_ctx_alloc();
665   ctx->remote_str = calloc(1, strlen(host) + 7);
666   snprintf(ctx->remote_str, strlen(host) + 7,
667            "%s:%d", host, port);
668  
669   memset(&ctx->r, 0, sizeof(ctx->r));
670   if(family == AF_UNIX) {
671     struct sockaddr_un *s = &ctx->r.remote_un;
672     s->sun_family = AF_UNIX;
673     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
674     ctx->remote_len = sizeof(*s);
675   }
676   else if(family == AF_INET) {
677     struct sockaddr_in *s = &ctx->r.remote_in;
678     s->sin_family = family;
679     s->sin_port = htons(port);
680     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
681     ctx->remote_len = sizeof(*s);
682   }
683   else {
684     struct sockaddr_in6 *s = &ctx->r.remote_in6;
685     s->sin6_family = family;
686     s->sin6_port = htons(port);
687     memcpy(&s->sin6_addr, &a, sizeof(a));
688     ctx->remote_len = sizeof(*s);
689   }
690
691   if(ctx->sslconfig)
692     noit_hash_delete_all(ctx->sslconfig, free, free);
693   else
694     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
695   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
696   if(ctx->config)
697     noit_hash_delete_all(ctx->config, free, free);
698   else
699     ctx->config = calloc(1, sizeof(noit_hash_table));
700   noit_hash_merge_as_dict(ctx->config, config);
701
702   ctx->consumer_callback = handler;
703   ctx->consumer_free = freefunc;
704   ctx->consumer_ctx = closure;
705   noit_connection_initiate_connection(ctx);
706   return 0;
707 }
708
709 void
710 stratcon_streamer_connection(const char *toplevel, const char *destination,
711                              eventer_func_t handler,
712                              void *(*handler_alloc)(void), void *handler_ctx,
713                              void (*handler_free)(void *)) {
714   int i, cnt = 0;
715   noit_conf_section_t *noit_configs;
716   char path[256];
717
718   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
719   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
720   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
721   for(i=0; i<cnt; i++) {
722     char address[256];
723     unsigned short port;
724     int portint;
725     noit_hash_table *sslconfig, *config;
726
727     if(!noit_conf_get_stringbuf(noit_configs[i],
728                                 "ancestor-or-self::node()/@address",
729                                 address, sizeof(address))) {
730       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
731       continue;
732     }
733     /* if destination is specified, exact match it */
734     if(destination && strcmp(address, destination)) continue;
735
736     if(!noit_conf_get_int(noit_configs[i],
737                           "ancestor-or-self::node()/@port", &portint))
738       portint = 0;
739     port = (unsigned short) portint;
740     if(address[0] != '/' && (portint == 0 || (port != portint))) {
741       /* UNIX sockets don't require a port (they'll ignore it if specified */
742       noitL(noit_stderr,
743             "Invalid port [%d] specified in stanza %d\n", port, i+1);
744       continue;
745     }
746     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
747     config = noit_conf_get_hash(noit_configs[i], "config");
748
749     noitL(noit_error, "initiating to %s\n", address);
750     initiate_noit_connection(address, port, sslconfig, config,
751                              handler,
752                              handler_alloc ? handler_alloc() : handler_ctx,
753                              handler_free);
754     noit_hash_destroy(sslconfig,free,free);
755     free(sslconfig);
756     noit_hash_destroy(config,free,free);
757     free(config);
758   }
759   free(noit_configs);
760 }
761 void
762 stratcon_jlog_streamer_reload(const char *toplevel) {
763   stratcon_streamer_connection(toplevel, NULL,
764                                stratcon_jlog_recv_handler,
765                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
766                                NULL,
767                                jlog_streamer_ctx_free);
768 }
769
770 static int
771 stratcon_console_show_noits(noit_console_closure_t ncct,
772                             int argc, char **argv,
773                             noit_console_state_t *dstate,
774                             void *closure) {
775   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
776   uuid_t key_id;
777   int klen, n = 0, i;
778   void *vconn;
779   noit_connection_ctx_t **ctx;
780
781   pthread_mutex_lock(&noits_lock);
782   ctx = malloc(sizeof(*ctx) * noits.size);
783   while(noit_hash_next(&noits, &iter, (const char **)key_id, &klen,
784                        &vconn)) {
785     ctx[n] = (noit_connection_ctx_t *)vconn;
786     noit_atomic_inc32(&ctx[n]->refcnt);
787     n++;
788   }
789   pthread_mutex_unlock(&noits_lock);
790   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
791   for(i=0; i<n; i++) {
792     nc_print_noit_conn_brief(ncct, ctx[i]);
793     noit_connection_ctx_deref(ctx[i]);
794   }
795   free(ctx);
796   return 0;
797 }
798
799 static int
800 rest_show_noits(noit_http_rest_closure_t *restc,
801                 int npats, char **pats) {
802   xmlDocPtr doc;
803   xmlNodePtr root;
804   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
805   uuid_t key_id;
806   int klen, n = 0, i;
807   void *vconn;
808   noit_connection_ctx_t **ctxs;
809   struct timeval now, diff, last;
810   gettimeofday(&now, NULL);
811
812   pthread_mutex_lock(&noits_lock);
813   ctxs = malloc(sizeof(*ctxs) * noits.size);
814   while(noit_hash_next(&noits, &iter, (const char **)key_id, &klen,
815                        &vconn)) {
816     ctxs[n] = (noit_connection_ctx_t *)vconn;
817     noit_atomic_inc32(&ctxs[n]->refcnt);
818     n++;
819   }
820   pthread_mutex_unlock(&noits_lock);
821   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
822
823   doc = xmlNewDoc((xmlChar *)"1.0");
824   root = xmlNewDocNode(doc, NULL, (xmlChar *)"noits", NULL);
825   xmlDocSetRootElement(doc, root);
826   for(i=0; i<n; i++) {
827     char buff[256], *feedtype = "unknown", *state = "unknown";
828     xmlNodePtr node;
829     noit_connection_ctx_t *ctx = ctxs[i];
830     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
831
832     node = xmlNewNode(NULL, (xmlChar *)"noit");
833     snprintf(buff, sizeof(buff), "%llu.%06d",
834              (long long unsigned)ctx->last_connect.tv_sec,
835              (int)ctx->last_connect.tv_usec);
836     xmlSetProp(node, (xmlChar *)"last_connect", (xmlChar *)buff);
837     xmlSetProp(node, (xmlChar *)"state", ctx->remote_cn ?
838                (xmlChar *)"connected" :
839                (ctx->timeout_event ? (xmlChar *)"disconnected" :
840                                     (xmlChar *)"connecting"));
841     if(ctx->e) {
842       char buff[128];
843       const char *addrstr = NULL;
844       struct sockaddr_in6 addr6;
845       socklen_t len = sizeof(addr6);
846       if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
847         unsigned short port = 0;
848         if(addr6.sin6_family == AF_INET) {
849           addrstr = inet_ntop(addr6.sin6_family,
850                               &((struct sockaddr_in *)&addr6)->sin_addr,
851                               buff, sizeof(buff));
852           port = ntohs(((struct sockaddr_in *)&addr6)->sin_port);
853         }
854         else if(addr6.sin6_family == AF_INET6) {
855           addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
856                               buff, sizeof(buff));
857           port = ntohs(addr6.sin6_port);
858         }
859         if(addrstr != NULL) {
860           snprintf(buff + strlen(buff), sizeof(buff) - strlen(buff),
861                    ":%u", port);
862           xmlSetProp(node, (xmlChar *)"local", (xmlChar *)buff);
863         }
864       }
865     }
866     xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)ctx->remote_str);
867     switch(ntohl(jctx->jlog_feed_cmd)) {
868       case NOIT_JLOG_DATA_FEED: feedtype = "durable/storage"; break;
869       case NOIT_JLOG_DATA_TEMP_FEED: feedtype = "transient/iep"; break;
870     }
871     xmlSetProp(node, (xmlChar *)"type", (xmlChar *)feedtype);
872     if(ctx->timeout_event) {
873       sub_timeval(ctx->timeout_event->whence, now, &diff);
874       snprintf(buff, sizeof(buff), "%llu.%06d",
875                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
876       xmlSetProp(node, (xmlChar *)"next_attempt", (xmlChar *)buff);
877     }
878     else if(ctx->remote_cn) {
879       if(ctx->remote_cn)
880         xmlSetProp(node, (xmlChar *)"remote_cn", (xmlChar *)ctx->remote_cn);
881  
882       switch(jctx->state) {
883         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
884         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
885         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
886         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
887         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
888         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
889         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
890       }
891       xmlSetProp(node, (xmlChar *)"state", (xmlChar *)state);
892       snprintf(buff, sizeof(buff), "%08x:%08x",
893                jctx->header.chkpt.log, jctx->header.chkpt.marker);
894       xmlSetProp(node, (xmlChar *)"checkpoint", (xmlChar *)buff);
895       snprintf(buff, sizeof(buff), "%llu",
896                (unsigned long long)jctx->total_events);
897       xmlSetProp(node, (xmlChar *)"session_events", (xmlChar *)buff);
898       snprintf(buff, sizeof(buff), "%llu",
899                (unsigned long long)jctx->total_bytes_read);
900       xmlSetProp(node, (xmlChar *)"session_bytes", (xmlChar *)buff);
901  
902       sub_timeval(now, ctx->last_connect, &diff);
903       snprintf(buff, sizeof(buff), "%lld.%06d",
904                (long long)diff.tv_sec, (int)diff.tv_usec);
905       xmlSetProp(node, (xmlChar *)"session_duration", (xmlChar *)buff);
906  
907       if(jctx->header.tv_sec) {
908         last.tv_sec = jctx->header.tv_sec;
909         last.tv_usec = jctx->header.tv_usec;
910         snprintf(buff, sizeof(buff), "%llu.%06d",
911                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
912         xmlSetProp(node, (xmlChar *)"last_event", (xmlChar *)buff);
913         sub_timeval(now, last, &diff);
914         snprintf(buff, sizeof(buff), "%lld.%06d",
915                  (long long)diff.tv_sec, (int)diff.tv_usec);
916         xmlSetProp(node, (xmlChar *)"last_event_age", (xmlChar *)buff);
917       }
918     }
919
920     xmlAddChild(root, node);
921     noit_connection_ctx_deref(ctx);
922   }
923   free(ctxs);
924
925   noit_http_response_ok(restc->http_ctx, "text/xml");
926   noit_http_response_xml(restc->http_ctx, doc);
927   noit_http_response_end(restc->http_ctx);
928   xmlFreeDoc(doc);
929   return 0;
930 }
931 static int
932 stratcon_add_noit(const char *target, unsigned short port) {
933   int cnt;
934   char path[256];
935   char port_str[6];
936   noit_conf_section_t *noit_configs, parent;
937   xmlNodePtr newnoit;
938
939   snprintf(path, sizeof(path),
940            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
941   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
942   free(noit_configs);
943   if(cnt != 0) return 0;
944
945   parent = noit_conf_get_section(NULL, "//noits");
946   if(!parent) return 0;
947   snprintf(port_str, sizeof(port_str), "%d", port);
948   newnoit = xmlNewNode(NULL, (xmlChar *)"noit");
949   xmlSetProp(newnoit, (xmlChar *)"address", (xmlChar *)target);
950   xmlSetProp(newnoit, (xmlChar *)"port", (xmlChar *)port_str);
951   xmlAddChild(parent, newnoit);
952   stratcon_streamer_connection(NULL, target,
953                                stratcon_jlog_recv_handler,
954                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
955                                NULL,
956                                jlog_streamer_ctx_free);
957   stratcon_streamer_connection(NULL, target,
958                                stratcon_jlog_recv_handler,
959                                (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
960                                NULL,
961                                jlog_streamer_ctx_free);
962   return 1;
963 }
964 static int
965 stratcon_remove_noit(const char *target, unsigned short port) {
966   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
967   uuid_t key_id;
968   int klen, n = 0, i, cnt = 0;
969   void *vconn;
970   noit_connection_ctx_t **ctx;
971   noit_conf_section_t *noit_configs;
972   char path[256];
973   char remote_str[256];
974
975   snprintf(remote_str, sizeof(remote_str), "%s:%d", target, port);
976
977   snprintf(path, sizeof(path),
978            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
979   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
980   for(i=0; i<cnt; i++) {
981     xmlUnlinkNode(noit_configs[i]);
982     xmlFreeNode(noit_configs[i]);
983   }
984   free(noit_configs);
985
986   pthread_mutex_lock(&noits_lock);
987   ctx = malloc(sizeof(*ctx) * noits.size);
988   while(noit_hash_next(&noits, &iter, (const char **)key_id, &klen,
989                        &vconn)) {
990     if(!strcmp(((noit_connection_ctx_t *)vconn)->remote_str, remote_str)) {
991       ctx[n] = (noit_connection_ctx_t *)vconn;
992       noit_atomic_inc32(&ctx[n]->refcnt);
993       n++;
994     }
995   }
996   pthread_mutex_unlock(&noits_lock);
997   for(i=0; i<n; i++) {
998     noit_connection_ctx_dealloc(ctx[i]); /* once for the record */
999     noit_connection_ctx_deref(ctx[i]);   /* once for the aboce inc32 */
1000   }
1001   free(ctx);
1002   return n;
1003 }
1004 static int
1005 rest_set_noit(noit_http_rest_closure_t *restc,
1006               int npats, char **pats) {
1007   noit_http_session_ctx *ctx = restc->http_ctx;
1008   unsigned short port = 43191;
1009   if(npats < 1 || npats > 2)
1010     noit_http_response_server_error(ctx, "text/xml");
1011   if(npats == 2) port = atoi(pats[1]);
1012   if(stratcon_add_noit(pats[0], port))
1013     noit_http_response_ok(ctx, "text/xml");
1014   else
1015     noit_http_response_standard(ctx, 409, "EXISTS", "text/xml");
1016   if(noit_conf_write_file(NULL) != 0)
1017     noitL(noit_error, "local config write failed\n");
1018   noit_conf_mark_changed();
1019   noit_http_response_end(ctx);
1020   return 0;
1021 }
1022 static int
1023 rest_delete_noit(noit_http_rest_closure_t *restc,
1024                  int npats, char **pats) {
1025   noit_http_session_ctx *ctx = restc->http_ctx;
1026   unsigned short port = 43191;
1027   if(npats < 1 || npats > 2)
1028     noit_http_response_server_error(ctx, "text/xml");
1029   if(npats == 2) port = atoi(pats[1]);
1030   if(stratcon_remove_noit(pats[0], port))
1031     noit_http_response_ok(ctx, "text/xml");
1032   else
1033     noit_http_response_not_found(ctx, "text/xml");
1034   if(noit_conf_write_file(NULL) != 0)
1035     noitL(noit_error, "local config write failed\n");
1036   noit_conf_mark_changed();
1037   noit_http_response_end(ctx);
1038   return 0;
1039 }
1040 static int
1041 stratcon_console_conf_noits(noit_console_closure_t ncct,
1042                             int argc, char **argv,
1043                             noit_console_state_t *dstate,
1044                             void *closure) {
1045   char *cp, target[128];
1046   unsigned short port = 43191;
1047   int adding = (int)(vpsized_int)closure;
1048   if(argc != 1)
1049     return -1;
1050
1051   cp = strchr(argv[0], ':');
1052   if(cp) {
1053     strlcpy(target, argv[0], MIN(sizeof(target), cp-argv[0]+1));
1054     port = atoi(cp+1);
1055   }
1056   else strlcpy(target, argv[0], sizeof(target));
1057   if(adding) {
1058     if(stratcon_add_noit(target, port)) {
1059       nc_printf(ncct, "Added noit at %s:%d\n", target, port);
1060     }
1061     else {
1062       nc_printf(ncct, "Failed to add noit at %s:%d\n", target, port);
1063     }
1064   }
1065   else {
1066     if(stratcon_remove_noit(target, port)) {
1067       nc_printf(ncct, "Removed noit at %s:%d\n", target, port);
1068     }
1069     else {
1070       nc_printf(ncct, "Failed to remove noit at %s:%d\n", target, port);
1071     }
1072   }
1073   return 0;
1074 }
1075
1076 static void
1077 register_console_streamer_commands() {
1078   noit_console_state_t *tl;
1079   cmd_info_t *showcmd, *confcmd, *conftcmd, *conftnocmd;
1080
1081   tl = noit_console_state_initial();
1082   showcmd = noit_console_state_get_cmd(tl, "show");
1083   assert(showcmd && showcmd->dstate);
1084   confcmd = noit_console_state_get_cmd(tl, "configure");
1085   conftcmd = noit_console_state_get_cmd(confcmd->dstate, "terminal");
1086   conftnocmd = noit_console_state_get_cmd(conftcmd->dstate, "no");
1087
1088   noit_console_state_add_cmd(conftcmd->dstate,
1089     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)1));
1090   noit_console_state_add_cmd(conftnocmd->dstate,
1091     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)0));
1092
1093   noit_console_state_add_cmd(showcmd->dstate,
1094     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
1095 }
1096
1097 void
1098 stratcon_jlog_streamer_init(const char *toplevel) {
1099   pthread_mutex_init(&noits_lock, NULL);
1100   eventer_name_callback("noit_connection_reinitiate",
1101                         noit_connection_reinitiate);
1102   eventer_name_callback("stratcon_jlog_recv_handler",
1103                         stratcon_jlog_recv_handler);
1104   eventer_name_callback("noit_connection_ssl_upgrade",
1105                         noit_connection_ssl_upgrade);
1106   eventer_name_callback("noit_connection_complete_connect",
1107                         noit_connection_complete_connect);
1108   register_console_streamer_commands();
1109   stratcon_jlog_streamer_reload(toplevel);
1110   assert(noit_http_rest_register_auth(
1111     "GET", "/noits/", "^show$", rest_show_noits,
1112              noit_http_rest_client_cert_auth
1113   ) == 0);
1114   assert(noit_http_rest_register_auth(
1115     "PUT", "/noits/", "^set/([^/:]+)$", rest_set_noit,
1116              noit_http_rest_client_cert_auth
1117   ) == 0);
1118   assert(noit_http_rest_register_auth(
1119     "PUT", "/noits/", "^set/([^/:]+):(\\d+)$", rest_set_noit,
1120              noit_http_rest_client_cert_auth
1121   ) == 0);
1122   assert(noit_http_rest_register_auth(
1123     "DELETE", "/noits/", "^delete/([^/:]+)$", rest_delete_noit,
1124              noit_http_rest_client_cert_auth
1125   ) == 0);
1126   assert(noit_http_rest_register_auth(
1127     "DELETE", "/noits/", "^delete/([^/:]+):(\\d+)$", rest_delete_noit,
1128              noit_http_rest_client_cert_auth
1129   ) == 0);
1130 }
1131
Note: See TracBrowser for help on using the browser.