root/src/stratcon_jlog_streamer.c

Revision 5360a1ee7f4ade3bd6299e566204ee1820aeffa3, 36.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

hefty patch. First part of stratcond support for horizontal partitioning of storage. refs #150

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_conf.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "noit_jlog_listener.h"
39 #include "noit_rest.h"
40 #include "stratcon_datastore.h"
41 #include "stratcon_jlog_streamer.h"
42 #include "stratcon_iep.h"
43
44 #include <unistd.h>
45 #include <assert.h>
46 #include <errno.h>
47 #include <sys/types.h>
48 #include <sys/socket.h>
49 #ifdef HAVE_SYS_FILIO_H
50 #include <sys/filio.h>
51 #endif
52 #include <netinet/in.h>
53 #include <sys/un.h>
54 #include <arpa/inet.h>
55
/* Guards every access to the `noits` table in this file (store, delete and
 * iteration all happen with this mutex held). */
56 pthread_mutex_t noits_lock;
/* Registry of live connection contexts.  Each entry's key is the raw bytes
 * of a heap-allocated pointer to the context; the value is the context. */
57 noit_hash_table noits = NOIT_HASH_EMPTY;
58
/* Defined further down; needed early by noit_connection_reinitiate(). */
59 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
60
61 static int
62 remote_str_sort(const void *a, const void *b) {
63   int rv;
64   noit_connection_ctx_t * const *actx = a;
65   noit_connection_ctx_t * const *bctx = b;
66   jlog_streamer_ctx_t *ajctx = (*actx)->consumer_ctx;
67   jlog_streamer_ctx_t *bjctx = (*bctx)->consumer_ctx;
68   rv = strcmp((*actx)->remote_str, (*bctx)->remote_str);
69   if(rv) return rv;
70   return (ajctx->jlog_feed_cmd < bjctx->jlog_feed_cmd) ? -1 :
71            ((ajctx->jlog_feed_cmd == bjctx->jlog_feed_cmd) ? 0 : 1);
72 }
73 static void
74 nc_print_noit_conn_brief(noit_console_closure_t ncct,
75                           noit_connection_ctx_t *ctx) {
76   jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
77   struct timeval now, diff, session_duration;
78   const char *feedtype = "unknown";
79   const char *lasttime = "never";
80   if(ctx->last_connect.tv_sec != 0) {
81     char cmdbuf[4096];
82     time_t r = ctx->last_connect.tv_sec;
83     struct tm tbuf, *tm;
84     tm = gmtime_r(&r, &tbuf);
85     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
86     lasttime = cmdbuf;
87   }
88   nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str,
89             ctx->remote_cn ? "connected" :
90                              (ctx->timeout_event ? "disconnected" :
91                                                    "connecting"), lasttime);
92   switch(ntohl(jctx->jlog_feed_cmd)) {
93     case NOIT_JLOG_DATA_FEED: feedtype = "durable/storage"; break;
94     case NOIT_JLOG_DATA_TEMP_FEED: feedtype = "transient/iep"; break;
95   }
96   nc_printf(ncct, "\tJLog event streamer [%s]\n", feedtype);
97   gettimeofday(&now, NULL);
98   if(ctx->timeout_event) {
99     sub_timeval(ctx->timeout_event->whence, now, &diff);
100     nc_printf(ncct, "\tNext attempt in %llu.%06us\n",
101               (unsigned long long)diff.tv_sec, (unsigned int) diff.tv_usec);
102   }
103   else if(ctx->remote_cn) {
104     nc_printf(ncct, "\tRemote CN: '%s'\n",
105               ctx->remote_cn ? ctx->remote_cn : "???");
106     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
107       struct timeval last;
108       double session_duration_seconds;
109       const char *state = "unknown";
110
111       switch(jctx->state) {
112         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
113         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
114         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
115         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
116         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
117         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
118       }
119       last.tv_sec = jctx->header.tv_sec;
120       last.tv_usec = jctx->header.tv_usec;
121       sub_timeval(now, last, &diff);
122       sub_timeval(now, ctx->last_connect, &session_duration);
123       session_duration_seconds = session_duration.tv_sec +
124                                  (double)session_duration.tv_usec/1000000.0;
125       nc_printf(ncct, "\tState: %s\n"
126                       "\tNext checkpoint: [%08x:%08x]\n"
127                       "\tLast event: %llu.%06us ago\n"
128                       "\tEvents this session: %llu (%0.2f/s)\n"
129                       "\tOctets this session: %llu (%0.2f/s)\n",
130                 state,
131                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
132                 (unsigned long long)diff.tv_sec, (unsigned int)diff.tv_usec,
133                 jctx->total_events,
134                 (double)jctx->total_events/session_duration_seconds,
135                 jctx->total_bytes_read,
136                 (double)jctx->total_bytes_read/session_duration_seconds);
137     }
138     else {
139       nc_printf(ncct, "\tUnknown type.\n");
140     }
141   }
142 }
143
144 jlog_streamer_ctx_t *
145 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
146   jlog_streamer_ctx_t *ctx;
147   ctx = stratcon_jlog_streamer_ctx_alloc();
148   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
149   ctx->push = stratcon_datastore_push;
150   return ctx;
151 }
152 jlog_streamer_ctx_t *
153 stratcon_jlog_streamer_ctx_alloc(void) {
154   jlog_streamer_ctx_t *ctx;
155   ctx = calloc(1, sizeof(*ctx));
156   return ctx;
157 }
158 noit_connection_ctx_t *
159 noit_connection_ctx_alloc(void) {
160   noit_connection_ctx_t *ctx, **pctx;
161   ctx = calloc(1, sizeof(*ctx));
162   ctx->refcnt = 1;
163   pctx = malloc(sizeof(*pctx));
164   *pctx = ctx;
165   pthread_mutex_lock(&noits_lock);
166   noit_hash_store(&noits, (const char *)pctx, sizeof(*pctx), ctx);
167   pthread_mutex_unlock(&noits_lock);
168   return ctx;
169 }
170 int
171 noit_connection_reinitiate(eventer_t e, int mask, void *closure,
172                          struct timeval *now) {
173   noit_connection_ctx_t *ctx = closure;
174   ctx->timeout_event = NULL;
175   noit_connection_initiate_connection(closure);
176   return 0;
177 }
178 void
179 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx,
180                                    struct timeval *now) {
181   struct timeval __now, interval;
182   const char *v;
183   u_int32_t min_interval = 1000, max_interval = 8000;
184   if(ctx->remote_cn) {
185     free(ctx->remote_cn);
186     ctx->remote_cn = NULL;
187   }
188   if(noit_hash_retr_str(ctx->config,
189                         "reconnect_initial_interval",
190                         strlen("reconnect_initial_interval"),
191                         &v)) {
192     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
193   }
194   if(noit_hash_retr_str(ctx->config,
195                         "reconnect_maximum_interval",
196                         strlen("reconnect_maximum_interval"),
197                         &v)) {
198     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
199   }
200   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
201   else {
202     ctx->current_backoff *= 2;
203     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
204     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
205   }
206   if(!now) {
207     gettimeofday(&__now, NULL);
208     now = &__now;
209   }
210   interval.tv_sec = ctx->current_backoff / 1000;
211   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
212   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
213         ctx->current_backoff);
214   if(ctx->timeout_event)
215     eventer_remove(ctx->timeout_event);
216   else
217     ctx->timeout_event = eventer_alloc();
218   ctx->timeout_event->callback = noit_connection_reinitiate;
219   ctx->timeout_event->closure = ctx;
220   ctx->timeout_event->mask = EVENTER_TIMER;
221   add_timeval(*now, interval, &ctx->timeout_event->whence);
222   eventer_add(ctx->timeout_event);
223 }
224 static void
225 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
226   if(ctx->remote_cn) free(ctx->remote_cn);
227   if(ctx->remote_str) free(ctx->remote_str);
228   if(ctx->timeout_event) {
229     eventer_remove(ctx->timeout_event);
230     eventer_free(ctx->timeout_event);
231   }
232   ctx->consumer_free(ctx->consumer_ctx);
233   free(ctx);
234 }
235 void
236 noit_connection_ctx_deref(noit_connection_ctx_t *ctx) {
237   if(noit_atomic_dec32(&ctx->refcnt) == 0)
238     noit_connection_ctx_free(ctx);
239 }
240 void
241 noit_connection_ctx_dealloc(noit_connection_ctx_t *ctx) {
242   noit_connection_ctx_t **pctx = &ctx;
243   pthread_mutex_lock(&noits_lock);
244   noit_hash_delete(&noits, (const char *)pctx, sizeof(*pctx),
245                    free, (void (*)(void *))noit_connection_ctx_deref);
246   pthread_mutex_unlock(&noits_lock);
247 }
248 void
249 jlog_streamer_ctx_free(void *cl) {
250   jlog_streamer_ctx_t *ctx = cl;
251   if(ctx->buffer) free(ctx->buffer);
252   free(ctx);
253 }
254
/* Read through the event's opset.  Relies on variables named `e` and
 * `mask` being in scope at the point of use. */
