root/src/stratcon_jlog_streamer.c

Revision 3b79719687e49099efedc05ce9f278f78d4b329e, 47.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 years ago)

fixup warnings

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_conf.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "noit_jlog_listener.h"
39 #include "noit_rest.h"
40 #include "stratcon_datastore.h"
41 #include "stratcon_jlog_streamer.h"
42 #include "stratcon_iep.h"
43
44 #include <unistd.h>
45 #include <assert.h>
46 #include <errno.h>
47 #include <sys/types.h>
48 #include <sys/socket.h>
49 #ifdef HAVE_SYS_FILIO_H
50 #include <sys/filio.h>
51 #endif
52 #include <netinet/in.h>
53 #include <sys/un.h>
54 #include <arpa/inet.h>
55
56 pthread_mutex_t noits_lock;
57 noit_hash_table noits = NOIT_HASH_EMPTY;
58 pthread_mutex_t noit_ip_by_cn_lock;
59 noit_hash_table noit_ip_by_cn = NOIT_HASH_EMPTY;
60
61 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
62
63 static const char *feed_type_to_str(int jlog_feed_cmd) {
64   switch(jlog_feed_cmd) {
65     case NOIT_JLOG_DATA_FEED: return "durable/storage";
66     case NOIT_JLOG_DATA_TEMP_FEED: return "transient/iep";
67   }
68   return "unknown";
69 }
70
71 static int
72 remote_str_sort(const void *a, const void *b) {
73   int rv;
74   noit_connection_ctx_t * const *actx = a;
75   noit_connection_ctx_t * const *bctx = b;
76   jlog_streamer_ctx_t *ajctx = (*actx)->consumer_ctx;
77   jlog_streamer_ctx_t *bjctx = (*bctx)->consumer_ctx;
78   rv = strcmp((*actx)->remote_str, (*bctx)->remote_str);
79   if(rv) return rv;
80   return (ajctx->jlog_feed_cmd < bjctx->jlog_feed_cmd) ? -1 :
81            ((ajctx->jlog_feed_cmd == bjctx->jlog_feed_cmd) ? 0 : 1);
82 }
83 static void
84 nc_print_noit_conn_brief(noit_console_closure_t ncct,
85                           noit_connection_ctx_t *ctx) {
86   jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
87   struct timeval now, diff, session_duration;
88   const char *feedtype = "unknown";
89   const char *lasttime = "never";
90   if(ctx->last_connect.tv_sec != 0) {
91     char cmdbuf[4096];
92     time_t r = ctx->last_connect.tv_sec;
93     struct tm tbuf, *tm;
94     tm = gmtime_r(&r, &tbuf);
95     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
96     lasttime = cmdbuf;
97   }
98   nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str,
99             ctx->remote_cn ? "connected" :
100                              (ctx->retry_event ? "disconnected" :
101                                                    "connecting"), lasttime);
102   if(ctx->e) {
103     char buff[128];
104     const char *addrstr = NULL;
105     struct sockaddr_in6 addr6;
106     socklen_t len = sizeof(addr6);
107     if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
108       unsigned short port = 0;
109       if(addr6.sin6_family == AF_INET) {
110         addrstr = inet_ntop(addr6.sin6_family,
111                             &((struct sockaddr_in *)&addr6)->sin_addr,
112                             buff, sizeof(buff));
113         port = ntohs(((struct sockaddr_in *)&addr6)->sin_port);
114       }
115       else if(addr6.sin6_family == AF_INET6) {
116         addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
117                             buff, sizeof(buff));
118         port = ntohs(addr6.sin6_port);
119       }
120       if(addrstr != NULL)
121         nc_printf(ncct, "\tLocal address is %s:%u\n", buff, port);
122       else
123         nc_printf(ncct, "\tLocal address not interpretable\n");
124     }
125     else {
126       nc_printf(ncct, "\tLocal address error[%d]: %s\n",
127                 ctx->e->fd, strerror(errno));
128     }
129   }
130   feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
131   nc_printf(ncct, "\tJLog event streamer [%s]\n", feedtype);
132   gettimeofday(&now, NULL);
133   if(ctx->retry_event) {
134     sub_timeval(ctx->retry_event->whence, now, &diff);
135     nc_printf(ncct, "\tNext attempt in %lld.%06us\n",
136               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
137   }
138   else if(ctx->remote_cn) {
139     nc_printf(ncct, "\tRemote CN: '%s'\n",
140               ctx->remote_cn ? ctx->remote_cn : "???");
141     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
142       struct timeval last;
143       double session_duration_seconds;
144       const char *state = "unknown";
145
146       switch(jctx->state) {
147         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
148         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
149         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
150         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
151         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
152         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
153         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
154       }
155       last.tv_sec = jctx->header.tv_sec;
156       last.tv_usec = jctx->header.tv_usec;
157       sub_timeval(now, last, &diff);
158       sub_timeval(now, ctx->last_connect, &session_duration);
159       session_duration_seconds = session_duration.tv_sec +
160                                  (double)session_duration.tv_usec/1000000.0;
161       nc_printf(ncct, "\tState: %s\n"
162                       "\tNext checkpoint: [%08x:%08x]\n"
163                       "\tLast event: %lld.%06us ago\n"
164                       "\tEvents this session: %llu (%0.2f/s)\n"
165                       "\tOctets this session: %llu (%0.2f/s)\n",
166                 state,
167                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
168                 (long long)diff.tv_sec, (unsigned int)diff.tv_usec,
169                 jctx->total_events,
170                 (double)jctx->total_events/session_duration_seconds,
171                 jctx->total_bytes_read,
172                 (double)jctx->total_bytes_read/session_duration_seconds);
173     }
174     else {
175       nc_printf(ncct, "\tUnknown type.\n");
176     }
177   }
178 }
179
180 jlog_streamer_ctx_t *
181 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
182   jlog_streamer_ctx_t *ctx;
183   ctx = stratcon_jlog_streamer_ctx_alloc();
184   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
185   ctx->push = stratcon_datastore_push;
186   return ctx;
187 }
188 jlog_streamer_ctx_t *
189 stratcon_jlog_streamer_ctx_alloc(void) {
190   jlog_streamer_ctx_t *ctx;
191   ctx = calloc(1, sizeof(*ctx));
192   return ctx;
193 }
194 noit_connection_ctx_t *
195 noit_connection_ctx_alloc(void) {
196   noit_connection_ctx_t *ctx, **pctx;
197   ctx = calloc(1, sizeof(*ctx));
198   ctx->refcnt = 1;
199   pctx = malloc(sizeof(*pctx));
200   *pctx = ctx;
201   pthread_mutex_lock(&noits_lock);
202   noit_hash_store(&noits, (const char *)pctx, sizeof(*pctx), ctx);
203   pthread_mutex_unlock(&noits_lock);
204   return ctx;
205 }
206 int
207 noit_connection_reinitiate(eventer_t e, int mask, void *closure,
208                          struct timeval *now) {
209   noit_connection_ctx_t *ctx = closure;
210   ctx->retry_event = NULL;
211   noit_connection_initiate_connection(closure);
212   return 0;
213 }
214 void
215 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx,
216                                    struct timeval *now) {
217   struct timeval __now, interval;
218   const char *v;
219   u_int32_t min_interval = 1000, max_interval = 8000;
220
221   noit_connection_disable_timeout(ctx);
222   if(ctx->remote_cn) {
223     free(ctx->remote_cn);
224     ctx->remote_cn = NULL;
225   }
226   if(noit_hash_retr_str(ctx->config,
227                         "reconnect_initial_interval",
228                         strlen("reconnect_initial_interval"),
229                         &v)) {
230     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
231   }
232   if(noit_hash_retr_str(ctx->config,
233                         "reconnect_maximum_interval",
234                         strlen("reconnect_maximum_interval"),
235                         &v)) {
236     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
237   }
238   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
239   else {
240     ctx->current_backoff *= 2;
241     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
242     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
243   }
244   if(!now) {
245     gettimeofday(&__now, NULL);
246     now = &__now;
247   }
248   interval.tv_sec = ctx->current_backoff / 1000;
249   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
250   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
251         ctx->current_backoff);
252   if(ctx->retry_event)
253     eventer_remove(ctx->retry_event);
254   else
255     ctx->retry_event = eventer_alloc();
256   ctx->retry_event->callback = noit_connection_reinitiate;
257   ctx->retry_event->closure = ctx;
258   ctx->retry_event->mask = EVENTER_TIMER;
259   add_timeval(*now, interval, &ctx->retry_event->whence);
260   eventer_add(ctx->retry_event);
261 }
262 static void
263 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
264   if(ctx->remote_cn) free(ctx->remote_cn);
265   if(ctx->remote_str) free(ctx->remote_str);
266   if(ctx->retry_event) {
267     eventer_remove(ctx->retry_event);
268     eventer_free(ctx->retry_event);
269   }
270   if(ctx->timeout_event) {
271     eventer_remove(ctx->timeout_event);
272     eventer_free(ctx->timeout_event);
273   }
274   ctx->consumer_free(ctx->consumer_ctx);
275   free(ctx);
276 }
277 void
278 noit_connection_ctx_deref(noit_connection_ctx_t *ctx) {
279   if(noit_atomic_dec32(&ctx->refcnt) == 0)
280     noit_connection_ctx_free(ctx);
281 }
282 void
283 noit_connection_ctx_dealloc(noit_connection_ctx_t *ctx) {
284   noit_connection_ctx_t **pctx = &ctx;
285   pthread_mutex_lock(&noits_lock);
286   noit_hash_delete(&noits, (const char *)pctx, sizeof(*pctx),
287                    free, (void (*)(void *))noit_connection_ctx_deref);
288   pthread_mutex_unlock(&noits_lock);
289 }
290 void
291 jlog_streamer_ctx_free(void *cl) {
292   jlog_streamer_ctx_t *ctx = cl;
293   if(ctx->buffer) free(ctx->buffer);
294   free(ctx);
295 }
296
297 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
298 static int
299 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
300   int len, mask;
301   while(ctx->bytes_read < ctx->bytes_expected) {
302     len = Eread(ctx->buffer + ctx->bytes_read,
303                 ctx->bytes_expected - ctx->bytes_read);
304     if(len < 0) {
305       *newmask = mask;
306       return -1;
307     }
308     /* if we get 0 inside SSL, and there was a real error, we
309      * will actually get a -1 here.
310      * if(len == 0) return ctx->bytes_read;
311      */
312     ctx->total_bytes_read += len;
313     ctx->bytes_read += len;
314   }
315   assert(ctx->bytes_read == ctx->bytes_expected);
316   return ctx->bytes_read;
317 }
318 #define FULLREAD(e,ctx,size) do { \
319   int mask, len; \
320   if(!ctx->bytes_expected) { \
321     ctx->bytes_expected = size; \
322     if(ctx->buffer) free(ctx->buffer); \
323     ctx->buffer = malloc(size + 1); \
324     if(ctx->buffer == NULL) { \
325       noitL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)size + 1); \
326       goto socket_error; \
327     } \
328     ctx->buffer[size] = '\0'; \
329   } \
330   len = __read_on_ctx(e, ctx, &mask); \
331   if(len < 0) { \
332     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
333     noitL(noit_error, "[%s] SSL read error: %s\n", nctx->remote_str, strerror(errno)); \
334     goto socket_error; \
335   } \
336   ctx->bytes_read = 0; \
337   ctx->bytes_expected = 0; \
338   if(len != size) { \
339     noitL(noit_error, "[%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
340           nctx->remote_str, ctx->state, len, (long unsigned int)size); \
341     goto socket_error; \
342   } \
343 } while(0)
344
345 int
346 noit_connection_session_timeout(eventer_t e, int mask, void *closure,
347                                 struct timeval *now) {
348   noit_connection_ctx_t *nctx = closure;
349   eventer_t fde = nctx->e;
350   nctx->timeout_event = NULL;
351   noitL(noit_error, "Timing out jlog session: %s\n",
352         nctx->remote_cn ? nctx->remote_cn : "(null)");
353   if(fde)
354     eventer_trigger(fde, EVENTER_EXCEPTION);
355   return 0;
356 }
357 int
358 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
359                            struct timeval *now) {
360   noit_connection_ctx_t *nctx = closure;
361   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
362   jlog_streamer_ctx_t dummy;
363   int len;
364   jlog_id n_chkpt;
365
366   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
367     if(write(e->fd, e, 0) == -1)
368       noitL(noit_error, "socket error: %s\n", strerror(errno));
369  socket_error:
370     ctx->state = JLOG_STREAMER_WANT_INITIATE;
371     ctx->count = 0;
372     ctx->bytes_read = 0;
373     ctx->bytes_expected = 0;
374     if(ctx->buffer) free(ctx->buffer);
375     ctx->buffer = NULL;
376     noit_connection_schedule_reattempt(nctx, now);
377     eventer_remove_fd(e->fd);
378     nctx->e = NULL;
379     e->opset->close(e->fd, &mask, e);
380     return 0;
381   }
382
383   noit_connection_update_timeout(nctx);
384   while(1) {
385     switch(ctx->state) {
386       case JLOG_STREAMER_WANT_INITIATE:
387         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
388                               sizeof(ctx->jlog_feed_cmd),
389                               &mask, e);
390         if(len < 0) {
391           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
392           goto socket_error;
393         }
394         if(len != sizeof(ctx->jlog_feed_cmd)) {
395           noitL(noit_error, "short write [%d/%d] on initiating stream.\n",
396                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
397           goto socket_error;
398         }
399         ctx->state = JLOG_STREAMER_WANT_COUNT;
400         break;
401
402       case JLOG_STREAMER_WANT_ERROR:
403         FULLREAD(e, ctx, 0 - ctx->count);
404         noitL(noit_error, "[%s] %.*s\n", nctx->remote_str,
405               0 - ctx->count, ctx->buffer);
406         free(ctx->buffer); ctx->buffer = NULL;
407         goto socket_error;
408         break;
409
410       case JLOG_STREAMER_WANT_COUNT:
411         FULLREAD(e, ctx, sizeof(u_int32_t));
412         memcpy(&dummy.count, ctx->buffer, sizeof(u_int32_t));
413         ctx->count = ntohl(dummy.count);
414         free(ctx->buffer); ctx->buffer = NULL;
415         if(ctx->count < 0)
416           ctx->state = JLOG_STREAMER_WANT_ERROR;
417         else
418           ctx->state = JLOG_STREAMER_WANT_HEADER;
419         break;
420
421       case JLOG_STREAMER_WANT_HEADER:
422         if(ctx->count == 0) {
423           ctx->state = JLOG_STREAMER_WANT_COUNT;
424           break;
425         }
426         FULLREAD(e, ctx, sizeof(ctx->header));
427         memcpy(&dummy.header, ctx->buffer, sizeof(ctx->header));
428         ctx->header.chkpt.log = ntohl(dummy.header.chkpt.log);
429         ctx->header.chkpt.marker = ntohl(dummy.header.chkpt.marker);
430         ctx->header.tv_sec = ntohl(dummy.header.tv_sec);
431         ctx->header.tv_usec = ntohl(dummy.header.tv_usec);
432         ctx->header.message_len = ntohl(dummy.header.message_len);
433         free(ctx->buffer); ctx->buffer = NULL;
434         ctx->state = JLOG_STREAMER_WANT_BODY;
435         break;
436
437       case JLOG_STREAMER_WANT_BODY:
438         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
439         if(ctx->header.message_len > 0)
440           ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn,
441                     ctx->buffer, NULL);
442         else if(ctx->buffer)
443           free(ctx->buffer);
444         /* Don't free the buffer, it's used by the datastore process. */
445         ctx->buffer = NULL;
446         ctx->count--;
447         ctx->total_events++;
448         if(ctx->count == 0) {
449           eventer_t completion_e;
450           eventer_remove_fd(e->fd);
451           completion_e = eventer_alloc();
452           memcpy(completion_e, e, sizeof(*e));
453           nctx->e = completion_e;
454           completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
455           ctx->state = JLOG_STREAMER_IS_ASYNC;
456           ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn,
457                     NULL, completion_e);
458           noitL(noit_debug, "Pushing %s batch async [%s]: [%u/%u]\n",
459                 feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
460                 nctx->remote_cn ? nctx->remote_cn : "(null)",
461                 ctx->header.chkpt.log, ctx->header.chkpt.marker);
462           noit_connection_disable_timeout(nctx);
463           return 0;
464         } else
465           ctx->state = JLOG_STREAMER_WANT_HEADER;
466         break;
467
468       case JLOG_STREAMER_IS_ASYNC:
469         ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
470       case JLOG_STREAMER_WANT_CHKPT:
471         noitL(noit_debug, "Pushing %s checkpoint [%s]: [%u/%u]\n",
472               feed_type_to_str(ntohl(ctx->jlog_feed_cmd)),
473               nctx->remote_cn ? nctx->remote_cn : "(null)",
474               ctx->header.chkpt.log, ctx->header.chkpt.marker);
475         n_chkpt.log = htonl(ctx->header.chkpt.log);
476         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
477
478         /* screw short writes.  I'd rather die than not write my data! */
479         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
480                               &mask, e);
481         if(len < 0) {
482           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
483           goto socket_error;
484         }
485         if(len != sizeof(jlog_id)) {
486           noitL(noit_error, "short write on checkpointing stream.\n");
487           goto socket_error;
488         }
489         ctx->state = JLOG_STREAMER_WANT_COUNT;
490         break;
491     }
492   }
493   /* never get here */
494 }
495
496 int
497 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure,
498                             struct timeval *now) {
499   noit_connection_ctx_t *nctx = closure;
500   int rv;
501   const char *error = "jlog streamer SSL upgrade failed.\n";
502
503   rv = eventer_SSL_connect(e, &mask);
504   if(rv > 0) {
505     eventer_ssl_ctx_t *sslctx;
506     e->callback = nctx->consumer_callback;
507     /* We must make a copy of the acceptor_closure_t for each new
508      * connection.
509      */
510     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
511       const char *cn, *end;
512       void *vcn;
513       cn = eventer_ssl_get_peer_subject(sslctx);
514       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
515         cn += 3;
516         end = cn;
517         while(*end && *end != '/') end++;
518         nctx->remote_cn = malloc(end - cn + 1);
519         memcpy(nctx->remote_cn, cn, end - cn);
520         nctx->remote_cn[end-cn] = '\0';
521       }
522       if(nctx->config &&
523          noit_hash_retrieve(nctx->config, "cn", 2, &vcn)) {
524         const char *cn_expected = vcn;
525         if(!nctx->remote_cn ||
526            strcmp(nctx->remote_cn, cn_expected)) {
527           error = "jlog connect CN mismatch\n";
528           goto error;
529         }
530       }
531     }
532     return e->callback(e, mask, e->closure, now);
533   }
534   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
535
536  error:
537   noitL(noit_error, "%s", error);
538   eventer_remove_fd(e->fd);
539   nctx->e = NULL;
540   e->opset->close(e->fd, &mask, e);
541   noit_connection_schedule_reattempt(nctx, now);
542   return 0;
543 }
544 int
545 noit_connection_complete_connect(eventer_t e, int mask, void *closure,
546                                  struct timeval *now) {
547   noit_connection_ctx_t *nctx = closure;
548   const char *cert, *key, *ca, *ciphers, *crl = NULL;
549   char remote_str[128], tmp_str[128];
550   eventer_ssl_ctx_t *sslctx;
551   int aerrno, len;
552   socklen_t aerrno_len = sizeof(aerrno);
553
554   if(getsockopt(e->fd,SOL_SOCKET,SO_ERROR, &aerrno, &aerrno_len) == 0)
555     if(aerrno != 0) goto connect_error;
556   aerrno = 0;
557
558   if(mask & EVENTER_EXCEPTION) {
559     if(aerrno == 0 && (write(e->fd, e, 0) == -1))
560       aerrno = errno;
561  connect_error:
562     switch(nctx->r.remote.sa_family) {
563       case AF_INET:
564         len = sizeof(struct sockaddr_in);
565         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in.sin_addr,
566                   tmp_str, len);
567         snprintf(remote_str, sizeof(remote_str), "%s:%d",
568                  tmp_str, ntohs(nctx->r.remote_in.sin_port));
569         break;
570       case AF_INET6:
571         len = sizeof(struct sockaddr_in6);
572         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in6.sin6_addr,
573                   tmp_str, len);
574         snprintf(remote_str, sizeof(remote_str), "%s:%d",
575                  tmp_str, ntohs(nctx->r.remote_in6.sin6_port));
576        break;
577       case AF_UNIX:
578         len = SUN_LEN(&(nctx->r.remote_un));
579         snprintf(remote_str, sizeof(remote_str), "%s", nctx->r.remote_un.sun_path);
580         break;
581       default:
582         snprintf(remote_str, sizeof(remote_str), "(unknown)");
583     }
584     noitL(noit_error, "Error connecting to %s: %s\n",
585           remote_str, strerror(aerrno));
586     eventer_remove_fd(e->fd);
587     nctx->e = NULL;
588     e->opset->close(e->fd, &mask, e);
589     noit_connection_schedule_reattempt(nctx, now);
590     return 0;
591   }
592
593 #define SSLCONFGET(var,name) do { \
594   if(!noit_hash_retr_str(nctx->sslconfig, name, strlen(name), \
595                          &var)) var = NULL; } while(0)
596   SSLCONFGET(cert, "certificate_file");
597   SSLCONFGET(key, "key_file");
598   SSLCONFGET(ca, "ca_chain");
599   SSLCONFGET(ciphers, "ciphers");
600   SSLCONFGET(crl, "crl");
601   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
602   if(!sslctx) goto connect_error;
603   if(crl) {
604     if(!eventer_ssl_use_crl(sslctx, crl)) {
605       noitL(noit_error, "Failed to load CRL from %s\n", crl);
606       eventer_ssl_ctx_free(sslctx);
607       goto connect_error;
608     }
609   }
610
611   memcpy(&nctx->last_connect, now, sizeof(*now));
612   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
613                              nctx->sslconfig);
614   EVENTER_ATTACH_SSL(e, sslctx);
615   e->callback = noit_connection_ssl_upgrade;
616   return e->callback(e, mask, closure, now);
617 }
618 static void
619 noit_connection_initiate_connection(noit_connection_ctx_t *nctx) {
620   struct timeval __now;
621   eventer_t e;
622   int rv, fd = -1;
623 #ifdef SO_KEEPALIVE
624   int optval;
625   socklen_t optlen = sizeof(optval);
626 #endif
627
628   nctx->e = NULL;
629   if(nctx->wants_permanent_shutdown) {
630     noit_connection_ctx_dealloc(nctx);
631     return;
632   }
633   /* Open a socket */
634   fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0);
635   if(fd < 0) goto reschedule;
636
637   /* Make it non-blocking */
638   if(eventer_set_fd_nonblocking(fd)) goto reschedule;
639 #define set_or_bail(type, opt, val) do { \
640   optval = val; \
641   optlen = sizeof(optval); \
642   if(setsockopt(fd, type, opt, &optval, optlen) < 0) { \
643     noitL(noit_error, "Cannot set " #type "/" #opt " on jlog socket: %s\n", \
644           strerror(errno)); \
645     goto reschedule; \
646   } \
647 } while(0)
648 #ifdef SO_KEEPALIVE
649   set_or_bail(SOL_SOCKET, SO_KEEPALIVE, 1);
650 #endif
651 #ifdef TCP_KEEPALIVE_THRESHOLD
652   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_THRESHOLD, 10 * 1000);
653 #endif
654 #ifdef TCP_KEEPALIVE_ABORT_THRESHOLD
655   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_ABORT_THRESHOLD, 30 * 1000);
656 #endif
657 #ifdef TCP_CONN_NOTIFY_THRESHOLD
658   set_or_bail(IPPROTO_TCP, TCP_CONN_NOTIFY_THRESHOLD, 10 * 1000);
659 #endif
660 #ifdef TCP_CONN_ABORT_THRESHOLD
661   set_or_bail(IPPROTO_TCP, TCP_CONN_ABORT_THRESHOLD, 30 * 1000);
662 #endif
663
664   /* Initiate a connection */
665   rv = connect(fd, &nctx->r.remote, nctx->remote_len);
666   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
667
668   /* Register a handler for connection completion */
669   e = eventer_alloc();
670   e->fd = fd;
671   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
672   e->callback = noit_connection_complete_connect;
673   e->closure = nctx;
674   nctx->e = e;
675   eventer_add(e);
676
677   noit_connection_update_timeout(nctx);
678   return;
679
680  reschedule:
681   if(fd >= 0) close(fd);
682   gettimeofday(&__now, NULL);
683   noit_connection_schedule_reattempt(nctx, &__now);
684   return;
685 }
686
687 int
688 noit_connection_update_timeout(noit_connection_ctx_t *nctx) {
689   struct timeval now, diff;
690   if(nctx->max_silence == 0) return 0;
691
692   diff.tv_sec = nctx->max_silence / 1000;
693   diff.tv_usec = (nctx->max_silence % 1000) * 1000;
694   gettimeofday(&now, NULL);
695
696   if(!nctx->timeout_event) {
697     nctx->timeout_event = eventer_alloc();
698     nctx->timeout_event->mask = EVENTER_TIMER;
699     nctx->timeout_event->closure = nctx;
700     nctx->timeout_event->callback = noit_connection_session_timeout;
701     add_timeval(now, diff, &nctx->timeout_event->whence);
702     eventer_add(nctx->timeout_event);
703   }
704   else {
705     add_timeval(now, diff, &nctx->timeout_event->whence);
706     eventer_update(nctx->timeout_event, EVENTER_TIMER);
707   }
708   return 0;
709 }
710
711 int
712 noit_connection_disable_timeout(noit_connection_ctx_t *nctx) {
713   if(nctx->timeout_event) {
714     eventer_remove(nctx->timeout_event);
715     eventer_free(nctx->timeout_event);
716     nctx->timeout_event = NULL;
717   }
718   return 0;
719 }
720
721 int
722 initiate_noit_connection(const char *host, unsigned short port,
723                          noit_hash_table *sslconfig, noit_hash_table *config,
724                          eventer_func_t handler, void *closure,
725                          void (*freefunc)(void *)) {
726   noit_connection_ctx_t *ctx;
727   const char *stimeout;
728   int8_t family;
729   int rv;
730   union {
731     struct in_addr addr4;
732     struct in6_addr addr6;
733   } a;
734
735   if(host[0] == '/') {
736     family = AF_UNIX;
737   }
738   else {
739     family = AF_INET;
740     rv = inet_pton(family, host, &a);
741     if(rv != 1) {
742       family = AF_INET6;
743       rv = inet_pton(family, host, &a);
744       if(rv != 1) {
745         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
746         return -1;
747       }
748     }
749   }
750
751   ctx = noit_connection_ctx_alloc();
752   ctx->remote_str = calloc(1, strlen(host) + 7);
753   snprintf(ctx->remote_str, strlen(host) + 7,
754            "%s:%d", host, port);
755  
756   memset(&ctx->r, 0, sizeof(ctx->r));
757   if(family == AF_UNIX) {
758     struct sockaddr_un *s = &ctx->r.remote_un;
759     s->sun_family = AF_UNIX;
760     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
761     ctx->remote_len = sizeof(*s);
762   }
763   else if(family == AF_INET) {
764     struct sockaddr_in *s = &ctx->r.remote_in;
765     s->sin_family = family;
766     s->sin_port = htons(port);
767     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
768     ctx->remote_len = sizeof(*s);
769   }
770   else {
771     struct sockaddr_in6 *s = &ctx->r.remote_in6;
772     s->sin6_family = family;
773     s->sin6_port = htons(port);
774     memcpy(&s->sin6_addr, &a, sizeof(a));
775     ctx->remote_len = sizeof(*s);
776   }
777
778   if(ctx->sslconfig)
779     noit_hash_delete_all(ctx->sslconfig, free, free);
780   else
781     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
782   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
783   if(ctx->config)
784     noit_hash_delete_all(ctx->config, free, free);
785   else
786     ctx->config = calloc(1, sizeof(noit_hash_table));
787   noit_hash_merge_as_dict(ctx->config, config);
788
789   if(noit_hash_retr_str(ctx->config, "timeout", strlen("timeout"), &stimeout))
790     ctx->max_silence = atoi(stimeout);
791   else
792     ctx->max_silence = DEFAULT_NOIT_CONNECTION_TIMEOUT;
793   ctx->consumer_callback = handler;
794   ctx->consumer_free = freefunc;
795   ctx->consumer_ctx = closure;
796   noit_connection_initiate_connection(ctx);
797   return 0;
798 }
799
800 void
801 stratcon_streamer_connection(const char *toplevel, const char *destination,
802                              eventer_func_t handler,
803                              void *(*handler_alloc)(void), void *handler_ctx,
804                              void (*handler_free)(void *)) {
805   int i, cnt = 0;
806   noit_conf_section_t *noit_configs;
807   char path[256];
808
809   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
810   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
811   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
812   for(i=0; i<cnt; i++) {
813     char address[256];
814     unsigned short port;
815     int portint;
816     noit_hash_table *sslconfig, *config;
817
818     if(!noit_conf_get_stringbuf(noit_configs[i],
819                                 "ancestor-or-self::node()/@address",
820                                 address, sizeof(address))) {
821       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
822       continue;
823     }
824     /* if destination is specified, exact match it */
825     if(destination && strcmp(address, destination)) continue;
826
827     if(!noit_conf_get_int(noit_configs[i],
828                           "ancestor-or-self::node()/@port", &portint))
829       portint = 0;
830     port = (unsigned short) portint;
831     if(address[0] != '/' && (portint == 0 || (port != portint))) {
832       /* UNIX sockets don't require a port (they'll ignore it if specified */
833       noitL(noit_stderr,
834             "Invalid port [%d] specified in stanza %d\n", port, i+1);
835       continue;
836     }
837     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
838     config = noit_conf_get_hash(noit_configs[i], "config");
839
840     noitL(noit_error, "initiating to %s\n", address);
841     initiate_noit_connection(address, port, sslconfig, config,
842                              handler,
843                              handler_alloc ? handler_alloc() : handler_ctx,
844                              handler_free);
845     noit_hash_destroy(sslconfig,free,free);
846     free(sslconfig);
847     noit_hash_destroy(config,free,free);
848     free(config);
849   }
850   free(noit_configs);
851 }
852 int
853 stratcon_find_noit_ip_by_cn(const char *cn, char *ip, int len) {
854   int rv = -1;
855   void *vip;
856   pthread_mutex_lock(&noit_ip_by_cn_lock);
857   if(noit_hash_retrieve(&noit_ip_by_cn, cn, strlen(cn), &vip)) {
858     int new_len;
859     char *new_ip = (char *)vip;
860     new_len = strlen(new_ip);
861     strlcpy(ip, new_ip, len);
862     if(new_len >= len) rv = new_len+1;
863     else rv = 0;
864   }
865   pthread_mutex_unlock(&noit_ip_by_cn_lock);
866   return rv;
867 }
868 void
869 stratcon_jlog_streamer_recache_noit() {
870   int di, cnt;
871   noit_conf_section_t *noit_configs;
872   noit_configs = noit_conf_get_sections(NULL, "//noits//noit", &cnt);
873   pthread_mutex_lock(&noit_ip_by_cn_lock);
874   noit_hash_delete_all(&noit_ip_by_cn, free, free);
875   for(di=0; di<cnt; di++) {
876     char address[64];
877     if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
878                                  address, sizeof(address))) {
879       char expected_cn[256];
880       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
881                                  expected_cn, sizeof(expected_cn)))
882         noit_hash_store(&noit_ip_by_cn,
883                         strdup(expected_cn), strlen(expected_cn),
884                         strdup(address));
885     }
886   }
887   free(noit_configs);
888   pthread_mutex_unlock(&noit_ip_by_cn_lock);
889 }
890 void
891 stratcon_jlog_streamer_reload(const char *toplevel) {
892   /* flush and repopulate the cn cache */
893   stratcon_jlog_streamer_recache_noit();
894   if(!stratcon_datastore_get_enabled()) return;
895   stratcon_streamer_connection(toplevel, NULL,
896                                stratcon_jlog_recv_handler,
897                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
898                                NULL,
899                                jlog_streamer_ctx_free);
900 }
901
902 static int
903 stratcon_console_show_noits(noit_console_closure_t ncct,
904                             int argc, char **argv,
905                             noit_console_state_t *dstate,
906                             void *closure) {
907   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
908   void *key_id;
909   int klen, n = 0, i;
910   void *vconn;
911   noit_connection_ctx_t **ctx;
912
913   pthread_mutex_lock(&noits_lock);
914   ctx = malloc(sizeof(*ctx) * noits.size);
915   while(noit_hash_next(&noits, &iter, (const char **)&key_id, &klen,
916                        &vconn)) {
917     ctx[n] = (noit_connection_ctx_t *)vconn;
918     noit_atomic_inc32(&ctx[n]->refcnt);
919     n++;
920   }
921   pthread_mutex_unlock(&noits_lock);
922   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
923   for(i=0; i<n; i++) {
924     nc_print_noit_conn_brief(ncct, ctx[i]);
925     noit_connection_ctx_deref(ctx[i]);
926   }
927   free(ctx);
928   return 0;
929 }
930
931 static int
932 rest_show_noits(noit_http_rest_closure_t *restc,
933                 int npats, char **pats) {
934   xmlDocPtr doc;
935   xmlNodePtr root;
936   noit_hash_table seen = NOIT_HASH_EMPTY;
937   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
938   char path[256];
939   void *key_id, *vstr;
940   const char *type = NULL, *want_cn = NULL;
941   int klen, n = 0, i, di, cnt;
942   void *vconn;
943   noit_connection_ctx_t **ctxs;
944   noit_conf_section_t *noit_configs;
945   struct timeval now, diff, last;
946   xmlNodePtr node;
947
948   noit_http_process_querystring(&restc->http_ctx->req);
949   if(noit_hash_retrieve(&restc->http_ctx->req.querystring, "type", 4, &vstr))
950     type = vstr;
951   if(noit_hash_retrieve(&restc->http_ctx->req.querystring, "cn", 2, &vstr))
952     want_cn = vstr;
953
954   gettimeofday(&now, NULL);
955
956   pthread_mutex_lock(&noits_lock);
957   ctxs = malloc(sizeof(*ctxs) * noits.size);
958   while(noit_hash_next(&noits, &iter, (const char **)&key_id, &klen,
959                        &vconn)) {
960     ctxs[n] = (noit_connection_ctx_t *)vconn;
961     noit_atomic_inc32(&ctxs[n]->refcnt);
962     n++;
963   }
964   pthread_mutex_unlock(&noits_lock);
965   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
966
967   doc = xmlNewDoc((xmlChar *)"1.0");
968   root = xmlNewDocNode(doc, NULL, (xmlChar *)"noits", NULL);
969   xmlDocSetRootElement(doc, root);
970
971   for(i=0; i<n; i++) {
972     char buff[256];
973     const char *feedtype = "unknown", *state = "unknown";
974     noit_connection_ctx_t *ctx = ctxs[i];
975     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
976
977     feedtype = feed_type_to_str(ntohl(jctx->jlog_feed_cmd));
978
979     /* If the user requested a specific type and we're not it, skip. */
980     if(type && strcmp(feedtype, type)) continue;
981     /* If the user wants a specific CN... limit to that. */
982     if(want_cn && (!ctx->remote_cn || strcmp(want_cn, ctx->remote_cn)))
983       continue;
984
985     node = xmlNewNode(NULL, (xmlChar *)"noit");
986     snprintf(buff, sizeof(buff), "%llu.%06d",
987              (long long unsigned)ctx->last_connect.tv_sec,
988              (int)ctx->last_connect.tv_usec);
989     xmlSetProp(node, (xmlChar *)"last_connect", (xmlChar *)buff);
990     xmlSetProp(node, (xmlChar *)"state", ctx->remote_cn ?
991                (xmlChar *)"connected" :
992                (ctx->retry_event ? (xmlChar *)"disconnected" :
993                                     (xmlChar *)"connecting"));
994     if(ctx->e) {
995       char buff[128];
996       const char *addrstr = NULL;
997       struct sockaddr_in6 addr6;
998       socklen_t len = sizeof(addr6);
999       if(getsockname(ctx->e->fd, (struct sockaddr *)&addr6, &len) == 0) {
1000         unsigned short port = 0;
1001         if(addr6.sin6_family == AF_INET) {
1002           addrstr = inet_ntop(addr6.sin6_family,
1003                               &((struct sockaddr_in *)&addr6)->sin_addr,
1004                               buff, sizeof(buff));
1005           port = ntohs(((struct sockaddr_in *)&addr6)->sin_port);
1006         }
1007         else if(addr6.sin6_family == AF_INET6) {
1008           addrstr = inet_ntop(addr6.sin6_family, &addr6.sin6_addr,
1009                               buff, sizeof(buff));
1010           port = ntohs(addr6.sin6_port);
1011         }
1012         if(addrstr != NULL) {
1013           snprintf(buff + strlen(buff), sizeof(buff) - strlen(buff),
1014                    ":%u", port);
1015           xmlSetProp(node, (xmlChar *)"local", (xmlChar *)buff);
1016         }
1017       }
1018     }
1019     noit_hash_replace(&seen, strdup(ctx->remote_str), strlen(ctx->remote_str),
1020                       0, free, NULL);
1021     xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)ctx->remote_str);
1022     xmlSetProp(node, (xmlChar *)"type", (xmlChar *)feedtype);
1023     if(ctx->retry_event) {
1024       sub_timeval(ctx->retry_event->whence, now, &diff);
1025       snprintf(buff, sizeof(buff), "%llu.%06d",
1026                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
1027       xmlSetProp(node, (xmlChar *)"next_attempt", (xmlChar *)buff);
1028     }
1029     else if(ctx->remote_cn) {
1030       if(ctx->remote_cn)
1031         xmlSetProp(node, (xmlChar *)"remote_cn", (xmlChar *)ctx->remote_cn);
1032  
1033       switch(jctx->state) {
1034         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
1035         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
1036         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
1037         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
1038         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
1039         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
1040         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
1041       }
1042       xmlSetProp(node, (xmlChar *)"state", (xmlChar *)state);
1043       snprintf(buff, sizeof(buff), "%08x:%08x",
1044                jctx->header.chkpt.log, jctx->header.chkpt.marker);
1045       xmlSetProp(node, (xmlChar *)"checkpoint", (xmlChar *)buff);
1046       snprintf(buff, sizeof(buff), "%llu",
1047                (unsigned long long)jctx->total_events);
1048       xmlSetProp(node, (xmlChar *)"session_events", (xmlChar *)buff);
1049       snprintf(buff, sizeof(buff), "%llu",
1050                (unsigned long long)jctx->total_bytes_read);
1051       xmlSetProp(node, (xmlChar *)"session_bytes", (xmlChar *)buff);
1052  
1053       sub_timeval(now, ctx->last_connect, &diff);
1054       snprintf(buff, sizeof(buff), "%lld.%06d",
1055                (long long)diff.tv_sec, (int)diff.tv_usec);
1056       xmlSetProp(node, (xmlChar *)"session_duration", (xmlChar *)buff);
1057  
1058       if(jctx->header.tv_sec) {
1059         last.tv_sec = jctx->header.tv_sec;
1060         last.tv_usec = jctx->header.tv_usec;
1061         snprintf(buff, sizeof(buff), "%llu.%06d",
1062                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
1063         xmlSetProp(node, (xmlChar *)"last_event", (xmlChar *)buff);
1064         sub_timeval(now, last, &diff);
1065         snprintf(buff, sizeof(buff), "%lld.%06d",
1066                  (long long)diff.tv_sec, (int)diff.tv_usec);
1067         xmlSetProp(node, (xmlChar *)"last_event_age", (xmlChar *)buff);
1068       }
1069     }
1070
1071     xmlAddChild(root, node);
1072     noit_connection_ctx_deref(ctx);
1073   }
1074   free(ctxs);
1075
1076   if(!type || !strcmp(type, "configured")) {
1077     snprintf(path, sizeof(path), "//noits//noit");
1078     noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1079     for(di=0; di<cnt; di++) {
1080       char address[64], port_str[32], remote_str[98];
1081       char expected_cn_buff[256], *expected_cn = NULL;
1082       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/config/cn",
1083                                  expected_cn_buff, sizeof(expected_cn_buff)))
1084         expected_cn = expected_cn_buff;
1085       if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn))) continue;
1086       if(noit_conf_get_stringbuf(noit_configs[di], "self::node()/@address",
1087                                  address, sizeof(address))) {
1088         void *v;
1089         if(!noit_conf_get_stringbuf(noit_configs[di], "self::node()/@port",
1090                                    port_str, sizeof(port_str)))
1091           strlcpy(port_str, "43191", sizeof(port_str));
1092
1093         /* If the user wants a specific CN... limit to that. */
1094           if(want_cn && (!expected_cn || strcmp(want_cn, expected_cn)))
1095             continue;
1096
1097         snprintf(remote_str, sizeof(remote_str), "%s:%s", address, port_str);
1098         if(!noit_hash_retrieve(&seen, remote_str, strlen(remote_str), &v)) {
1099           node = xmlNewNode(NULL, (xmlChar *)"noit");
1100           xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)remote_str);
1101           xmlSetProp(node, (xmlChar *)"type", (xmlChar *)"configured");
1102           if(expected_cn)
1103             xmlSetProp(node, (xmlChar *)"cn", (xmlChar *)expected_cn);
1104           xmlAddChild(root, node);
1105         }
1106       }
1107     }
1108     free(noit_configs);
1109   }
1110   noit_hash_destroy(&seen, free, NULL);
1111
1112   noit_http_response_ok(restc->http_ctx, "text/xml");
1113   noit_http_response_xml(restc->http_ctx, doc);
1114   noit_http_response_end(restc->http_ctx);
1115   xmlFreeDoc(doc);
1116   return 0;
1117 }
1118 static int
1119 stratcon_add_noit(const char *target, unsigned short port,
1120                   const char *cn) {
1121   int cnt;
1122   char path[256];
1123   char port_str[6];
1124   noit_conf_section_t *noit_configs, parent;
1125   xmlNodePtr newnoit, config, cnnode;
1126
1127   snprintf(path, sizeof(path),
1128            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1129   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1130   free(noit_configs);
1131   if(cnt != 0) return -1;
1132
1133   parent = noit_conf_get_section(NULL, "//noits");
1134   if(!parent) return -1;
1135   snprintf(port_str, sizeof(port_str), "%d", port);
1136   newnoit = xmlNewNode(NULL, (xmlChar *)"noit");
1137   xmlSetProp(newnoit, (xmlChar *)"address", (xmlChar *)target);
1138   xmlSetProp(newnoit, (xmlChar *)"port", (xmlChar *)port_str);
1139   xmlAddChild(parent, newnoit);
1140   if(cn) {
1141     config = xmlNewNode(NULL, (xmlChar *)"config");
1142     cnnode = xmlNewNode(NULL, (xmlChar *)"cn");
1143     xmlNodeAddContent(cnnode, (xmlChar *)cn);
1144     xmlAddChild(config, cnnode);
1145     xmlAddChild(newnoit, config);
1146     pthread_mutex_lock(&noit_ip_by_cn_lock);
1147     noit_hash_replace(&noit_ip_by_cn, strdup(cn), strlen(cn),
1148                       strdup(target), free, free);
1149     pthread_mutex_unlock(&noit_ip_by_cn_lock);
1150   }
1151   if(stratcon_datastore_get_enabled())
1152     stratcon_streamer_connection(NULL, target,
1153                                  stratcon_jlog_recv_handler,
1154                                  (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
1155                                  NULL,
1156                                  jlog_streamer_ctx_free);
1157   if(stratcon_iep_get_enabled())
1158     stratcon_streamer_connection(NULL, target,
1159                                  stratcon_jlog_recv_handler,
1160                                  (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
1161                                  NULL,
1162                                  jlog_streamer_ctx_free);
1163   return 1;
1164 }
1165 static int
1166 stratcon_remove_noit(const char *target, unsigned short port) {
1167   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
1168   void *key_id;
1169   int klen, n = -1, i, cnt = 0;
1170   void *vconn;
1171   noit_connection_ctx_t **ctx;
1172   noit_conf_section_t *noit_configs;
1173   char path[256];
1174   char remote_str[256];
1175
1176   snprintf(remote_str, sizeof(remote_str), "%s:%d", target, port);
1177
1178   snprintf(path, sizeof(path),
1179            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
1180   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
1181   for(i=0; i<cnt; i++) {
1182     char expected_cn[256];
1183     if(noit_conf_get_stringbuf(noit_configs[i], "self::node()/config/cn",
1184                                expected_cn, sizeof(expected_cn))) {
1185       pthread_mutex_lock(&noit_ip_by_cn_lock);
1186       noit_hash_delete(&noit_ip_by_cn, expected_cn, strlen(expected_cn),
1187                        free, free);
1188       pthread_mutex_unlock(&noit_ip_by_cn_lock);
1189     }
1190     xmlUnlinkNode(noit_configs[i]);
1191     xmlFreeNode(noit_configs[i]);
1192     n = 0;
1193   }
1194   free(noit_configs);
1195
1196   pthread_mutex_lock(&noits_lock);
1197   ctx = malloc(sizeof(*ctx) * noits.size);
1198   while(noit_hash_next(&noits, &iter, (const char **)&key_id, &klen,
1199                        &vconn)) {
1200     if(!strcmp(((noit_connection_ctx_t *)vconn)->remote_str, remote_str)) {
1201       ctx[n] = (noit_connection_ctx_t *)vconn;
1202       noit_atomic_inc32(&ctx[n]->refcnt);
1203       n++;
1204     }
1205   }
1206   pthread_mutex_unlock(&noits_lock);
1207   for(i=0; i<n; i++) {
1208     noit_connection_ctx_dealloc(ctx[i]); /* once for the record */
1209     noit_connection_ctx_deref(ctx[i]);   /* once for the aboce inc32 */
1210   }
1211   free(ctx);
1212   return n;
1213 }
1214 static int
1215 rest_set_noit(noit_http_rest_closure_t *restc,
1216               int npats, char **pats) {
1217   void *vcn;
1218   const char *cn = NULL;
1219   noit_http_session_ctx *ctx = restc->http_ctx;
1220   unsigned short port = 43191;
1221   if(npats < 1 || npats > 2)
1222     noit_http_response_server_error(ctx, "text/xml");
1223   if(npats == 2) port = atoi(pats[1]);
1224   noit_http_process_querystring(&ctx->req);
1225   if(noit_hash_retrieve(&ctx->req.querystring, "cn", 2, &vcn))
1226     cn = vcn;
1227   if(stratcon_add_noit(pats[0], port, cn) >= 0)
1228     noit_http_response_ok(ctx, "text/xml");
1229   else
1230     noit_http_response_standard(ctx, 409, "EXISTS", "text/xml");
1231   if(noit_conf_write_file(NULL) != 0)
1232     noitL(noit_error, "local config write failed\n");
1233   noit_conf_mark_changed();
1234   noit_http_response_end(ctx);
1235   return 0;
1236 }
1237 static int
1238 rest_delete_noit(noit_http_rest_closure_t *restc,
1239                  int npats, char **pats) {
1240   noit_http_session_ctx *ctx = restc->http_ctx;
1241   unsigned short port = 43191;
1242   if(npats < 1 || npats > 2)
1243     noit_http_response_server_error(ctx, "text/xml");
1244   if(npats == 2) port = atoi(pats[1]);
1245   if(stratcon_remove_noit(pats[0], port) >= 0)
1246     noit_http_response_ok(ctx, "text/xml");
1247   else
1248     noit_http_response_not_found(ctx, "text/xml");
1249   if(noit_conf_write_file(NULL) != 0)
1250     noitL(noit_error, "local config write failed\n");
1251   noit_conf_mark_changed();
1252   noit_http_response_end(ctx);
1253   return 0;
1254 }
1255 static int
1256 stratcon_console_conf_noits(noit_console_closure_t ncct,
1257                             int argc, char **argv,
1258                             noit_console_state_t *dstate,
1259                             void *closure) {
1260   char *cp, target[128];
1261   unsigned short port = 43191;
1262   int adding = (int)(vpsized_int)closure;
1263   if(argc != 1)
1264     return -1;
1265
1266   cp = strchr(argv[0], ':');
1267   if(cp) {
1268     strlcpy(target, argv[0], MIN(sizeof(target), cp-argv[0]+1));
1269     port = atoi(cp+1);
1270   }
1271   else strlcpy(target, argv[0], sizeof(target));
1272   if(adding) {
1273     if(stratcon_add_noit(target, port, NULL) >= 0) {
1274       nc_printf(ncct, "Added noit at %s:%d\n", target, port);
1275     }
1276     else {
1277       nc_printf(ncct, "Failed to add noit at %s:%d\n", target, port);
1278     }
1279   }
1280   else {
1281     if(stratcon_remove_noit(target, port) >= 0) {
1282       nc_printf(ncct, "Removed noit at %s:%d\n", target, port);
1283     }
1284     else {
1285       nc_printf(ncct, "Failed to remove noit at %s:%d\n", target, port);
1286     }
1287   }
1288   return 0;
1289 }
1290
1291 static void
1292 register_console_streamer_commands() {
1293   noit_console_state_t *tl;
1294   cmd_info_t *showcmd, *confcmd, *conftcmd, *conftnocmd;
1295
1296   tl = noit_console_state_initial();
1297   showcmd = noit_console_state_get_cmd(tl, "show");
1298   assert(showcmd && showcmd->dstate);
1299   confcmd = noit_console_state_get_cmd(tl, "configure");
1300   conftcmd = noit_console_state_get_cmd(confcmd->dstate, "terminal");
1301   conftnocmd = noit_console_state_get_cmd(conftcmd->dstate, "no");
1302
1303   noit_console_state_add_cmd(conftcmd->dstate,
1304     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)1));
1305   noit_console_state_add_cmd(conftnocmd->dstate,
1306     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)0));
1307
1308   noit_console_state_add_cmd(showcmd->dstate,
1309     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
1310 }
1311
1312 void
1313 stratcon_jlog_streamer_init(const char *toplevel) {
1314   pthread_mutex_init(&noits_lock, NULL);
1315   pthread_mutex_init(&noit_ip_by_cn_lock, NULL);
1316   eventer_name_callback("noit_connection_reinitiate",
1317                         noit_connection_reinitiate);
1318   eventer_name_callback("stratcon_jlog_recv_handler",
1319                         stratcon_jlog_recv_handler);
1320   eventer_name_callback("noit_connection_ssl_upgrade",
1321                         noit_connection_ssl_upgrade);
1322   eventer_name_callback("noit_connection_complete_connect",
1323                         noit_connection_complete_connect);
1324   eventer_name_callback("noit_connection_session_timeout",
1325                         noit_connection_session_timeout);
1326   register_console_streamer_commands();
1327   stratcon_jlog_streamer_reload(toplevel);
1328   stratcon_streamer_connection(toplevel, "", NULL, NULL, NULL, NULL);
1329   assert(noit_http_rest_register_auth(
1330     "GET", "/noits/", "^show$", rest_show_noits,
1331              noit_http_rest_client_cert_auth
1332   ) == 0);
1333   assert(noit_http_rest_register_auth(
1334     "PUT", "/noits/", "^set/([^/:]+)$", rest_set_noit,
1335              noit_http_rest_client_cert_auth
1336   ) == 0);
1337   assert(noit_http_rest_register_auth(
1338     "PUT", "/noits/", "^set/([^/:]+):(\\d+)$", rest_set_noit,
1339              noit_http_rest_client_cert_auth
1340   ) == 0);
1341   assert(noit_http_rest_register_auth(
1342     "DELETE", "/noits/", "^delete/([^/:]+)$", rest_delete_noit,
1343              noit_http_rest_client_cert_auth
1344   ) == 0);
1345   assert(noit_http_rest_register_auth(
1346     "DELETE", "/noits/", "^delete/([^/:]+):(\\d+)$", rest_delete_noit,
1347              noit_http_rest_client_cert_auth
1348   ) == 0);
1349 }
1350
Note: See TracBrowser for help on using the browser.