root/src/stratcon_jlog_streamer.c

Revision 3862f7ec50e174f1457653a8555eb245145913f9, 42.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 years ago)

create better output for configured noits in the /noits/show xml REST call, refs #248

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_conf.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "noit_jlog_listener.h"
39 #include "noit_rest.h"
40 #include "stratcon_datastore.h"
41 #include "stratcon_jlog_streamer.h"
42 #include "stratcon_iep.h"
43
44 #include <unistd.h>
45 #include <assert.h>
46 #include <errno.h>
47 #include <sys/types.h>
48 #include <sys/socket.h>
49 #ifdef HAVE_SYS_FILIO_H
50 #include <sys/filio.h>
51 #endif
52 #include <netinet/in.h>
53 #include <sys/un.h>
54 #include <arpa/inet.h>
55
56 pthread_mutex_t noits_lock;
57 noit_hash_table noits = NOIT_HASH_EMPTY;
58
59 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
60
61 static const char *feed_type_to_str(int jlog_feed_cmd) {
62   switch(jlog_feed_cmd) {
63     case NOIT_JLOG_DATA_FEED: return "durable/storage";
64     case NOIT_JLOG_DATA_TEMP_FEED: return "transient/iep";
65   }
66   return "unknown";
67 }
68
69 static int
70 remote_str_sort(const void *a, const void *b) {
71   int rv;
72   noit_connection_ctx_t * const *actx = a;
73   noit_connection_ctx_t * const *bctx = b;
74   jlog_streamer_ctx_t *ajctx = (*actx)->consumer_ctx;
75   jlog_streamer_ctx_t *bjctx = (*bctx)->consumer_ctx;
76   rv = strcmp((*actx)->remote_str, (*bctx)->remote_str);
77   if(rv) return rv;
78   return (ajctx->jlog_feed_cmd < bjctx->jlog_feed_cmd) ? -1 :
79            ((ajctx->jlog_feed_cmd == bjctx->jlog_feed_cmd) ? 0 : 1);
80 }
81 static void
82 nc_print_noit_conn_brief(noit_console_closure_t ncct,
83                           noit_connection_ctx_t *ctx) {
84   jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
85   struct timeval now, diff, session_duration;
86   const char *feedtype = "unknown";
87   const char *lasttime = "never";
88   if(ctx->last_connect.tv_sec != 0) {
89     char cmdbuf[4096];
90     time_t r = ctx->last_connect.tv_sec;
91     struct tm tbuf, *tm;
92     tm = gmtime_r(&r, &tbuf);
93     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
94     lasttime = cmdbuf;
95   }
96   nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str,
97             ctx->remote_cn ? "connected" :
98                              (ctx->timeout_event ? "disconnected" :
99                                                    "connecting"), lasttime);
100   if(ctx->e) {
101     char buff[128];
102     const char *addrstr = NULL;
103     struct sockaddr_in6 addr6;
104     socklen_t len = sizeof(addr6);
105     if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
106       unsigned short port = 0;
107       if(addr6.sin6_family == AF_INET) {
108         addrstr = inet_ntop(addr6.sin6_family,
109                             &((struct sockaddr_in *)&addr6)->sin_addr,
110                             buff, sizeof(buff));
111         port = ntohs(((struct sockaddr_in *)&addr6)->sin_port);
112       }
113       else if(addr6.sin6_family == AF_INET6) {
114         addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
115                             buff, sizeof(buff));
116         port = ntohs(addr6.sin6_port);
117       }
118       if(addrstr != NULL)
119         nc_printf(ncct, "\tLocal address is %s:%u\n", buff, port);
120       else
121         nc_printf(ncct, "\tLocal address not interpretable\n");
122     }
123     else {
124       nc_printf(ncct, "\tLocal address error[%d]: %s\n",
125                 ctx->e->fd, strerror(errno));
126     }
127   }
128   feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
129   nc_printf(ncct, "\tJLog event streamer [%s]\n", feedtype);
130   gettimeofday(&now, NULL);
131   if(ctx->timeout_event) {
132     sub_timeval(ctx->timeout_event->whence, now, &diff);
133     nc_printf(ncct, "\tNext attempt in %lld.%06us\n",
134               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
135   }
136   else if(ctx->remote_cn) {
137     nc_printf(ncct, "\tRemote CN: '%s'\n",
138               ctx->remote_cn ? ctx->remote_cn : "???");
139     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
140       struct timeval last;
141       double session_duration_seconds;
142       const char *state = "unknown";
143
144       switch(jctx->state) {
145         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
146         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
147         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
148         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
149         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
150         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
151         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
152       }
153       last.tv_sec = jctx->header.tv_sec;
154       last.tv_usec = jctx->header.tv_usec;
155       sub_timeval(now, last, &diff);
156       sub_timeval(now, ctx->last_connect, &session_duration);
157       session_duration_seconds = session_duration.tv_sec +
158                                  (double)session_duration.tv_usec/1000000.0;
159       nc_printf(ncct, "\tState: %s\n"
160                       "\tNext checkpoint: [%08x:%08x]\n"
161                       "\tLast event: %lld.%06us ago\n"
162                       "\tEvents this session: %llu (%0.2f/s)\n"
163                       "\tOctets this session: %llu (%0.2f/s)\n",
164                 state,
165                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
166                 (long long)diff.tv_sec, (unsigned int)diff.tv_usec,
167                 jctx->total_events,
168                 (double)jctx->total_events/session_duration_seconds,
169                 jctx->total_bytes_read,
170                 (double)jctx->total_bytes_read/session_duration_seconds);
171     }
172     else {
173       nc_printf(ncct, "\tUnknown type.\n");
174     }
175   }
176 }
177
178 jlog_streamer_ctx_t *
179 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
180   jlog_streamer_ctx_t *ctx;
181   ctx = stratcon_jlog_streamer_ctx_alloc();
182   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
183   ctx->push = stratcon_datastore_push;
184   return ctx;
185 }
186 jlog_streamer_ctx_t *
187 stratcon_jlog_streamer_ctx_alloc(void) {
188   jlog_streamer_ctx_t *ctx;
189   ctx = calloc(1, sizeof(*ctx));
190   return ctx;
191 }
192 noit_connection_ctx_t *
193 noit_connection_ctx_alloc(void) {
194   noit_connection_ctx_t *ctx, **pctx;
195   ctx = calloc(1, sizeof(*ctx));
196   ctx->refcnt = 1;
197   pctx = malloc(sizeof(*pctx));
198   *pctx = ctx;
199   pthread_mutex_lock(&noits_lock);
200   noit_hash_store(&noits, (const char *)pctx, sizeof(*pctx), ctx);
201   pthread_mutex_unlock(&noits_lock);
202   return ctx;
203 }
204 int
205 noit_connection_reinitiate(eventer_t e, int mask, void *closure,
206                          struct timeval *now) {
207   noit_connection_ctx_t *ctx = closure;
208   ctx->timeout_event = NULL;
209   noit_connection_initiate_connection(closure);
210   return 0;
211 }
212 void
213 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx,
214                                    struct timeval *now) {
215   struct timeval __now, interval;
216   const char *v;
217   u_int32_t min_interval = 1000, max_interval = 8000;
218   if(ctx->remote_cn) {
219     free(ctx->remote_cn);
220     ctx->remote_cn = NULL;
221   }
222   if(noit_hash_retr_str(ctx->config,
223                         "reconnect_initial_interval",
224                         strlen("reconnect_initial_interval"),
225                         &v)) {
226     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
227   }
228   if(noit_hash_retr_str(ctx->config,
229                         "reconnect_maximum_interval",
230                         strlen("reconnect_maximum_interval"),
231                         &v)) {
232     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
233   }
234   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
235   else {
236     ctx->current_backoff *= 2;
237     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
238     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
239   }
240   if(!now) {
241     gettimeofday(&__now, NULL);
242     now = &__now;
243   }
244   interval.tv_sec = ctx->current_backoff / 1000;
245   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
246   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
247         ctx->current_backoff);
248   if(ctx->timeout_event)
249     eventer_remove(ctx->timeout_event);
250   else
251     ctx->timeout_event = eventer_alloc();
252   ctx->timeout_event->callback = noit_connection_reinitiate;
253   ctx->timeout_event->closure = ctx;
254   ctx->timeout_event->mask = EVENTER_TIMER;
255   add_timeval(*now, interval, &ctx->timeout_event->whence);
256   eventer_add(ctx->timeout_event);
257 }
258 static void
259 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
260   if(ctx->remote_cn) free(ctx->remote_cn);
261   if(ctx->remote_str) free(ctx->remote_str);
262   if(ctx->timeout_event) {
263     eventer_remove(ctx->timeout_event);
264     eventer_free(ctx->timeout_event);
265   }
266   ctx->consumer_free(ctx->consumer_ctx);
267   free(ctx);
268 }
269 void
270 noit_connection_ctx_deref(noit_connection_ctx_t *ctx) {
271   if(noit_atomic_dec32(&ctx->refcnt) == 0)
272     noit_connection_ctx_free(ctx);
273 }
274 void
275 noit_connection_ctx_dealloc(noit_connection_ctx_t *ctx) {
276   noit_connection_ctx_t **pctx = &ctx;
277   pthread_mutex_lock(&noits_lock);
278   noit_hash_delete(&noits, (const char *)pctx, sizeof(*pctx),
279                    free, (void (*)(void *))noit_connection_ctx_deref);
280   pthread_mutex_unlock(&noits_lock);
281 }
282 void
283 jlog_streamer_ctx_free(void *cl) {
284   jlog_streamer_ctx_t *ctx = cl;
285   if(ctx->buffer) free(ctx->buffer);
286   free(ctx);
287 }
288
289 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
290 static int
291 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
292   int len, mask;
293   while(ctx->bytes_read < ctx->bytes_expected) {
294     len = Eread(ctx->buffer + ctx->bytes_read,
295                 ctx->bytes_expected - ctx->bytes_read);
296     if(len < 0) {
297       *newmask = mask;
298       return -1;
299     }
300     /* if we get 0 inside SSL, and there was a real error, we
301      * will actually get a -1 here.
302      * if(len == 0) return ctx->bytes_read;
303      */
304     ctx->total_bytes_read += len;
305     ctx->bytes_read += len;
306   }
307   assert(ctx->bytes_read == ctx->bytes_expected);
308   return ctx->bytes_read;
309 }
310 #define FULLREAD(e,ctx,size) do { \
311   int mask, len; \
312   if(!ctx->bytes_expected) { \
313     ctx->bytes_expected = size; \
314     if(ctx->buffer) free(ctx->buffer); \
315     ctx->buffer = malloc(size + 1); \
316     if(ctx->buffer == NULL) { \
317       noitL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)size + 1); \
318       goto socket_error; \
319     } \
320     ctx->buffer[size] = '\0'; \
321   } \
322   len = __read_on_ctx(e, ctx, &mask); \
323   if(len < 0) { \
324     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
325     noitL(noit_error, "[%s] SSL read error: %s\n", nctx->remote_str, strerror(errno)); \
326     goto socket_error; \
327   } \
328   ctx->bytes_read = 0; \
329   ctx->bytes_expected = 0; \
330   if(len != size) { \
331     noitL(noit_error, "[%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
332           nctx->remote_str, ctx->state, len, (long unsigned int)size); \
333     goto socket_error; \
334   } \
335 } while(0)
336
337 int
338 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
339                            struct timeval *now) {
340   noit_connection_ctx_t *nctx = closure;
341   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
342   jlog_streamer_ctx_t dummy;
343   int len;
344   jlog_id n_chkpt;
345
346   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
347     if(write(e->fd, e, 0) == -1)
348       noitL(noit_error, "socket error: %s\n", strerror(errno));
349  socket_error:
350     ctx->state = JLOG_STREAMER_WANT_INITIATE;
351     ctx->count = 0;
352     ctx->bytes_read = 0;
353     ctx->bytes_expected = 0;
354     if(ctx->buffer) free(ctx->buffer);
355     ctx->buffer = NULL;
356     noit_connection_schedule_reattempt(nctx, now);
357     eventer_remove_fd(e->fd);
358     nctx->e = NULL;
359     e->opset->close(e->fd, &mask, e);
360     return 0;
361   }
362
363   while(1) {
364     switch(ctx->state) {
365       case JLOG_STREAMER_WANT_INITIATE:
366         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
367                               sizeof(ctx->jlog_feed_cmd),
368                               &mask, e);
369         if(len < 0) {
370           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
371           goto socket_error;
372         }
373         if(len != sizeof(ctx->jlog_feed_cmd)) {
374           noitL(noit_error, "short write [%d/%d] on initiating stream.\n",
375                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
376           goto socket_error;
377         }
378         ctx->state = JLOG_STREAMER_WANT_COUNT;
379         break;
380
381       case JLOG_STREAMER_WANT_ERROR:
382         FULLREAD(e, ctx, 0 - ctx->count);
383         noitL(noit_error, "[%s] %.*s\n", nctx->remote_str,
384               0 - ctx->count, ctx->buffer);
385         free(ctx->buffer); ctx->buffer = NULL;
386         goto socket_error;
387         break;
388
389       case JLOG_STREAMER_WANT_COUNT:
390         FULLREAD(e, ctx, sizeof(u_int32_t));
391         memcpy(&dummy.count, ctx->buffer, sizeof(u_int32_t));
392         ctx->count = ntohl(dummy.count);
393         free(ctx->buffer); ctx->buffer = NULL;
394         if(ctx->count < 0)
395           ctx->state = JLOG_STREAMER_WANT_ERROR;
396         else
397           ctx->state = JLOG_STREAMER_WANT_HEADER;
398         break;
399
400       case JLOG_STREAMER_WANT_HEADER:
401         if(ctx->count == 0) {
402           ctx->state = JLOG_STREAMER_WANT_COUNT;
403           break;
404         }
405         FULLREAD(e, ctx, sizeof(ctx->header));
406         memcpy(&dummy.header, ctx->buffer, sizeof(ctx->header));
407         ctx->header.chkpt.log = ntohl(dummy.header.chkpt.log);
408         ctx->header.chkpt.marker = ntohl(dummy.header.chkpt.marker);
409         ctx->header.tv_sec = ntohl(dummy.header.tv_sec);
410         ctx->header.tv_usec = ntohl(dummy.header.tv_usec);
411         ctx->header.message_len = ntohl(dummy.header.message_len);
412         free(ctx->buffer); ctx->buffer = NULL;
413         ctx->state = JLOG_STREAMER_WANT_BODY;
414         break;
415
416       case JLOG_STREAMER_WANT_BODY:
417         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
418         if(ctx->header.message_len > 0)
419           ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn,
420                     ctx->buffer, NULL);
421         else if(ctx->buffer)
422           free(ctx->buffer);
423         /* Don't free the buffer, it's used by the datastore process. */
424         ctx->buffer = NULL;
425         ctx->count--;
426         ctx->total_events++;
427         if(ctx->count == 0) {
428           eventer_t completion_e;
429           eventer_remove_fd(e->fd);
430           completion_e = eventer_alloc();
431           memcpy(completion_e, e, sizeof(*e));
432           nctx->e = completion_e;
433           completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
434           ctx->state = JLOG_STREAMER_IS_ASYNC;
435           ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn,
436                     NULL, completion_e);
437           noitL(noit_debug, "Pushing %s batch async [%s]: [%u/%u]\n",
438                 feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
439                 nctx->remote_cn ? nctx->remote_cn : "(null)",
440                 ctx->header.chkpt.log, ctx->header.chkpt.marker);
441           return 0;
442         } else
443           ctx->state = JLOG_STREAMER_WANT_HEADER;
444         break;
445
446       case JLOG_STREAMER_IS_ASYNC:
447         ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
448       case JLOG_STREAMER_WANT_CHKPT:
449         noitL(noit_debug, "Pushing %s checkpoint [%s]: [%u/%u]\n",
450               feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
451               nctx->remote_cn ? nctx->remote_cn : "(null)",
452               ctx->header.chkpt.log, ctx->header.chkpt.marker);
453         n_chkpt.log = htonl(ctx->header.chkpt.log);
454         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
455
456         /* screw short writes.  I'd rather die than not write my data! */
457         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
458                               &mask, e);
459         if(len < 0) {
460           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
461           goto socket_error;
462         }
463         if(len != sizeof(jlog_id)) {
464           noitL(noit_error, "short write on checkpointing stream.\n");
465           goto socket_error;
466         }
467         ctx->state = JLOG_STREAMER_WANT_COUNT;
468         break;
469     }
470   }
471   /* never get here */
472 }
473
474 int
475 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure,
476                             struct timeval *now) {
477   noit_connection_ctx_t *nctx = closure;
478   int rv;
479   const char *error = "jlog streamer SSL upgrade failed.\n";
480
481   rv = eventer_SSL_connect(e, &mask);
482   if(rv > 0) {
483     eventer_ssl_ctx_t *sslctx;
484     e->callback = nctx->consumer_callback;
485     /* We must make a copy of the acceptor_closure_t for each new
486      * connection.
487      */
488     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
489       const char *cn, *end;
490       void *vcn;
491       cn = eventer_ssl_get_peer_subject(sslctx);
492       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
493         cn += 3;
494         end = cn;
495         while(*end && *end != '/') end++;
496         nctx->remote_cn = malloc(end - cn + 1);
497         memcpy(nctx->remote_cn, cn, end - cn);
498         nctx->remote_cn[end-cn] = '\0';
499       }
500       if(nctx->config &&
501          noit_hash_retrieve(nctx->config, "cn", 2, &vcn)) {
502         const char *cn_expected = vcn;
503         if(!nctx->remote_cn ||
504            strcmp(nctx->remote_cn, cn_expected)) {
505           error = "jlog connect CN mismatch\n";
506           goto error;
507         }
508       }
509     }
510     return e->callback(e, mask, e->closure, now);
511   }
512   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
513
514  error:
515   noitL(noit_error, error);
516   eventer_remove_fd(e->fd);
517   nctx->e = NULL;
518   e->opset->close(e->fd, &mask, e);
519   noit_connection_schedule_reattempt(nctx, now);
520   return 0;
521 }
522 int
523 noit_connection_complete_connect(eventer_t e, int mask, void *closure,
524                                  struct timeval *now) {
525   noit_connection_ctx_t *nctx = closure;
526   const char *cert, *key, *ca, *ciphers, *crl = NULL;
527   char remote_str[128], tmp_str[128];
528   eventer_ssl_ctx_t *sslctx;
529   int aerrno, len;
530   socklen_t aerrno_len = sizeof(aerrno);
531
532   if(getsockopt(e->fd,SOL_SOCKET,SO_ERROR, &aerrno, &aerrno_len) == 0)
533     if(aerrno != 0) goto connect_error;
534   aerrno = 0;
535
536   if(mask & EVENTER_EXCEPTION) {
537     if(aerrno == 0 && (write(e->fd, e, 0) == -1))
538       aerrno = errno;
539  connect_error:
540     switch(nctx->r.remote.sa_family) {
541       case AF_INET:
542         len = sizeof(struct sockaddr_in);
543         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in.sin_addr,
544                   tmp_str, len);
545         snprintf(remote_str, sizeof(remote_str), "%s:%d",
546                  tmp_str, ntohs(nctx->r.remote_in.sin_port));
547         break;
548       case AF_INET6:
549         len = sizeof(struct sockaddr_in6);
550         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in6.sin6_addr,
551                   tmp_str, len);
552         snprintf(remote_str, sizeof(remote_str), "%s:%d",
553                  tmp_str, ntohs(nctx->r.remote_in6.sin6_port));
554        break;
555       case AF_UNIX:
556         len = SUN_LEN(&(nctx->r.remote_un));
557         snprintf(remote_str, sizeof(remote_str), "%s", nctx->r.remote_un.sun_path);
558         break;
559       default:
560         snprintf(remote_str, sizeof(remote_str), "(unknown)");
561     }
562     noitL(noit_error, "Error connecting to %s: %s\n",
563           remote_str, strerror(aerrno));
564     eventer_remove_fd(e->fd);
565     nctx->e = NULL;
566     e->opset->close(e->fd, &mask, e);
567     noit_connection_schedule_reattempt(nctx, now);
568     return 0;
569   }
570
571 #define SSLCONFGET(var,name) do { \
572   if(!noit_hash_retr_str(nctx->sslconfig, name, strlen(name), \
573                          &var)) var = NULL; } while(0)
574   SSLCONFGET(cert, "certificate_file");
575   SSLCONFGET(key, "key_file");
576   SSLCONFGET(ca, "ca_chain");
577   SSLCONFGET(ciphers, "ciphers");
578   SSLCONFGET(crl, "crl");
579   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
580   if(!sslctx) goto connect_error;
581   if(crl) {
582     if(!eventer_ssl_use_crl(sslctx, crl)) {
583       noitL(noit_error, "Failed to load CRL from %s\n", crl);
584       eventer_ssl_ctx_free(sslctx);
585       goto connect_error;
586     }
587   }
588
589   memcpy(&nctx->last_connect, now, sizeof(*now));
590   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
591                              nctx->sslconfig);
592   EVENTER_ATTACH_SSL(e, sslctx);
593   e->callback = noit_connection_ssl_upgrade;
594   return e->callback(e, mask, closure, now);
595 }
596 static void
597 noit_connection_initiate_connection(noit_connection_ctx_t *nctx) {
598   struct timeval __now;
599   eventer_t e;
600   int rv, fd = -1;
601 #ifdef SO_KEEPALIVE
602   int optval;
603   socklen_t optlen = sizeof(optval);
604 #endif
605
606   nctx->e = NULL;
607   if(nctx->wants_permanent_shutdown) {
608     noit_connection_ctx_dealloc(nctx);
609     return;
610   }
611   /* Open a socket */
612   fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0);
613   if(fd < 0) goto reschedule;
614
615   /* Make it non-blocking */
616   if(eventer_set_fd_nonblocking(fd)) goto reschedule;
617 #define set_or_bail(type, opt, val) do { \
618   optval = val; \
619   optlen = sizeof(optval); \
620   if(setsockopt(fd, type, opt, &optval, optlen) < 0) { \
621     noitL(noit_error, "Cannot set " #type "/" #opt " on jlog socket: %s\n", \
622           strerror(errno)); \
623     goto reschedule; \
624   } \
625 } while(0)
626 #ifdef SO_KEEPALIVE
627   set_or_bail(SOL_SOCKET, SO_KEEPALIVE, 1);
628 #endif
629 #ifdef TCP_KEEPALIVE_THRESHOLD
630   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_THRESHOLD, 10 * 1000);
631 #endif
632 #ifdef TCP_KEEPALIVE_ABORT_THRESHOLD
633   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_ABORT_THRESHOLD, 30 * 1000);
634 #endif
635 #ifdef TCP_CONN_NOTIFY_THRESHOLD
636   set_or_bail(IPPROTO_TCP, TCP_CONN_NOTIFY_THRESHOLD, 10 * 1000);
637 #endif
638 #ifdef TCP_CONN_ABORT_THRESHOLD
639   set_or_bail(IPPROTO_TCP, TCP_CONN_ABORT_THRESHOLD, 30 * 1000);
640 #endif
641
642   /* Initiate a connection */
643   rv = connect(fd, &nctx->r.remote, nctx->remote_len);
644   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
645
646   /* Register a handler for connection completion */
647   e = eventer_alloc();
648   e->fd = fd;
649   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
650   e->callback = noit_connection_complete_connect;
651   e->closure = nctx;
652   nctx->e = e;
653   eventer_add(e);
654   return;
655
656  reschedule:
657   if(fd >= 0) close(fd);
658   gettimeofday(&__now, NULL);
659   noit_connection_schedule_reattempt(nctx, &__now);
660   return;
661 }
662
663 int
664 initiate_noit_connection(const char *host, unsigned short port,
665                          noit_hash_table *sslconfig, noit_hash_table *config,
666                          eventer_func_t handler, void *closure,
667                          void (*freefunc)(void *)) {
668   noit_connection_ctx_t *ctx;
669
670   int8_t family;
671   int rv;
672   union {
673     struct in_addr addr4;
674     struct in6_addr addr6;
675   } a;
676
677   if(host[0] == '/') {
678     family = AF_UNIX;
679   }
680   else {
681     family = AF_INET;
682     rv = inet_pton(family, host, &a);
683     if(rv != 1) {
684       family = AF_INET6;
685       rv = inet_pton(family, host, &a);
686       if(rv != 1) {
687         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
688         return -1;
689       }
690     }
691   }
692
693   ctx = noit_connection_ctx_alloc();
694   ctx->remote_str = calloc(1, strlen(host) + 7);
695   snprintf(ctx->remote_str, strlen(host) + 7,
696            "%s:%d", host, port);
697  
698   memset(&ctx->r, 0, sizeof(ctx->r));
699   if(family == AF_UNIX) {
700     struct sockaddr_un *s = &ctx->r.remote_un;
701     s->sun_family = AF_UNIX;
702     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
703     ctx->remote_len = sizeof(*s);
704   }
705   else if(family == AF_INET) {
706     struct sockaddr_in *s = &ctx->r.remote_in;
707     s->sin_family = family;
708     s->sin_port = htons(port);
709     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
710     ctx->remote_len = sizeof(*s);
711   }
712   else {
713     struct sockaddr_in6 *s = &ctx->r.remote_in6;
714     s->sin6_family = family;
715     s->sin6_port = htons(port);
716     memcpy(&s->sin6_addr, &a, sizeof(a));
717     ctx->remote_len = sizeof(*s);
718   }
719
720   if(ctx->sslconfig)
721     noit_hash_delete_all(ctx->sslconfig, free, free);
722   else
723     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
724   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
725   if(ctx->config)
726     noit_hash_delete_all(ctx->config, free, free);
727   else
728     ctx->config = calloc(1, sizeof(noit_hash_table));
729   noit_hash_merge_as_dict(ctx->config, config);
730
731   ctx->consumer_callback = handler;
732   ctx->consumer_free = freefunc;
733   ctx->consumer_ctx = closure;
734   noit_connection_initiate_connection(ctx);
735   return 0;
736 }
737
738 void
739 stratcon_streamer_connection(const char *toplevel, const char *destination,
740                              eventer_func_t handler,
741                              void *(*handler_alloc)(void), void *handler_ctx,
742                              void (*handler_free)(void *)) {
743   int i, cnt = 0;
744   noit_conf_section_t *noit_configs;
745   char path[256];
746
747   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
748   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
749   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
750   for(i=0; i<cnt; i++) {
751     char address[256];
752     unsigned short port;
753     int portint;
754     noit_hash_table *sslconfig, *config;
755
756     if(!noit_conf_get_stringbuf(noit_configs[i],
757                                 "ancestor-or-self::node()/@address",
758                                 address, sizeof(address))) {
759       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
760       continue;
761     }
762     /* if destination is specified, exact match it */
763     if(destination && strcmp(address, destination)) continue;
764
765     if(!noit_conf_get_int(noit_configs[i],
766                           "ancestor-or-self::node()/@port", &portint))
767       portint = 0;
768     port = (unsigned short) portint;
769     if(address[0] != '/' && (portint == 0 || (port != portint))) {
770       /* UNIX sockets don't require a port (they'll ignore it if specified */
771       noitL(noit_stderr,
772             "Invalid port [%d] specified in stanza %d\n", port, i+1);
773       continue;
774     }
775     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
776     config = noit_conf_get_hash(noit_configs[i], "config");
777
778     noitL(noit_error, "initiating to %s\n", address);
779     initiate_noit_connection(address, port, sslconfig, config,
780                              handler,
781                              handler_alloc ? handler_alloc() : handler_ctx,
782                              handler_free);
783     noit_hash_destroy(sslconfig,free,free);
784     free(sslconfig);
785     noit_hash_destroy(config,free,free);
786     free(config);
787   }
788   free(noit_configs);
789 }
790 void
791 stratcon_jlog_streamer_reload(const char *toplevel) {
792   if(!stratcon_datastore_get_enabled()) return;
793   stratcon_streamer_connection(toplevel, NULL,
794                                stratcon_jlog_recv_handler,
795                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
796                                NULL,
797                                jlog_streamer_ctx_free);
798 }
799
800 static int
801 stratcon_console_show_noits(noit_console_closure_t ncct,
802                             int argc, char **argv,
803                             noit_console_state_t *dstate,
804                             void *closure) {
805   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
806   void *key_id;
807   int klen, n = 0, i;
808   void *vconn;
809   noit_connection_ctx_t **ctx;
810
811   pthread_mutex_lock(&noits_lock);
812   ctx = malloc(sizeof(*ctx) * noits.size);
813   while(noit_hash_next(&noits, &iter, (const char **)&key_id, &klen,
814                        &vconn)) {
815     ctx[n] = (noit_connection_ctx_t *)vconn;
816     noit_atomic_inc32(&ctx[n]->refcnt);
817     n++;
818   }
819   pthread_mutex_unlock(&noits_lock);
820   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
821   for(i=0; i<n; i++) {
822     nc_print_noit_conn_brief(ncct, ctx[i]);
823     noit_connection_ctx_deref(ctx[i]);
824   }
825   free(ctx);
826   return 0;
827 }
828
829 static int
830 rest_show_noits(noit_http_rest_closure_t *restc,
831                 int npats, char **pats) {
832   xmlDocPtr doc;
833   xmlNodePtr root;
834   noit_hash_table seen = NOIT_HASH_EMPTY;
835   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
836   char path[256];
837   void *key_id, *vtype;
838   const char *type = NULL;
839   int klen, n = 0, i, di, cnt;
840   void *vconn;
841   noit_connection_ctx_t **ctxs;
842   noit_conf_section_t *noit_configs;
843   struct timeval now, diff, last;
844   xmlNodePtr node;
845
846   noit_http_process_querystring(&restc->http_ctx->req);
847   if(noit_hash_retrieve(&restc->http_ctx->req.querystring, "type", 4, &vtype))
848     type = vtype;
849
850   gettimeofday(&now, NULL);
851
852   pthread_mutex_lock(&noits_lock);
853   ctxs = malloc(sizeof(*ctxs) * noits.size);
854   while(noit_hash_next(&noits, &iter, (const char **)&key_id, &klen,
855                        &vconn)) {
856     ctxs[n] = (noit_connection_ctx_t *)vconn;
857     noit_atomic_inc32(&ctxs[n]->refcnt);
858     n++;
859   }
860   pthread_mutex_unlock(&noits_lock);
861   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
862
863   doc = xmlNewDoc((xmlChar *)"1.0");
864   root = xmlNewDocNode(doc, NULL, (xmlChar *)"noits", NULL);
865   xmlDocSetRootElement(doc, root);
866
867   for(i=0; i<n; i++) {
868     char buff[256];
869     const char *feedtype = "unknown", *state = "unknown";
870     noit_connection_ctx_t *ctx = ctxs[i];
871     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
872
873     feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
874
875     /* If the user requested a specific type and we're not it, skip. */
876     if(type && strcmp(feedtype, type)) continue;
877
878     node = xmlNewNode(NULL, (xmlChar *)"noit");
879     snprintf(buff, sizeof(buff), "%llu.%06d",
880              (long long unsigned)ctx->last_connect.tv_sec,
881              (int)ctx->last_connect.tv_usec);
882     xmlSetProp(node, (xmlChar *)"last_connect", (xmlChar *)buff);
883     xmlSetProp(node, (xmlChar *)"state", ctx->remote_cn ?
884                (xmlChar *)"connected" :
885                (ctx->timeout_event ? (xmlChar *)"disconnected" :
886                                     (xmlChar *)"connecting"));
887     if(ctx->e) {
888       char buff[128];
889       const char *addrstr = NULL;
890       struct sockaddr_in6 addr6;
891       socklen_t len = sizeof(addr6);
892       if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
893         unsigned short port = 0;
894         if(addr6.sin6_family == AF_INET) {
895           addrstr = inet_ntop(addr6.sin6_family,
896                               &((struct sockaddr_in *)&addr6)->sin_addr,
897                               buff, sizeof(buff));
898           port = ntohs(((struct sockaddr_in *)&addr6)->sin_port);
899         }
900         else if(addr6.sin6_family == AF_INET6) {
901           addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
902                               buff, sizeof(buff));
903           port = ntohs(addr6.sin6_port);
904         }
905         if(addrstr != NULL) {
906           snprintf(buff + strlen(buff), sizeof(buff) - strlen(buff),
907                    ":%u", port);
908           xmlSetProp(node, (xmlChar *)"local", (xmlChar *)buff);
909         }
910       }
911     }
912     noit_hash_replace(&seen, strdup(ctx->remote_str), strlen(ctx->remote_str),
913                       0, free, NULL);
914     xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)ctx->remote_str);
915     xmlSetProp(node, (xmlChar *)"type", (xmlChar *)feedtype);
916     if(ctx->timeout_event) {
917       sub_timeval(ctx->timeout_event->whence, now, &diff);
918       snprintf(buff, sizeof(buff), "%llu.%06d",
919                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
920       xmlSetProp(node, (xmlChar *)"next_attempt", (xmlChar *)buff);
921     }
922     else if(ctx->remote_cn) {
923       if(ctx->remote_cn)
924         xmlSetProp(node, (xmlChar *)"remote_cn", (xmlChar *)ctx->remote_cn);
925  
926       switch(jctx->state) {
927         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
928         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
929         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
930         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
931         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
932         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
933         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
934       }
935       xmlSetProp(node, (xmlChar *)"state", (xmlChar *)state);
936       snprintf(buff, sizeof(buff), "%08x:%08x",
937                jctx->header.chkpt.log, jctx->header.chkpt.marker);
938       xmlSetProp(node, (xmlChar *)"checkpoint", (xmlChar *)buff);
939       snprintf(buff, sizeof(buff), "%llu",
940                (unsigned long long)jctx->total_events);
941       xmlSetProp(node, (xmlChar *)"session_events", (xmlChar *)buff);
942       snprintf(buff, sizeof(buff), "%llu",
943                (unsigned long long)jctx->total_bytes_read);
944       xmlSetProp(node, (xmlChar *)"session_bytes", (xmlChar *)buff);
945  
946       sub_timeval(now, ctx->last_connect, &diff);
947       snprintf(buff, sizeof(buff), "%lld.%06d",
948                (long long)diff.tv_sec, (int)diff.tv_usec);
949       xmlSetProp(node, (xmlChar *)"session_duration", (xmlChar *)buff);
950  
951       if(jctx->header.tv_sec) {
952         last.tv_sec = jctx->header.tv_sec;
953         last.tv_usec = jctx->header.tv_usec;
954         snprintf(buff, sizeof(buff), "%llu.%06d",
955                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
956         xmlSetProp(node, (xmlChar *)"last_event", (xmlChar *)buff);
957         sub_timeval(now, last, &diff);
958         snprintf(buff, sizeof(buff), "%lld.%06d",
959                  (long long)diff.tv_sec, (int)diff.tv_usec);
960         xmlSetProp(node, (xmlChar *)"last_event_age", (xmlChar *)buff);
961       }
962     }
963
964     xmlAddChild(root, node);
965     noit_connection_ctx_deref(ctx);
966   }
967   free(ctxs);
968
969   if(type && !strcmp(type, "configured")) {
970     snprintf(path, sizeof(path), "//noits//noit");
971     noit_configs = noit_conf_get_sections(NULL, path, &cnt);
972     for(di=0; di<cnt; di++) {
973       char address[64], port_str[32], remote_str[98];
974       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
975                                  address, sizeof(address))) {
976         void *v;
977         if(!noit_conf_get_stringbuf(noit_configs[di], "self::node()/@port",
978                                    port_str, sizeof(port_str)))
979           strlcpy(port_str, "43191", sizeof(port_str));
980         snprintf(remote_str, sizeof(remote_str), "%s:%s", address, port_str);
981         if(!noit_hash_retrieve(&seen, remote_str, strlen(remote_str), &v)) {
982           char expected_cn[256];
983           node = xmlNewNode(NULL, (xmlChar *)"noit");
984           xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)remote_str);
985           xmlSetProp(node, (xmlChar *)"type", (xmlChar *)"configured");
986           if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
987                                      expected_cn, sizeof(expected_cn)))
988             xmlSetProp(node, (xmlChar *)"cn", (xmlChar *)expected_cn);
989           xmlAddChild(root, node);
990         }
991       }
992     }
993     free(noit_configs);
994   }
995   noit_hash_destroy(&seen, free, NULL);
996
997   noit_http_response_ok(restc->http_ctx, "text/xml");
998   noit_http_response_xml(restc->http_ctx, doc);
999   noit_http_response_end(restc->http_ctx);
1000   xmlFreeDoc(doc);
1001   return 0;
1002 }
1003 static int
1004 stratcon_add_noit(const char *target, unsigned short port,
1005                   const char *cn) {
1006   int cnt;
1007   char path[256];
1008   char port_str[6];
1009   noit_conf_section_t *noit_configs, parent;
1010   xmlNodePtr newnoit, config, cnnode;
1011
1012   snprintf(path, sizeof(path),
1013            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1014   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1015   free(noit_configs);
1016   if(cnt != 0) return -1;
1017
1018   parent = noit_conf_get_section(NULL, "//noits");
1019   if(!parent) return -1;
1020   snprintf(port_str, sizeof(port_str), "%d", port);
1021   newnoit = xmlNewNode(NULL, (xmlChar *)"noit");
1022   xmlSetProp(newnoit, (xmlChar *)"address", (xmlChar *)target);
1023   xmlSetProp(newnoit, (xmlChar *)"port", (xmlChar *)port_str);
1024   xmlAddChild(parent, newnoit);
1025   if(cn) {
1026     config = xmlNewNode(NULL, (xmlChar *)"config");
1027     cnnode = xmlNewNode(NULL, (xmlChar *)"cn");
1028     xmlNodeAddContent(cnnode, (xmlChar *)cn);
1029     xmlAddChild(config, cnnode);
1030     xmlAddChild(newnoit, config);
1031   }
1032   if(stratcon_datastore_get_enabled())
1033     stratcon_streamer_connection(NULL, target,
1034                                  stratcon_jlog_recv_handler,
1035                                  (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
1036                                  NULL,
1037                                  jlog_streamer_ctx_free);
1038   if(stratcon_iep_get_enabled())
1039     stratcon_streamer_connection(NULL, target,
1040                                  stratcon_jlog_recv_handler,
1041                                  (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
1042                                  NULL,
1043                                  jlog_streamer_ctx_free);
1044   return 1;
1045 }
1046 static int
1047 stratcon_remove_noit(const char *target, unsigned short port) {
1048   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1049   void *key_id;
1050   int klen, n = -1, i, cnt = 0;
1051   void *vconn;
1052   noit_connection_ctx_t **ctx;
1053   noit_conf_section_t *noit_configs;
1054   char path[256];
1055   char remote_str[256];
1056
1057   snprintf(remote_str, sizeof(remote_str), "%s:%d", target, port);
1058
1059   snprintf(path, sizeof(path),
1060            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1061   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1062   for(i=0; i<cnt; i++) {
1063     xmlUnlinkNode(noit_configs[i]);
1064     xmlFreeNode(noit_configs[i]);
1065     n = 0;
1066   }
1067   free(noit_configs);
1068
1069   pthread_mutex_lock(&noits_lock);
1070   ctx = malloc(sizeof(*ctx) * noits.size);
1071   while(noit_hash_next(&noits, &iter, (const char **)&key_id, &klen,
1072                        &vconn)) {
1073     if(!strcmp(((noit_connection_ctx_t *)vconn)->remote_str, remote_str)) {
1074       ctx[n] = (noit_connection_ctx_t *)vconn;
1075       noit_atomic_inc32(&ctx[n]->refcnt);
1076       n++;
1077     }
1078   }
1079   pthread_mutex_unlock(&noits_lock);
1080   for(i=0; i<n; i++) {
1081     noit_connection_ctx_dealloc(ctx[i]); /* once for the record */
1082     noit_connection_ctx_deref(ctx[i]);   /* once for the aboce inc32 */
1083   }
1084   free(ctx);
1085   return n;
1086 }
1087 static int
1088 rest_set_noit(noit_http_rest_closure_t *restc,
1089               int npats, char **pats) {
1090   void *vcn;
1091   const char *cn = NULL;
1092   noit_http_session_ctx *ctx = restc->http_ctx;
1093   unsigned short port = 43191;
1094   if(npats < 1 || npats > 2)
1095     noit_http_response_server_error(ctx, "text/xml");
1096   if(npats == 2) port = atoi(pats[1]);
1097   noit_http_process_querystring(&ctx->req);
1098   if(noit_hash_retrieve(&ctx->req.querystring, "cn", 2, &vcn))
1099     cn = vcn;
1100   if(stratcon_add_noit(pats[0], port, cn) >= 0)
1101     noit_http_response_ok(ctx, "text/xml");
1102   else
1103     noit_http_response_standard(ctx, 409, "EXISTS", "text/xml");
1104   if(noit_conf_write_file(NULL) != 0)
1105     noitL(noit_error, "local config write failed\n");
1106   noit_conf_mark_changed();
1107   noit_http_response_end(ctx);
1108   return 0;
1109 }
1110 static int
1111 rest_delete_noit(noit_http_rest_closure_t *restc,
1112                  int npats, char **pats) {
1113   noit_http_session_ctx *ctx = restc->http_ctx;
1114   unsigned short port = 43191;
1115   if(npats < 1 || npats > 2)
1116     noit_http_response_server_error(ctx, "text/xml");
1117   if(npats == 2) port = atoi(pats[1]);
1118   if(stratcon_remove_noit(pats[0], port) >= 0)
1119     noit_http_response_ok(ctx, "text/xml");
1120   else
1121     noit_http_response_not_found(ctx, "text/xml");
1122   if(noit_conf_write_file(NULL) != 0)
1123     noitL(noit_error, "local config write failed\n");
1124   noit_conf_mark_changed();
1125   noit_http_response_end(ctx);
1126   return 0;
1127 }
1128 static int
1129 stratcon_console_conf_noits(noit_console_closure_t ncct,
1130                             int argc, char **argv,
1131                             noit_console_state_t *dstate,
1132                             void *closure) {
1133   char *cp, target[128];
1134   unsigned short port = 43191;
1135   int adding = (int)(vpsized_int)closure;
1136   if(argc != 1)
1137     return -1;
1138
1139   cp = strchr(argv[0], ':');
1140   if(cp) {
1141     strlcpy(target, argv[0], MIN(sizeof(target), cp-argv[0]+1));
1142     port = atoi(cp+1);
1143   }
1144   else strlcpy(target, argv[0], sizeof(target));
1145   if(adding) {
1146     if(stratcon_add_noit(target, port, NULL) >= 0) {
1147       nc_printf(ncct, "Added noit at %s:%d\n", target, port);
1148     }
1149     else {
1150       nc_printf(ncct, "Failed to add noit at %s:%d\n", target, port);
1151     }
1152   }
1153   else {
1154     if(stratcon_remove_noit(target, port) >= 0) {
1155       nc_printf(ncct, "Removed noit at %s:%d\n", target, port);
1156     }
1157     else {
1158       nc_printf(ncct, "Failed to remove noit at %s:%d\n", target, port);
1159     }
1160   }
1161   return 0;
1162 }
1163
1164 static void
1165 register_console_streamer_commands() {
1166   noit_console_state_t *tl;
1167   cmd_info_t *showcmd, *confcmd, *conftcmd, *conftnocmd;
1168
1169   tl = noit_console_state_initial();
1170   showcmd = noit_console_state_get_cmd(tl, "show");
1171   assert(showcmd && showcmd->dstate);
1172   confcmd = noit_console_state_get_cmd(tl, "configure");
1173   conftcmd = noit_console_state_get_cmd(confcmd->dstate, "terminal");
1174   conftnocmd = noit_console_state_get_cmd(conftcmd->dstate, "no");
1175
1176   noit_console_state_add_cmd(conftcmd->dstate,
1177     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)1));
1178   noit_console_state_add_cmd(conftnocmd->dstate,
1179     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)0));
1180
1181   noit_console_state_add_cmd(showcmd->dstate,
1182     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
1183 }
1184
1185 void
1186 stratcon_jlog_streamer_init(const char *toplevel) {
1187   pthread_mutex_init(&noits_lock, NULL);
1188   eventer_name_callback("noit_connection_reinitiate",
1189                         noit_connection_reinitiate);
1190   eventer_name_callback("stratcon_jlog_recv_handler",
1191                         stratcon_jlog_recv_handler);
1192   eventer_name_callback("noit_connection_ssl_upgrade",
1193                         noit_connection_ssl_upgrade);
1194   eventer_name_callback("noit_connection_complete_connect",
1195                         noit_connection_complete_connect);
1196   register_console_streamer_commands();
1197   stratcon_jlog_streamer_reload(toplevel);
1198   stratcon_streamer_connection(toplevel, "", NULL, NULL, NULL, NULL);
1199   assert(noit_http_rest_register_auth(
1200     "GET", "/noits/", "^show$", rest_show_noits,
1201              noit_http_rest_client_cert_auth
1202   ) == 0);
1203   assert(noit_http_rest_register_auth(
1204     "PUT", "/noits/", "^set/([^/:]+)$", rest_set_noit,
1205              noit_http_rest_client_cert_auth
1206   ) == 0);
1207   assert(noit_http_rest_register_auth(
1208     "PUT", "/noits/", "^set/([^/:]+):(\\d+)$", rest_set_noit,
1209              noit_http_rest_client_cert_auth
1210   ) == 0);
1211   assert(noit_http_rest_register_auth(
1212     "DELETE", "/noits/", "^delete/([^/:]+)$", rest_delete_noit,
1213              noit_http_rest_client_cert_auth
1214   ) == 0);
1215   assert(noit_http_rest_register_auth(
1216     "DELETE", "/noits/", "^delete/([^/:]+):(\\d+)$", rest_delete_noit,
1217              noit_http_rest_client_cert_auth
1218   ) == 0);
1219 }
1220
Note: See TracBrowser for help on using the browser.