255 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
/*
 * Keep reading into ctx->buffer until ctx->bytes_read reaches
 * ctx->bytes_expected.
 *
 * Returns the number of bytes accumulated (== bytes_expected) on success.
 * If a read returns a negative value, stores the mask reported by that
 * read in *newmask and returns -1; bytes already read stay accounted for
 * in ctx so the caller can resume later.
 */
256 static int
257 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
258   int len, mask;
259   while(ctx->bytes_read < ctx->bytes_expected) {
260     len = Eread(ctx->buffer + ctx->bytes_read,
261                 ctx->bytes_expected - ctx->bytes_read);
262     if(len < 0) {
263       *newmask = mask;
264       return -1;
265     }
266     /* if we get 0 inside SSL, and there was a real error, we
267      * will actually get a -1 here.
268      * if(len == 0) return ctx->bytes_read;
269      */
    /* NOTE(review): a read that returns 0 makes no progress and the loop
     * simply retries; this assumes the opset never reports 0 forever
     * (e.g. on EOF) -- confirm against the opset implementation. */
270     ctx->total_bytes_read += len;
271     ctx->bytes_read += len;
272   }
273   assert(ctx->bytes_read == ctx->bytes_expected);
274   return ctx->bytes_read;
275 }
/*
 * FULLREAD(e, ctx, size): read exactly `size` bytes into ctx->buffer.
 *
 * On first entry (ctx->bytes_expected == 0) it replaces ctx->buffer with a
 * fresh allocation of size+1 bytes and NUL-terminates it.  It then calls
 * __read_on_ctx(); because progress is kept in ctx, the macro can be
 * re-entered after an EAGAIN to continue the same read.
 *
 * Control flow is NOT local to the macro:
 *   - on EAGAIN it `return`s (mask | EVENTER_EXCEPTION) from the enclosing
 *     function;
 *   - on allocation failure, any other read error, or a length mismatch it
 *     jumps to a `socket_error` label.
 * The enclosing function must therefore provide that label and a variable
 * named `nctx` (used for log messages).  On success bytes_read and
 * bytes_expected are reset to 0 and ctx->buffer holds the data.
 */
276 #define FULLREAD(e,ctx,size) do { \
277   int mask, len; \
278   if(!ctx->bytes_expected) { \
279     ctx->bytes_expected = size; \
280     if(ctx->buffer) free(ctx->buffer); \
281     ctx->buffer = malloc(size + 1); \
282     if(ctx->buffer == NULL) { \
283       noitL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)size + 1); \
284       goto socket_error; \
285     } \
286     ctx->buffer[size] = '\0'; \
287   } \
288   len = __read_on_ctx(e, ctx, &mask); \
289   if(len < 0) { \
290     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
291     noitL(noit_error, "[%s] SSL read error: %s\n", nctx->remote_str, strerror(errno)); \
292     goto socket_error; \
293   } \
294   ctx->bytes_read = 0; \
295   ctx->bytes_expected = 0; \
296   if(len != size) { \
297     noitL(noit_error, "[%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
298           nctx->remote_str, ctx->state, len, (long unsigned int)size); \
299     goto socket_error; \
300   } \
301 } while(0)
302
/*
 * Eventer callback that drives the jlog streaming protocol on an
 * established connection.  It is a state machine over ctx->state:
 *
 *   WANT_INITIATE  write the 4-byte feed command, then WANT_COUNT
 *   WANT_COUNT     read a 32-bit (network order) count of messages
 *   WANT_HEADER    read one message header (all fields network order);
 *                  a count of 0 goes straight back to WANT_COUNT
 *   WANT_BODY      read message_len bytes and hand them to ctx->push();
 *                  after the last message of the batch, push a
 *                  DS_OP_CHKPT together with a copy of this event and
 *                  return 0 with the state set to IS_ASYNC
 *   IS_ASYNC /
 *   WANT_CHKPT     write the last header's checkpoint id back to the
 *                  peer, then WANT_COUNT
 *
 * closure is the noit_connection_ctx_t; its consumer_ctx is the
 * jlog_streamer_ctx_t.
 *
 * Returns an eventer mask (with EVENTER_EXCEPTION added) when an operation
 * would block, or 0 after the event has been removed from the eventer
 * (error/shutdown, or batch handed off for asynchronous processing).
 */
303 int
304 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
305                            struct timeval *now) {
306   noit_connection_ctx_t *nctx = closure;
307   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
  /* Scratch copy used only to memcpy wire data into properly typed fields
   * before byte-swapping them. */
308   jlog_streamer_ctx_t dummy;
309   int len;
310   jlog_id n_chkpt;
311
312   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
    /* Zero-length write: only used to obtain an errno to log. */
313     if(write(e->fd, e, 0) == -1)
314       noitL(noit_error, "socket error: %s\n", strerror(errno));
    /* Common failure exit, also the target of FULLREAD and of the write
     * error paths below: reset the protocol state, arm a reconnect timer
     * and tear down this socket. */
315  socket_error:
316     ctx->state = JLOG_STREAMER_WANT_INITIATE;
317     ctx->count = 0;
318     ctx->bytes_read = 0;
319     ctx->bytes_expected = 0;
320     if(ctx->buffer) free(ctx->buffer);
321     ctx->buffer = NULL;
322     noit_connection_schedule_reattempt(nctx, now);
323     eventer_remove_fd(e->fd);
324     e->opset->close(e->fd, &mask, e);
325     return 0;
326   }
327
328   while(1) {
329     switch(ctx->state) {
330       case JLOG_STREAMER_WANT_INITIATE:
        /* jlog_feed_cmd is already stored in network byte order. */
331         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
332                               sizeof(ctx->jlog_feed_cmd),
333                               &mask, e);
334         if(len < 0) {
335           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
336           goto socket_error;
337         }
338         if(len != sizeof(ctx->jlog_feed_cmd)) {
339           noitL(noit_error, "short write [%d/%d] on initiating stream.\n",
340                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
341           goto socket_error;
342         }
343         ctx->state = JLOG_STREAMER_WANT_COUNT;
344         break;
345
346       case JLOG_STREAMER_WANT_COUNT:
347         FULLREAD(e, ctx, sizeof(u_int32_t));
348         memcpy(&dummy.count, ctx->buffer, sizeof(u_int32_t));
349         ctx->count = ntohl(dummy.count);
350         free(ctx->buffer); ctx->buffer = NULL;
351         ctx->state = JLOG_STREAMER_WANT_HEADER;
352         break;
353
354       case JLOG_STREAMER_WANT_HEADER:
355         if(ctx->count == 0) {
356           ctx->state = JLOG_STREAMER_WANT_COUNT;
357           break;
358         }
359         FULLREAD(e, ctx, sizeof(ctx->header));
360         memcpy(&dummy.header, ctx->buffer, sizeof(ctx->header));
361         ctx->header.chkpt.log = ntohl(dummy.header.chkpt.log);
362         ctx->header.chkpt.marker = ntohl(dummy.header.chkpt.marker);
363         ctx->header.tv_sec = ntohl(dummy.header.tv_sec);
364         ctx->header.tv_usec = ntohl(dummy.header.tv_usec);
365         ctx->header.message_len = ntohl(dummy.header.message_len);
366         free(ctx->buffer); ctx->buffer = NULL;
367         ctx->state = JLOG_STREAMER_WANT_BODY;
368         break;
369
370       case JLOG_STREAMER_WANT_BODY:
        /* NOTE(review): message_len comes straight from the peer and is
         * used as an allocation size without an upper bound. */
371         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
372         if(ctx->header.message_len > 0)
373           ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn,
374                     ctx->buffer, NULL);
375         else if(ctx->buffer)
376           free(ctx->buffer);
377         /* Don't free the buffer, it's used by the datastore process. */
378         ctx->buffer = NULL;
379         ctx->count--;
380         ctx->total_events++;
381         if(ctx->count == 0) {
          /* Batch complete: take this fd out of the eventer and pass a
           * copy of the event along with the checkpoint request.
           * Presumably the push implementation re-registers completion_e
           * once the batch is stored -- not visible here, confirm. */
382           eventer_t completion_e;
383           eventer_remove_fd(e->fd);
384           completion_e = eventer_alloc();
385           memcpy(completion_e, e, sizeof(*e));
386           completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
387           ctx->state = JLOG_STREAMER_IS_ASYNC;
388           ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn,
389                     NULL, completion_e);
390           noitL(noit_debug, "Pushing batch asynch...\n");
391           return 0;
392         } else
393           ctx->state = JLOG_STREAMER_WANT_HEADER;
394         break;
395
396       case JLOG_STREAMER_IS_ASYNC:
397         ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
398       case JLOG_STREAMER_WANT_CHKPT:
399         noitL(noit_debug, "Pushing checkpoint: [%u/%u]\n",
400               ctx->header.chkpt.log, ctx->header.chkpt.marker);
        /* The checkpoint id goes back on the wire in network order. */
401         n_chkpt.log = htonl(ctx->header.chkpt.log);
402         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
403
404         /* screw short writes.  I'd rather die than not write my data! */
405         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
406                               &mask, e);
407         if(len < 0) {
408           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
409           goto socket_error;
410         }
411         if(len != sizeof(jlog_id)) {
412           noitL(noit_error, "short write on checkpointing stream.\n");
413           goto socket_error;
414         }
415         ctx->state = JLOG_STREAMER_WANT_COUNT;
416         break;
417     }
418   }
419   /* never get here */
420 }
421
422 int
423 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure,
424                             struct timeval *now) {
425   noit_connection_ctx_t *nctx = closure;
426   int rv;
427
428   rv = eventer_SSL_connect(e, &mask);
429   if(rv > 0) {
430     eventer_ssl_ctx_t *sslctx;
431     e->callback = nctx->consumer_callback;
432     /* We must make a copy of the acceptor_closure_t for each new
433      * connection.
434      */
435     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
436       const char *cn, *end;
437       cn = eventer_ssl_get_peer_subject(sslctx);
438       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
439         cn += 3;
440         end = cn;
441         while(*end && *end != '/') end++;
442         nctx->remote_cn = malloc(end - cn + 1);
443         memcpy(nctx->remote_cn, cn, end - cn);
444         nctx->remote_cn[end-cn] = '\0';
445       }
446     }
447     return e->callback(e, mask, e->closure, now);
448   }
449   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
450
451   noitL(noit_error, "jlog streamer SSL upgrade failed.\n");
452   eventer_remove_fd(e->fd);
453   e->opset->close(e->fd, &mask, e);
454   noit_connection_schedule_reattempt(nctx, now);
455   return 0;
456 }
457 int
458 noit_connection_complete_connect(eventer_t e, int mask, void *closure,
459                                  struct timeval *now) {
460   noit_connection_ctx_t *nctx = closure;
461   const char *cert, *key, *ca, *ciphers;
462   char remote_str[128], tmp_str[128];
463   eventer_ssl_ctx_t *sslctx;
464   int aerrno, len;
465   socklen_t aerrno_len = sizeof(aerrno);
466
467   if(getsockopt(e->fd,SOL_SOCKET,SO_ERROR, &aerrno, &aerrno_len) == 0)
468     if(aerrno != 0) goto connect_error;
469   aerrno = 0;
470
471   if(mask & EVENTER_EXCEPTION) {
472     if(aerrno == 0 && (write(e->fd, e, 0) == -1))
473       aerrno = errno;
474  connect_error:
475     switch(nctx->r.remote.sa_family) {
476       case AF_INET:
477         len = sizeof(struct sockaddr_in);
478         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in.sin_addr,
479                   tmp_str, len);
480         snprintf(remote_str, sizeof(remote_str), "%s:%d",
481                  tmp_str, ntohs(nctx->r.remote_in.sin_port));
482         break;
483       case AF_INET6:
484         len = sizeof(struct sockaddr_in6);
485         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in6.sin6_addr,
486                   tmp_str, len);
487         snprintf(remote_str, sizeof(remote_str), "%s:%d",
488                  tmp_str, ntohs(nctx->r.remote_in6.sin6_port));
489        break;
490       case AF_UNIX:
491         len = SUN_LEN(&(nctx->r.remote_un));
492         snprintf(remote_str, sizeof(remote_str), "%s", nctx->r.remote_un.sun_path);
493         break;
494       default:
495         snprintf(remote_str, sizeof(remote_str), "(unknown)");
496     }
497     noitL(noit_error, "Error connecting to %s: %s\n",
498           remote_str, strerror(aerrno));
499     eventer_remove_fd(e->fd);
500     e->opset->close(e->fd, &mask, e);
501     noit_connection_schedule_reattempt(nctx, now);
502     return 0;
503   }
504
505 #define SSLCONFGET(var,name) do { \
506   if(!noit_hash_retr_str(nctx->sslconfig, name, strlen(name), \
507                          &var)) var = NULL; } while(0)
508   SSLCONFGET(cert, "certificate_file");
509   SSLCONFGET(key, "key_file");
510   SSLCONFGET(ca, "ca_chain");
511   SSLCONFGET(ciphers, "ciphers");
512   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
513   if(!sslctx) goto connect_error;
514
515   memcpy(&nctx->last_connect, now, sizeof(*now));
516   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
517                              nctx->sslconfig);
518   EVENTER_ATTACH_SSL(e, sslctx);
519   e->callback = noit_connection_ssl_upgrade;
520   return e->callback(e, mask, closure, now);
521 }
522 static void
523 noit_connection_initiate_connection(noit_connection_ctx_t *nctx) {
524   struct timeval __now;
525   eventer_t e;
526   int rv, fd = -1;
527 #ifdef SO_KEEPALIVE
528   int optval;
529   socklen_t optlen = sizeof(optval);
530 #endif
531
532   if(nctx->wants_permanent_shutdown) {
533     noit_connection_ctx_dealloc(nctx);
534     return;
535   }
536   /* Open a socket */
537   fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0);
538   if(fd < 0) goto reschedule;
539
540   /* Make it non-blocking */
541   if(eventer_set_fd_nonblocking(fd)) goto reschedule;
542 #define set_or_bail(type, opt, val) do { \
543   optval = val; \
544   optlen = sizeof(optval); \
545   if(setsockopt(fd, type, opt, &optval, optlen) < 0) { \
546     noitL(noit_error, "Cannot set " #type "/" #opt " on jlog socket: %s\n", \
547           strerror(errno)); \
548     goto reschedule; \
549   } \
550 } while(0)
551 #ifdef SO_KEEPALIVE
552   set_or_bail(SOL_SOCKET, SO_KEEPALIVE, 1);
553 #endif
554 #ifdef TCP_KEEPALIVE_THRESHOLD
555   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_THRESHOLD, 10 * 1000);
556 #endif
557 #ifdef TCP_KEEPALIVE_ABORT_THRESHOLD
558   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_ABORT_THRESHOLD, 30 * 1000);
559 #endif
560 #ifdef TCP_CONN_NOTIFY_THRESHOLD
561   set_or_bail(IPPROTO_TCP, TCP_CONN_NOTIFY_THRESHOLD, 10 * 1000);
562 #endif
563 #ifdef TCP_CONN_ABORT_THRESHOLD
564   set_or_bail(IPPROTO_TCP, TCP_CONN_ABORT_THRESHOLD, 30 * 1000);
565 #endif
566
567   /* Initiate a connection */
568   rv = connect(fd, &nctx->r.remote, nctx->remote_len);
569   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
570
571   /* Register a handler for connection completion */
572   e = eventer_alloc();
573   e->fd = fd;
574   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
575   e->callback = noit_connection_complete_connect;
576   e->closure = nctx;
577   eventer_add(e);
578   return;
579
580  reschedule:
581   if(fd >= 0) close(fd);
582   gettimeofday(&__now, NULL);
583   noit_connection_schedule_reattempt(nctx, &__now);
584   return;
585 }
586
587 int
588 initiate_noit_connection(const char *host, unsigned short port,
589                          noit_hash_table *sslconfig, noit_hash_table *config,
590                          eventer_func_t handler, void *closure,
591                          void (*freefunc)(void *)) {
592   noit_connection_ctx_t *ctx;
593
594   int8_t family;
595   int rv;
596   union {
597     struct in_addr addr4;
598     struct in6_addr addr6;
599   } a;
600
601   if(host[0] == '/') {
602     family = AF_UNIX;
603   }
604   else {
605     family = AF_INET;
606     rv = inet_pton(family, host, &a);
607     if(rv != 1) {
608       family = AF_INET6;
609       rv = inet_pton(family, host, &a);
610       if(rv != 1) {
611         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
612         return -1;
613       }
614     }
615   }
616
617   ctx = noit_connection_ctx_alloc();
618   ctx->remote_str = calloc(1, strlen(host) + 7);
619   snprintf(ctx->remote_str, strlen(host) + 7,
620            "%s:%d", host, port);
621  
622   memset(&ctx->r, 0, sizeof(ctx->r));
623   if(family == AF_UNIX) {
624     struct sockaddr_un *s = &ctx->r.remote_un;
625     s->sun_family = AF_UNIX;
626     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
627     ctx->remote_len = sizeof(*s);
628   }
629   else if(family == AF_INET) {
630     struct sockaddr_in *s = &ctx->r.remote_in;
631     s->sin_family = family;
632     s->sin_port = htons(port);
633     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
634     ctx->remote_len = sizeof(*s);
635   }
636   else {
637     struct sockaddr_in6 *s = &ctx->r.remote_in6;
638     s->sin6_family = family;
639     s->sin6_port = htons(port);
640     memcpy(&s->sin6_addr, &a, sizeof(a));
641     ctx->remote_len = sizeof(*s);
642   }
643
644   if(ctx->sslconfig)
645     noit_hash_delete_all(ctx->sslconfig, free, free);
646   else
647     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
648   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
649   if(ctx->config)
650     noit_hash_delete_all(ctx->config, free, free);
651   else
652     ctx->config = calloc(1, sizeof(noit_hash_table));
653   noit_hash_merge_as_dict(ctx->config, config);
654
655   ctx->consumer_callback = handler;
656   ctx->consumer_free = freefunc;
657   ctx->consumer_ctx = closure;
658   noit_connection_initiate_connection(ctx);
659   return 0;
660 }
661
662 void
663 stratcon_streamer_connection(const char *toplevel, const char *destination,
664                              eventer_func_t handler,
665                              void *(*handler_alloc)(void), void *handler_ctx,
666                              void (*handler_free)(void *)) {
667   int i, cnt = 0;
668   noit_conf_section_t *noit_configs;
669   char path[256];
670
671   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
672   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
673   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
674   for(i=0; i<cnt; i++) {
675     char address[256];
676     unsigned short port;
677     int portint;
678     noit_hash_table *sslconfig, *config;
679
680     if(!noit_conf_get_stringbuf(noit_configs[i],
681                                 "ancestor-or-self::node()/@address",
682                                 address, sizeof(address))) {
683       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
684       continue;
685     }
686     /* if destination is specified, exact match it */
687     if(destination && strcmp(address, destination)) continue;
688
689     if(!noit_conf_get_int(noit_configs[i],
690                           "ancestor-or-self::node()/@port", &portint))
691       portint = 0;
692     port = (unsigned short) portint;
693     if(address[0] != '/' && (portint == 0 || (port != portint))) {
694       /* UNIX sockets don't require a port (they'll ignore it if specified */
695       noitL(noit_stderr,
696             "Invalid port [%d] specified in stanza %d\n", port, i+1);
697       continue;
698     }
699     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
700     config = noit_conf_get_hash(noit_configs[i], "config");
701
702     noitL(noit_error, "initiating to %s\n", address);
703     initiate_noit_connection(address, port, sslconfig, config,
704                              handler,
705                              handler_alloc ? handler_alloc() : handler_ctx,
706                              handler_free);
707     noit_hash_destroy(sslconfig,free,free);
708     free(sslconfig);
709     noit_hash_destroy(config,free,free);
710     free(config);
711   }
712   free(noit_configs);
713 }
714 void
715 stratcon_jlog_streamer_reload(const char *toplevel) {
716   stratcon_streamer_connection(toplevel, NULL,
717                                stratcon_jlog_recv_handler,
718                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
719                                NULL,
720                                jlog_streamer_ctx_free);
721 }
722
723 static int
724 stratcon_console_show_noits(noit_console_closure_t ncct,
725                             int argc, char **argv,
726                             noit_console_state_t *dstate,
727                             void *closure) {
728   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
729   uuid_t key_id;
730   int klen, n = 0, i;
731   void *vconn;
732   noit_connection_ctx_t **ctx;
733
734   pthread_mutex_lock(&noits_lock);
735   ctx = malloc(sizeof(*ctx) * noits.size);
736   while(noit_hash_next(&noits, &iter, (const char **)key_id, &klen,
737                        &vconn)) {
738     ctx[n] = (noit_connection_ctx_t *)vconn;
739     noit_atomic_inc32(&ctx[n]->refcnt);
740     n++;
741   }
742   pthread_mutex_unlock(&noits_lock);
743   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
744   for(i=0; i<n; i++) {
745     nc_print_noit_conn_brief(ncct, ctx[i]);
746     noit_connection_ctx_deref(ctx[i]);
747   }
748   free(ctx);
749   return 0;
750 }
751
752 static int
753 rest_show_noits(noit_http_rest_closure_t *restc,
754                 int npats, char **pats) {
755   xmlDocPtr doc;
756   xmlNodePtr root;
757   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
758   uuid_t key_id;
759   int klen, n = 0, i;
760   void *vconn;
761   noit_connection_ctx_t **ctxs;
762   struct timeval now, diff, last;
763   gettimeofday(&now, NULL);
764
765   pthread_mutex_lock(&noits_lock);
766   ctxs = malloc(sizeof(*ctxs) * noits.size);
767   while(noit_hash_next(&noits, &iter, (const char **)key_id, &klen,
768                        &vconn)) {
769     ctxs[n] = (noit_connection_ctx_t *)vconn;
770     noit_atomic_inc32(&ctxs[n]->refcnt);
771     n++;
772   }
773   pthread_mutex_unlock(&noits_lock);
774   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
775
776   doc = xmlNewDoc((xmlChar *)"1.0");
777   root = xmlNewDocNode(doc, NULL, (xmlChar *)"noits", NULL);
778   xmlDocSetRootElement(doc, root);
779   for(i=0; i<n; i++) {
780     char buff[256], *feedtype = "unknown", *state = "unknown";
781     xmlNodePtr node;
782     noit_connection_ctx_t *ctx = ctxs[i];
783     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
784
785     node = xmlNewNode(NULL, (xmlChar *)"noit");
786     snprintf(buff, sizeof(buff), "%llu.%06d",
787              (long long unsigned)ctx->last_connect.tv_sec,
788              (int)ctx->last_connect.tv_usec);
789     xmlSetProp(node, (xmlChar *)"last_connect", (xmlChar *)buff);
790     xmlSetProp(node, (xmlChar *)"state", ctx->remote_cn ?
791                (xmlChar *)"connected" :
792                (ctx->timeout_event ? (xmlChar *)"disconnected" :
793                                     (xmlChar *)"connecting"));
794     xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)ctx->remote_str);
795     switch(ntohl(jctx->jlog_feed_cmd)) {
796       case NOIT_JLOG_DATA_FEED: feedtype = "durable/storage"; break;
797       case NOIT_JLOG_DATA_TEMP_FEED: feedtype = "transient/iep"; break;
798     }
799     xmlSetProp(node, (xmlChar *)"type", (xmlChar *)feedtype);
800     if(ctx->timeout_event) {
801       sub_timeval(ctx->timeout_event->whence, now, &diff);
802       snprintf(buff, sizeof(buff), "%llu.%06d",
803                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
804       xmlSetProp(node, (xmlChar *)"next_attempt", (xmlChar *)buff);
805     }
806     else if(ctx->remote_cn) {
807       if(ctx->remote_cn)
808         xmlSetProp(node, (xmlChar *)"remote_cn", (xmlChar *)ctx->remote_cn);
809  
810       switch(jctx->state) {
811         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
812         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
813         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
814         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
815         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
816         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
817       }
818       xmlSetProp(node, (xmlChar *)"state", (xmlChar *)state);
819       snprintf(buff, sizeof(buff), "%08x:%08x",
820                jctx->header.chkpt.log, jctx->header.chkpt.marker);
821       xmlSetProp(node, (xmlChar *)"checkpoint", (xmlChar *)buff);
822       snprintf(buff, sizeof(buff), "%llu",
823                (unsigned long long)jctx->total_events);
824       xmlSetProp(node, (xmlChar *)"session_events", (xmlChar *)buff);
825       snprintf(buff, sizeof(buff), "%llu",
826                (unsigned long long)jctx->total_bytes_read);
827       xmlSetProp(node, (xmlChar *)"session_bytes", (xmlChar *)buff);
828  
829       sub_timeval(now, ctx->last_connect, &diff);
830       snprintf(buff, sizeof(buff), "%llu.%06d",
831                (unsigned long long)diff.tv_sec, (int)diff.tv_usec);
832       xmlSetProp(node, (xmlChar *)"session_duration", (xmlChar *)buff);
833  
834       if(jctx->header.tv_sec) {
835         last.tv_sec = jctx->header.tv_sec;
836         last.tv_usec = jctx->header.tv_usec;
837         snprintf(buff, sizeof(buff), "%llu.%06d",
838                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
839         xmlSetProp(node, (xmlChar *)"last_event", (xmlChar *)buff);
840         sub_timeval(now, last, &diff);
841         snprintf(buff, sizeof(buff), "%llu.%06d",
842                  (unsigned long long)diff.tv_sec, (int)diff.tv_usec);
843         xmlSetProp(node, (xmlChar *)"last_event_age", (xmlChar *)buff);
844       }
845     }
846
847     xmlAddChild(root, node);
848     noit_connection_ctx_deref(ctx);
849   }
850   free(ctxs);
851
852   noit_http_response_ok(restc->http_ctx, "text/xml");
853   noit_http_response_xml(restc->http_ctx, doc);
854   noit_http_response_end(restc->http_ctx);
855   xmlFreeDoc(doc);
856   return 0;
857 }
858 static int
859 stratcon_add_noit(const char *target, unsigned short port) {
860   int cnt;
861   char path[256];
862   char port_str[6];
863   noit_conf_section_t *noit_configs, parent;
864   xmlNodePtr newnoit;
865
866   snprintf(path, sizeof(path),
867            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
868   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
869   free(noit_configs);
870   if(cnt != 0) return 0;
871
872   parent = noit_conf_get_section(NULL, "//noits");
873   if(!parent) return 0;
874   snprintf(port_str, sizeof(port_str), "%d", port);
875   newnoit = xmlNewNode(NULL, (xmlChar *)"noit");
876   xmlSetProp(newnoit, (xmlChar *)"address", (xmlChar *)target);
877   xmlSetProp(newnoit, (xmlChar *)"port", (xmlChar *)port_str);
878   xmlAddChild(parent, newnoit);
879   stratcon_streamer_connection(NULL, target,
880                                stratcon_jlog_recv_handler,
881                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
882                                NULL,
883                                jlog_streamer_ctx_free);
884   stratcon_streamer_connection(NULL, target,
885                                stratcon_jlog_recv_handler,
886                                (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
887                                NULL,
888                                jlog_streamer_ctx_free);
889   return 1;
890 }
891 static int
892 stratcon_remove_noit(const char *target, unsigned short port) {
893   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
894   uuid_t key_id;
895   int klen, n = 0, i, cnt = 0;
896   void *vconn;
897   noit_connection_ctx_t **ctx;
898   noit_conf_section_t *noit_configs;
899   char path[256];
900   char remote_str[256];
901
902   snprintf(remote_str, sizeof(remote_str), "%s:%d", target, port);
903
904   snprintf(path, sizeof(path),
905            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
906   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
907   for(i=0; i<cnt; i++) {
908     xmlUnlinkNode(noit_configs[i]);
909     xmlFreeNode(noit_configs[i]);
910   }
911   free(noit_configs);
912
913   pthread_mutex_lock(&noits_lock);
914   ctx = malloc(sizeof(*ctx) * noits.size);
915   while(noit_hash_next(&noits, &iter, (const char **)key_id, &klen,
916                        &vconn)) {
917     if(!strcmp(((noit_connection_ctx_t *)vconn)->remote_str, remote_str)) {
918       ctx[n] = (noit_connection_ctx_t *)vconn;
919       noit_atomic_inc32(&ctx[n]->refcnt);
920       n++;
921     }
922   }
923   pthread_mutex_unlock(&noits_lock);
924   for(i=0; i<n; i++) {
925     noit_connection_ctx_dealloc(ctx[i]); /* once for the record */
926     noit_connection_ctx_deref(ctx[i]);   /* once for the aboce inc32 */
927   }
928   free(ctx);
929   return n;
930 }
931 static int
932 rest_set_noit(noit_http_rest_closure_t *restc,
933               int npats, char **pats) {
934   noit_http_session_ctx *ctx = restc->http_ctx;
935   unsigned short port = 43191;
936   if(npats < 1 || npats > 2)
937     noit_http_response_server_error(ctx, "text/xml");
938   if(npats == 2) port = atoi(pats[1]);
939   if(stratcon_add_noit(pats[0], port))
940     noit_http_response_ok(ctx, "text/xml");
941   else
942     noit_http_response_standard(ctx, 409, "EXISTS", "text/xml");
943   if(noit_conf_write_file(NULL) != 0)
944     noitL(noit_error, "local config write failed\n");
945   noit_conf_mark_changed();
946   noit_http_response_end(ctx);
947   return 0;
948 }
949 static int
950 rest_delete_noit(noit_http_rest_closure_t *restc,
951                  int npats, char **pats) {
952   noit_http_session_ctx *ctx = restc->http_ctx;
953   unsigned short port = 43191;
954   if(npats < 1 || npats > 2)
955     noit_http_response_server_error(ctx, "text/xml");
956   if(npats == 2) port = atoi(pats[1]);
957   if(stratcon_remove_noit(pats[0], port))
958     noit_http_response_ok(ctx, "text/xml");
959   else
960     noit_http_response_not_found(ctx, "text/xml");
961   if(noit_conf_write_file(NULL) != 0)
962     noitL(noit_error, "local config write failed\n");
963   noit_conf_mark_changed();
964   noit_http_response_end(ctx);
965   return 0;
966 }
967 static int
968 stratcon_console_conf_noits(noit_console_closure_t ncct,
969                             int argc, char **argv,
970                             noit_console_state_t *dstate,
971                             void *closure) {
972   char *cp, target[128];
973   unsigned short port = 43191;
974   int adding = (int)closure;
975   if(argc != 1)
976     return -1;
977
978   cp = strchr(argv[0], ':');
979   if(cp) {
980     strlcpy(target, argv[0], MIN(sizeof(target), cp-argv[0]+1));
981     port = atoi(cp+1);
982   }
983   else strlcpy(target, argv[0], sizeof(target));
984   if(adding) {
985     if(stratcon_add_noit(target, port)) {
986       nc_printf(ncct, "Added noit at %s:%d\n", target, port);
987     }
988     else {
989       nc_printf(ncct, "Failed to add noit at %s:%d\n", target, port);
990     }
991   }
992   else {
993     if(stratcon_remove_noit(target, port)) {
994       nc_printf(ncct, "Removed noit at %s:%d\n", target, port);
995     }
996     else {
997       nc_printf(ncct, "Failed to remove noit at %s:%d\n", target, port);
998     }
999   }
1000   return 0;
1001 }
1002
1003 static void
1004 register_console_streamer_commands() {
1005   noit_console_state_t *tl;
1006   cmd_info_t *showcmd, *confcmd, *conftcmd, *conftnocmd;
1007
1008   tl = noit_console_state_initial();
1009   showcmd = noit_console_state_get_cmd(tl, "show");
1010   assert(showcmd && showcmd->dstate);
1011   confcmd = noit_console_state_get_cmd(tl, "configure");
1012   conftcmd = noit_console_state_get_cmd(confcmd->dstate, "terminal");
1013   conftnocmd = noit_console_state_get_cmd(conftcmd->dstate, "no");
1014
1015   noit_console_state_add_cmd(conftcmd->dstate,
1016     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)1));
1017   noit_console_state_add_cmd(conftnocmd->dstate,
1018     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)0));
1019
1020   noit_console_state_add_cmd(showcmd->dstate,
1021     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
1022 }
1023
1024 void
1025 stratcon_jlog_streamer_init(const char *toplevel) {
1026   pthread_mutex_init(&noits_lock, NULL);
1027   eventer_name_callback("noit_connection_reinitiate",
1028                         noit_connection_reinitiate);
1029   eventer_name_callback("stratcon_jlog_recv_handler",
1030                         stratcon_jlog_recv_handler);
1031   eventer_name_callback("noit_connection_ssl_upgrade",
1032                         noit_connection_ssl_upgrade);
1033   eventer_name_callback("noit_connection_complete_connect",
1034                         noit_connection_complete_connect);
1035   register_console_streamer_commands();
1036   stratcon_jlog_streamer_reload(toplevel);
1037   assert(noit_http_rest_register_auth(
1038     "GET", "/noits/", "^show$", rest_show_noits,
1039              noit_http_rest_client_cert_auth
1040   ) == 0);
1041   assert(noit_http_rest_register_auth(
1042     "PUT", "/noits/", "^set/([^/:]+)$", rest_set_noit,
1043              noit_http_rest_client_cert_auth
1044   ) == 0);
1045   assert(noit_http_rest_register_auth(
1046     "PUT", "/noits/", "^set/([^/:]+):(\\d+)$", rest_set_noit,
1047              noit_http_rest_client_cert_auth
1048   ) == 0);
1049   assert(noit_http_rest_register_auth(
1050     "DELETE", "/noits/", "^delete/([^/:]+)$", rest_delete_noit,
1051              noit_http_rest_client_cert_auth
1052   ) == 0);
1053   assert(noit_http_rest_register_auth(
1054     "DELETE", "/noits/", "^delete/([^/:]+):(\\d+)$", rest_delete_noit,
1055              noit_http_rest_client_cert_auth
1056   ) == 0);
1057 }
1058
Note: See TracBrowser for help on using the browser.