root/src/stratcon_jlog_streamer.c

Revision 1da42f7ee53fe062b95962b97d885d16c7363589, 37.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 9 years ago)

fixes #202 flag-day(noitd,stratcond)

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_conf.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "noit_jlog_listener.h"
39 #include "noit_rest.h"
40 #include "stratcon_datastore.h"
41 #include "stratcon_jlog_streamer.h"
42 #include "stratcon_iep.h"
43
44 #include <unistd.h>
45 #include <assert.h>
46 #include <errno.h>
47 #include <sys/types.h>
48 #include <sys/socket.h>
49 #ifdef HAVE_SYS_FILIO_H
50 #include <sys/filio.h>
51 #endif
52 #include <netinet/in.h>
53 #include <sys/un.h>
54 #include <arpa/inet.h>
55
56 pthread_mutex_t noits_lock;
57 noit_hash_table noits = NOIT_HASH_EMPTY;
58
59 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
60
61 static int
62 remote_str_sort(const void *a, const void *b) {
63   int rv;
64   noit_connection_ctx_t * const *actx = a;
65   noit_connection_ctx_t * const *bctx = b;
66   jlog_streamer_ctx_t *ajctx = (*actx)->consumer_ctx;
67   jlog_streamer_ctx_t *bjctx = (*bctx)->consumer_ctx;
68   rv = strcmp((*actx)->remote_str, (*bctx)->remote_str);
69   if(rv) return rv;
70   return (ajctx->jlog_feed_cmd < bjctx->jlog_feed_cmd) ? -1 :
71            ((ajctx->jlog_feed_cmd == bjctx->jlog_feed_cmd) ? 0 : 1);
72 }
73 static void
74 nc_print_noit_conn_brief(noit_console_closure_t ncct,
75                           noit_connection_ctx_t *ctx) {
76   jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
77   struct timeval now, diff, session_duration;
78   const char *feedtype = "unknown";
79   const char *lasttime = "never";
80   if(ctx->last_connect.tv_sec != 0) {
81     char cmdbuf[4096];
82     time_t r = ctx->last_connect.tv_sec;
83     struct tm tbuf, *tm;
84     tm = gmtime_r(&r, &tbuf);
85     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
86     lasttime = cmdbuf;
87   }
88   nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str,
89             ctx->remote_cn ? "connected" :
90                              (ctx->timeout_event ? "disconnected" :
91                                                    "connecting"), lasttime);
92   switch(ntohl(jctx->jlog_feed_cmd)) {
93     case NOIT_JLOG_DATA_FEED: feedtype = "durable/storage"; break;
94     case NOIT_JLOG_DATA_TEMP_FEED: feedtype = "transient/iep"; break;
95   }
96   nc_printf(ncct, "\tJLog event streamer [%s]\n", feedtype);
97   gettimeofday(&now, NULL);
98   if(ctx->timeout_event) {
99     sub_timeval(ctx->timeout_event->whence, now, &diff);
100     nc_printf(ncct, "\tNext attempt in %lld.%06us\n",
101               (long long)diff.tv_sec, (unsigned int) diff.tv_usec);
102   }
103   else if(ctx->remote_cn) {
104     nc_printf(ncct, "\tRemote CN: '%s'\n",
105               ctx->remote_cn ? ctx->remote_cn : "???");
106     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
107       struct timeval last;
108       double session_duration_seconds;
109       const char *state = "unknown";
110
111       switch(jctx->state) {
112         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
113         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
114         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
115         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
116         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
117         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
118         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
119       }
120       last.tv_sec = jctx->header.tv_sec;
121       last.tv_usec = jctx->header.tv_usec;
122       sub_timeval(now, last, &diff);
123       sub_timeval(now, ctx->last_connect, &session_duration);
124       session_duration_seconds = session_duration.tv_sec +
125                                  (double)session_duration.tv_usec/1000000.0;
126       nc_printf(ncct, "\tState: %s\n"
127                       "\tNext checkpoint: [%08x:%08x]\n"
128                       "\tLast event: %lld.%06us ago\n"
129                       "\tEvents this session: %llu (%0.2f/s)\n"
130                       "\tOctets this session: %llu (%0.2f/s)\n",
131                 state,
132                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
133                 (long long)diff.tv_sec, (unsigned int)diff.tv_usec,
134                 jctx->total_events,
135                 (double)jctx->total_events/session_duration_seconds,
136                 jctx->total_bytes_read,
137                 (double)jctx->total_bytes_read/session_duration_seconds);
138     }
139     else {
140       nc_printf(ncct, "\tUnknown type.\n");
141     }
142   }
143 }
144
145 jlog_streamer_ctx_t *
146 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
147   jlog_streamer_ctx_t *ctx;
148   ctx = stratcon_jlog_streamer_ctx_alloc();
149   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
150   ctx->push = stratcon_datastore_push;
151   return ctx;
152 }
153 jlog_streamer_ctx_t *
154 stratcon_jlog_streamer_ctx_alloc(void) {
155   jlog_streamer_ctx_t *ctx;
156   ctx = calloc(1, sizeof(*ctx));
157   return ctx;
158 }
159 noit_connection_ctx_t *
160 noit_connection_ctx_alloc(void) {
161   noit_connection_ctx_t *ctx, **pctx;
162   ctx = calloc(1, sizeof(*ctx));
163   ctx->refcnt = 1;
164   pctx = malloc(sizeof(*pctx));
165   *pctx = ctx;
166   pthread_mutex_lock(&noits_lock);
167   noit_hash_store(&noits, (const char *)pctx, sizeof(*pctx), ctx);
168   pthread_mutex_unlock(&noits_lock);
169   return ctx;
170 }
171 int
172 noit_connection_reinitiate(eventer_t e, int mask, void *closure,
173                          struct timeval *now) {
174   noit_connection_ctx_t *ctx = closure;
175   ctx->timeout_event = NULL;
176   noit_connection_initiate_connection(closure);
177   return 0;
178 }
179 void
180 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx,
181                                    struct timeval *now) {
182   struct timeval __now, interval;
183   const char *v;
184   u_int32_t min_interval = 1000, max_interval = 8000;
185   if(ctx->remote_cn) {
186     free(ctx->remote_cn);
187     ctx->remote_cn = NULL;
188   }
189   if(noit_hash_retr_str(ctx->config,
190                         "reconnect_initial_interval",
191                         strlen("reconnect_initial_interval"),
192                         &v)) {
193     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
194   }
195   if(noit_hash_retr_str(ctx->config,
196                         "reconnect_maximum_interval",
197                         strlen("reconnect_maximum_interval"),
198                         &v)) {
199     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
200   }
201   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
202   else {
203     ctx->current_backoff *= 2;
204     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
205     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
206   }
207   if(!now) {
208     gettimeofday(&__now, NULL);
209     now = &__now;
210   }
211   interval.tv_sec = ctx->current_backoff / 1000;
212   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
213   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
214         ctx->current_backoff);
215   if(ctx->timeout_event)
216     eventer_remove(ctx->timeout_event);
217   else
218     ctx->timeout_event = eventer_alloc();
219   ctx->timeout_event->callback = noit_connection_reinitiate;
220   ctx->timeout_event->closure = ctx;
221   ctx->timeout_event->mask = EVENTER_TIMER;
222   add_timeval(*now, interval, &ctx->timeout_event->whence);
223   eventer_add(ctx->timeout_event);
224 }
225 static void
226 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
227   if(ctx->remote_cn) free(ctx->remote_cn);
228   if(ctx->remote_str) free(ctx->remote_str);
229   if(ctx->timeout_event) {
230     eventer_remove(ctx->timeout_event);
231     eventer_free(ctx->timeout_event);
232   }
233   ctx->consumer_free(ctx->consumer_ctx);
234   free(ctx);
235 }
236 void
237 noit_connection_ctx_deref(noit_connection_ctx_t *ctx) {
238   if(noit_atomic_dec32(&ctx->refcnt) == 0)
239     noit_connection_ctx_free(ctx);
240 }
241 void
242 noit_connection_ctx_dealloc(noit_connection_ctx_t *ctx) {
243   noit_connection_ctx_t **pctx = &ctx;
244   pthread_mutex_lock(&noits_lock);
245   noit_hash_delete(&noits, (const char *)pctx, sizeof(*pctx),
246                    free, (void (*)(void *))noit_connection_ctx_deref);
247   pthread_mutex_unlock(&noits_lock);
248 }
249 void
250 jlog_streamer_ctx_free(void *cl) {
251   jlog_streamer_ctx_t *ctx = cl;
252   if(ctx->buffer) free(ctx->buffer);
253   free(ctx);
254 }
255
256 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
257 static int
258 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
259   int len, mask;
260   while(ctx->bytes_read < ctx->bytes_expected) {
261     len = Eread(ctx->buffer + ctx->bytes_read,
262                 ctx->bytes_expected - ctx->bytes_read);
263     if(len < 0) {
264       *newmask = mask;
265       return -1;
266     }
267     /* if we get 0 inside SSL, and there was a real error, we
268      * will actually get a -1 here.
269      * if(len == 0) return ctx->bytes_read;
270      */
271     ctx->total_bytes_read += len;
272     ctx->bytes_read += len;
273   }
274   assert(ctx->bytes_read == ctx->bytes_expected);
275   return ctx->bytes_read;
276 }
277 #define FULLREAD(e,ctx,size) do { \
278   int mask, len; \
279   if(!ctx->bytes_expected) { \
280     ctx->bytes_expected = size; \
281     if(ctx->buffer) free(ctx->buffer); \
282     ctx->buffer = malloc(size + 1); \
283     if(ctx->buffer == NULL) { \
284       noitL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)size + 1); \
285       goto socket_error; \
286     } \
287     ctx->buffer[size] = '\0'; \
288   } \
289   len = __read_on_ctx(e, ctx, &mask); \
290   if(len < 0) { \
291     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
292     noitL(noit_error, "[%s] SSL read error: %s\n", nctx->remote_str, strerror(errno)); \
293     goto socket_error; \
294   } \
295   ctx->bytes_read = 0; \
296   ctx->bytes_expected = 0; \
297   if(len != size) { \
298     noitL(noit_error, "[%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
299           nctx->remote_str, ctx->state, len, (long unsigned int)size); \
300     goto socket_error; \
301   } \
302 } while(0)
303
304 int
305 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
306                            struct timeval *now) {
307   noit_connection_ctx_t *nctx = closure;
308   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
309   jlog_streamer_ctx_t dummy;
310   int len;
311   jlog_id n_chkpt;
312
313   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
314     if(write(e->fd, e, 0) == -1)
315       noitL(noit_error, "socket error: %s\n", strerror(errno));
316  socket_error:
317     ctx->state = JLOG_STREAMER_WANT_INITIATE;
318     ctx->count = 0;
319     ctx->bytes_read = 0;
320     ctx->bytes_expected = 0;
321     if(ctx->buffer) free(ctx->buffer);
322     ctx->buffer = NULL;
323     noit_connection_schedule_reattempt(nctx, now);
324     eventer_remove_fd(e->fd);
325     e->opset->close(e->fd, &mask, e);
326     return 0;
327   }
328
329   while(1) {
330     switch(ctx->state) {
331       case JLOG_STREAMER_WANT_INITIATE:
332         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
333                               sizeof(ctx->jlog_feed_cmd),
334                               &mask, e);
335         if(len < 0) {
336           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
337           goto socket_error;
338         }
339         if(len != sizeof(ctx->jlog_feed_cmd)) {
340           noitL(noit_error, "short write [%d/%d] on initiating stream.\n",
341                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
342           goto socket_error;
343         }
344         ctx->state = JLOG_STREAMER_WANT_COUNT;
345         break;
346
347       case JLOG_STREAMER_WANT_ERROR:
348         FULLREAD(e, ctx, 0 - ctx->count);
349         noitL(noit_error, "[%s] %.*s\n", nctx->remote_str,
350               0 - ctx->count, ctx->buffer);
351         free(ctx->buffer); ctx->buffer = NULL;
352         goto socket_error;
353         break;
354
355       case JLOG_STREAMER_WANT_COUNT:
356         FULLREAD(e, ctx, sizeof(u_int32_t));
357         memcpy(&dummy.count, ctx->buffer, sizeof(u_int32_t));
358         ctx->count = ntohl(dummy.count);
359         free(ctx->buffer); ctx->buffer = NULL;
360         if(ctx->count < 0)
361           ctx->state = JLOG_STREAMER_WANT_ERROR;
362         else
363           ctx->state = JLOG_STREAMER_WANT_HEADER;
364         break;
365
366       case JLOG_STREAMER_WANT_HEADER:
367         if(ctx->count == 0) {
368           ctx->state = JLOG_STREAMER_WANT_COUNT;
369           break;
370         }
371         FULLREAD(e, ctx, sizeof(ctx->header));
372         memcpy(&dummy.header, ctx->buffer, sizeof(ctx->header));
373         ctx->header.chkpt.log = ntohl(dummy.header.chkpt.log);
374         ctx->header.chkpt.marker = ntohl(dummy.header.chkpt.marker);
375         ctx->header.tv_sec = ntohl(dummy.header.tv_sec);
376         ctx->header.tv_usec = ntohl(dummy.header.tv_usec);
377         ctx->header.message_len = ntohl(dummy.header.message_len);
378         free(ctx->buffer); ctx->buffer = NULL;
379         ctx->state = JLOG_STREAMER_WANT_BODY;
380         break;
381
382       case JLOG_STREAMER_WANT_BODY:
383         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
384         if(ctx->header.message_len > 0)
385           ctx->push(DS_OP_INSERT, &nctx->r.remote, nctx->remote_cn,
386                     ctx->buffer, NULL);
387         else if(ctx->buffer)
388           free(ctx->buffer);
389         /* Don't free the buffer, it's used by the datastore process. */
390         ctx->buffer = NULL;
391         ctx->count--;
392         ctx->total_events++;
393         if(ctx->count == 0) {
394           eventer_t completion_e;
395           eventer_remove_fd(e->fd);
396           completion_e = eventer_alloc();
397           memcpy(completion_e, e, sizeof(*e));
398           completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
399           ctx->state = JLOG_STREAMER_IS_ASYNC;
400           ctx->push(DS_OP_CHKPT, &nctx->r.remote, nctx->remote_cn,
401                     NULL, completion_e);
402           noitL(noit_debug, "Pushing batch asynch...\n");
403           return 0;
404         } else
405           ctx->state = JLOG_STREAMER_WANT_HEADER;
406         break;
407
408       case JLOG_STREAMER_IS_ASYNC:
409         ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
410       case JLOG_STREAMER_WANT_CHKPT:
411         noitL(noit_debug, "Pushing checkpoint: [%u/%u]\n",
412               ctx->header.chkpt.log, ctx->header.chkpt.marker);
413         n_chkpt.log = htonl(ctx->header.chkpt.log);
414         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
415
416         /* screw short writes.  I'd rather die than not write my data! */
417         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
418                               &mask, e);
419         if(len < 0) {
420           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
421           goto socket_error;
422         }
423         if(len != sizeof(jlog_id)) {
424           noitL(noit_error, "short write on checkpointing stream.\n");
425           goto socket_error;
426         }
427         ctx->state = JLOG_STREAMER_WANT_COUNT;
428         break;
429     }
430   }
431   /* never get here */
432 }
433
434 int
435 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure,
436                             struct timeval *now) {
437   noit_connection_ctx_t *nctx = closure;
438   int rv;
439
440   rv = eventer_SSL_connect(e, &mask);
441   if(rv > 0) {
442     eventer_ssl_ctx_t *sslctx;
443     e->callback = nctx->consumer_callback;
444     /* We must make a copy of the acceptor_closure_t for each new
445      * connection.
446      */
447     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
448       const char *cn, *end;
449       cn = eventer_ssl_get_peer_subject(sslctx);
450       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
451         cn += 3;
452         end = cn;
453         while(*end && *end != '/') end++;
454         nctx->remote_cn = malloc(end - cn + 1);
455         memcpy(nctx->remote_cn, cn, end - cn);
456         nctx->remote_cn[end-cn] = '\0';
457       }
458     }
459     return e->callback(e, mask, e->closure, now);
460   }
461   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
462
463   noitL(noit_error, "jlog streamer SSL upgrade failed.\n");
464   eventer_remove_fd(e->fd);
465   e->opset->close(e->fd, &mask, e);
466   noit_connection_schedule_reattempt(nctx, now);
467   return 0;
468 }
469 int
470 noit_connection_complete_connect(eventer_t e, int mask, void *closure,
471                                  struct timeval *now) {
472   noit_connection_ctx_t *nctx = closure;
473   const char *cert, *key, *ca, *ciphers;
474   char remote_str[128], tmp_str[128];
475   eventer_ssl_ctx_t *sslctx;
476   int aerrno, len;
477   socklen_t aerrno_len = sizeof(aerrno);
478
479   if(getsockopt(e->fd,SOL_SOCKET,SO_ERROR, &aerrno, &aerrno_len) == 0)
480     if(aerrno != 0) goto connect_error;
481   aerrno = 0;
482
483   if(mask & EVENTER_EXCEPTION) {
484     if(aerrno == 0 && (write(e->fd, e, 0) == -1))
485       aerrno = errno;
486  connect_error:
487     switch(nctx->r.remote.sa_family) {
488       case AF_INET:
489         len = sizeof(struct sockaddr_in);
490         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in.sin_addr,
491                   tmp_str, len);
492         snprintf(remote_str, sizeof(remote_str), "%s:%d",
493                  tmp_str, ntohs(nctx->r.remote_in.sin_port));
494         break;
495       case AF_INET6:
496         len = sizeof(struct sockaddr_in6);
497         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in6.sin6_addr,
498                   tmp_str, len);
499         snprintf(remote_str, sizeof(remote_str), "%s:%d",
500                  tmp_str, ntohs(nctx->r.remote_in6.sin6_port));
501        break;
502       case AF_UNIX:
503         len = SUN_LEN(&(nctx->r.remote_un));
504         snprintf(remote_str, sizeof(remote_str), "%s", nctx->r.remote_un.sun_path);
505         break;
506       default:
507         snprintf(remote_str, sizeof(remote_str), "(unknown)");
508     }
509     noitL(noit_error, "Error connecting to %s: %s\n",
510           remote_str, strerror(aerrno));
511     eventer_remove_fd(e->fd);
512     e->opset->close(e->fd, &mask, e);
513     noit_connection_schedule_reattempt(nctx, now);
514     return 0;
515   }
516
517 #define SSLCONFGET(var,name) do { \
518   if(!noit_hash_retr_str(nctx->sslconfig, name, strlen(name), \
519                          &var)) var = NULL; } while(0)
520   SSLCONFGET(cert, "certificate_file");
521   SSLCONFGET(key, "key_file");
522   SSLCONFGET(ca, "ca_chain");
523   SSLCONFGET(ciphers, "ciphers");
524   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
525   if(!sslctx) goto connect_error;
526
527   memcpy(&nctx->last_connect, now, sizeof(*now));
528   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
529                              nctx->sslconfig);
530   EVENTER_ATTACH_SSL(e, sslctx);
531   e->callback = noit_connection_ssl_upgrade;
532   return e->callback(e, mask, closure, now);
533 }
534 static void
535 noit_connection_initiate_connection(noit_connection_ctx_t *nctx) {
536   struct timeval __now;
537   eventer_t e;
538   int rv, fd = -1;
539 #ifdef SO_KEEPALIVE
540   int optval;
541   socklen_t optlen = sizeof(optval);
542 #endif
543
544   if(nctx->wants_permanent_shutdown) {
545     noit_connection_ctx_dealloc(nctx);
546     return;
547   }
548   /* Open a socket */
549   fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0);
550   if(fd < 0) goto reschedule;
551
552   /* Make it non-blocking */
553   if(eventer_set_fd_nonblocking(fd)) goto reschedule;
554 #define set_or_bail(type, opt, val) do { \
555   optval = val; \
556   optlen = sizeof(optval); \
557   if(setsockopt(fd, type, opt, &optval, optlen) < 0) { \
558     noitL(noit_error, "Cannot set " #type "/" #opt " on jlog socket: %s\n", \
559           strerror(errno)); \
560     goto reschedule; \
561   } \
562 } while(0)
563 #ifdef SO_KEEPALIVE
564   set_or_bail(SOL_SOCKET, SO_KEEPALIVE, 1);
565 #endif
566 #ifdef TCP_KEEPALIVE_THRESHOLD
567   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_THRESHOLD, 10 * 1000);
568 #endif
569 #ifdef TCP_KEEPALIVE_ABORT_THRESHOLD
570   set_or_bail(IPPROTO_TCP, TCP_KEEPALIVE_ABORT_THRESHOLD, 30 * 1000);
571 #endif
572 #ifdef TCP_CONN_NOTIFY_THRESHOLD
573   set_or_bail(IPPROTO_TCP, TCP_CONN_NOTIFY_THRESHOLD, 10 * 1000);
574 #endif
575 #ifdef TCP_CONN_ABORT_THRESHOLD
576   set_or_bail(IPPROTO_TCP, TCP_CONN_ABORT_THRESHOLD, 30 * 1000);
577 #endif
578
579   /* Initiate a connection */
580   rv = connect(fd, &nctx->r.remote, nctx->remote_len);
581   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
582
583   /* Register a handler for connection completion */
584   e = eventer_alloc();
585   e->fd = fd;
586   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
587   e->callback = noit_connection_complete_connect;
588   e->closure = nctx;
589   eventer_add(e);
590   return;
591
592  reschedule:
593   if(fd >= 0) close(fd);
594   gettimeofday(&__now, NULL);
595   noit_connection_schedule_reattempt(nctx, &__now);
596   return;
597 }
598
599 int
600 initiate_noit_connection(const char *host, unsigned short port,
601                          noit_hash_table *sslconfig, noit_hash_table *config,
602                          eventer_func_t handler, void *closure,
603                          void (*freefunc)(void *)) {
604   noit_connection_ctx_t *ctx;
605
606   int8_t family;
607   int rv;
608   union {
609     struct in_addr addr4;
610     struct in6_addr addr6;
611   } a;
612
613   if(host[0] == '/') {
614     family = AF_UNIX;
615   }
616   else {
617     family = AF_INET;
618     rv = inet_pton(family, host, &a);
619     if(rv != 1) {
620       family = AF_INET6;
621       rv = inet_pton(family, host, &a);
622       if(rv != 1) {
623         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
624         return -1;
625       }
626     }
627   }
628
629   ctx = noit_connection_ctx_alloc();
630   ctx->remote_str = calloc(1, strlen(host) + 7);
631   snprintf(ctx->remote_str, strlen(host) + 7,
632            "%s:%d", host, port);
633  
634   memset(&ctx->r, 0, sizeof(ctx->r));
635   if(family == AF_UNIX) {
636     struct sockaddr_un *s = &ctx->r.remote_un;
637     s->sun_family = AF_UNIX;
638     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
639     ctx->remote_len = sizeof(*s);
640   }
641   else if(family == AF_INET) {
642     struct sockaddr_in *s = &ctx->r.remote_in;
643     s->sin_family = family;
644     s->sin_port = htons(port);
645     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
646     ctx->remote_len = sizeof(*s);
647   }
648   else {
649     struct sockaddr_in6 *s = &ctx->r.remote_in6;
650     s->sin6_family = family;
651     s->sin6_port = htons(port);
652     memcpy(&s->sin6_addr, &a, sizeof(a));
653     ctx->remote_len = sizeof(*s);
654   }
655
656   if(ctx->sslconfig)
657     noit_hash_delete_all(ctx->sslconfig, free, free);
658   else
659     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
660   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
661   if(ctx->config)
662     noit_hash_delete_all(ctx->config, free, free);
663   else
664     ctx->config = calloc(1, sizeof(noit_hash_table));
665   noit_hash_merge_as_dict(ctx->config, config);
666
667   ctx->consumer_callback = handler;
668   ctx->consumer_free = freefunc;
669   ctx->consumer_ctx = closure;
670   noit_connection_initiate_connection(ctx);
671   return 0;
672 }
673
674 void
675 stratcon_streamer_connection(const char *toplevel, const char *destination,
676                              eventer_func_t handler,
677                              void *(*handler_alloc)(void), void *handler_ctx,
678                              void (*handler_free)(void *)) {
679   int i, cnt = 0;
680   noit_conf_section_t *noit_configs;
681   char path[256];
682
683   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
684   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
685   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
686   for(i=0; i<cnt; i++) {
687     char address[256];
688     unsigned short port;
689     int portint;
690     noit_hash_table *sslconfig, *config;
691
692     if(!noit_conf_get_stringbuf(noit_configs[i],
693                                 "ancestor-or-self::node()/@address",
694                                 address, sizeof(address))) {
695       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
696       continue;
697     }
698     /* if destination is specified, exact match it */
699     if(destination && strcmp(address, destination)) continue;
700
701     if(!noit_conf_get_int(noit_configs[i],
702                           "ancestor-or-self::node()/@port", &portint))
703       portint = 0;
704     port = (unsigned short) portint;
705     if(address[0] != '/' && (portint == 0 || (port != portint))) {
706       /* UNIX sockets don't require a port (they'll ignore it if specified */
707       noitL(noit_stderr,
708             "Invalid port [%d] specified in stanza %d\n", port, i+1);
709       continue;
710     }
711     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
712     config = noit_conf_get_hash(noit_configs[i], "config");
713
714     noitL(noit_error, "initiating to %s\n", address);
715     initiate_noit_connection(address, port, sslconfig, config,
716                              handler,
717                              handler_alloc ? handler_alloc() : handler_ctx,
718                              handler_free);
719     noit_hash_destroy(sslconfig,free,free);
720     free(sslconfig);
721     noit_hash_destroy(config,free,free);
722     free(config);
723   }
724   free(noit_configs);
725 }
726 void
727 stratcon_jlog_streamer_reload(const char *toplevel) {
728   stratcon_streamer_connection(toplevel, NULL,
729                                stratcon_jlog_recv_handler,
730                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
731                                NULL,
732                                jlog_streamer_ctx_free);
733 }
734
735 static int
736 stratcon_console_show_noits(noit_console_closure_t ncct,
737                             int argc, char **argv,
738                             noit_console_state_t *dstate,
739                             void *closure) {
740   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
741   uuid_t key_id;
742   int klen, n = 0, i;
743   void *vconn;
744   noit_connection_ctx_t **ctx;
745
746   pthread_mutex_lock(&noits_lock);
747   ctx = malloc(sizeof(*ctx) * noits.size);
748   while(noit_hash_next(&noits, &iter, (const char **)key_id, &klen,
749                        &vconn)) {
750     ctx[n] = (noit_connection_ctx_t *)vconn;
751     noit_atomic_inc32(&ctx[n]->refcnt);
752     n++;
753   }
754   pthread_mutex_unlock(&noits_lock);
755   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
756   for(i=0; i<n; i++) {
757     nc_print_noit_conn_brief(ncct, ctx[i]);
758     noit_connection_ctx_deref(ctx[i]);
759   }
760   free(ctx);
761   return 0;
762 }
763
764 static int
765 rest_show_noits(noit_http_rest_closure_t *restc,
766                 int npats, char **pats) {
767   xmlDocPtr doc;
768   xmlNodePtr root;
769   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
770   uuid_t key_id;
771   int klen, n = 0, i;
772   void *vconn;
773   noit_connection_ctx_t **ctxs;
774   struct timeval now, diff, last;
775   gettimeofday(&now, NULL);
776
777   pthread_mutex_lock(&noits_lock);
778   ctxs = malloc(sizeof(*ctxs) * noits.size);
779   while(noit_hash_next(&noits, &iter, (const char **)key_id, &klen,
780                        &vconn)) {
781     ctxs[n] = (noit_connection_ctx_t *)vconn;
782     noit_atomic_inc32(&ctxs[n]->refcnt);
783     n++;
784   }
785   pthread_mutex_unlock(&noits_lock);
786   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
787
788   doc = xmlNewDoc((xmlChar *)"1.0");
789   root = xmlNewDocNode(doc, NULL, (xmlChar *)"noits", NULL);
790   xmlDocSetRootElement(doc, root);
791   for(i=0; i<n; i++) {
792     char buff[256], *feedtype = "unknown", *state = "unknown";
793     xmlNodePtr node;
794     noit_connection_ctx_t *ctx = ctxs[i];
795     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
796
797     node = xmlNewNode(NULL, (xmlChar *)"noit");
798     snprintf(buff, sizeof(buff), "%llu.%06d",
799              (long long unsigned)ctx->last_connect.tv_sec,
800              (int)ctx->last_connect.tv_usec);
801     xmlSetProp(node, (xmlChar *)"last_connect", (xmlChar *)buff);
802     xmlSetProp(node, (xmlChar *)"state", ctx->remote_cn ?
803                (xmlChar *)"connected" :
804                (ctx->timeout_event ? (xmlChar *)"disconnected" :
805                                     (xmlChar *)"connecting"));
806     xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)ctx->remote_str);
807     switch(ntohl(jctx->jlog_feed_cmd)) {
808       case NOIT_JLOG_DATA_FEED: feedtype = "durable/storage"; break;
809       case NOIT_JLOG_DATA_TEMP_FEED: feedtype = "transient/iep"; break;
810     }
811     xmlSetProp(node, (xmlChar *)"type", (xmlChar *)feedtype);
812     if(ctx->timeout_event) {
813       sub_timeval(ctx->timeout_event->whence, now, &diff);
814       snprintf(buff, sizeof(buff), "%llu.%06d",
815                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
816       xmlSetProp(node, (xmlChar *)"next_attempt", (xmlChar *)buff);
817     }
818     else if(ctx->remote_cn) {
819       if(ctx->remote_cn)
820         xmlSetProp(node, (xmlChar *)"remote_cn", (xmlChar *)ctx->remote_cn);
821  
822       switch(jctx->state) {
823         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
824         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
825         case JLOG_STREAMER_WANT_ERROR: state = "waiting for error"; break;
826         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
827         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
828         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
829         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
830       }
831       xmlSetProp(node, (xmlChar *)"state", (xmlChar *)state);
832       snprintf(buff, sizeof(buff), "%08x:%08x",
833                jctx->header.chkpt.log, jctx->header.chkpt.marker);
834       xmlSetProp(node, (xmlChar *)"checkpoint", (xmlChar *)buff);
835       snprintf(buff, sizeof(buff), "%llu",
836                (unsigned long long)jctx->total_events);
837       xmlSetProp(node, (xmlChar *)"session_events", (xmlChar *)buff);
838       snprintf(buff, sizeof(buff), "%llu",
839                (unsigned long long)jctx->total_bytes_read);
840       xmlSetProp(node, (xmlChar *)"session_bytes", (xmlChar *)buff);
841  
842       sub_timeval(now, ctx->last_connect, &diff);
843       snprintf(buff, sizeof(buff), "%lld.%06d",
844                (long long)diff.tv_sec, (int)diff.tv_usec);
845       xmlSetProp(node, (xmlChar *)"session_duration", (xmlChar *)buff);
846  
847       if(jctx->header.tv_sec) {
848         last.tv_sec = jctx->header.tv_sec;
849         last.tv_usec = jctx->header.tv_usec;
850         snprintf(buff, sizeof(buff), "%llu.%06d",
851                  (unsigned long long)last.tv_sec, (int)last.tv_usec);
852         xmlSetProp(node, (xmlChar *)"last_event", (xmlChar *)buff);
853         sub_timeval(now, last, &diff);
854         snprintf(buff, sizeof(buff), "%lld.%06d",
855                  (long long)diff.tv_sec, (int)diff.tv_usec);
856         xmlSetProp(node, (xmlChar *)"last_event_age", (xmlChar *)buff);
857       }
858     }
859
860     xmlAddChild(root, node);
861     noit_connection_ctx_deref(ctx);
862   }
863   free(ctxs);
864
865   noit_http_response_ok(restc->http_ctx, "text/xml");
866   noit_http_response_xml(restc->http_ctx, doc);
867   noit_http_response_end(restc->http_ctx);
868   xmlFreeDoc(doc);
869   return 0;
870 }
871 static int
872 stratcon_add_noit(const char *target, unsigned short port) {
873   int cnt;
874   char path[256];
875   char port_str[6];
876   noit_conf_section_t *noit_configs, parent;
877   xmlNodePtr newnoit;
878
879   snprintf(path, sizeof(path),
880            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
881   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
882   free(noit_configs);
883   if(cnt != 0) return 0;
884
885   parent = noit_conf_get_section(NULL, "//noits");
886   if(!parent) return 0;
887   snprintf(port_str, sizeof(port_str), "%d", port);
888   newnoit = xmlNewNode(NULL, (xmlChar *)"noit");
889   xmlSetProp(newnoit, (xmlChar *)"address", (xmlChar *)target);
890   xmlSetProp(newnoit, (xmlChar *)"port", (xmlChar *)port_str);
891   xmlAddChild(parent, newnoit);
892   stratcon_streamer_connection(NULL, target,
893                                stratcon_jlog_recv_handler,
894                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
895                                NULL,
896                                jlog_streamer_ctx_free);
897   stratcon_streamer_connection(NULL, target,
898                                stratcon_jlog_recv_handler,
899                                (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
900                                NULL,
901                                jlog_streamer_ctx_free);
902   return 1;
903 }
904 static int
905 stratcon_remove_noit(const char *target, unsigned short port) {
906   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
907   uuid_t key_id;
908   int klen, n = 0, i, cnt = 0;
909   void *vconn;
910   noit_connection_ctx_t **ctx;
911   noit_conf_section_t *noit_configs;
912   char path[256];
913   char remote_str[256];
914
915   snprintf(remote_str, sizeof(remote_str), "%s:%d", target, port);
916
917   snprintf(path, sizeof(path),
918            "//noits//noit[@address=\"%s\" and @port=\"%d\"]", target, port);
919   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
920   for(i=0; i<cnt; i++) {
921     xmlUnlinkNode(noit_configs[i]);
922     xmlFreeNode(noit_configs[i]);
923   }
924   free(noit_configs);
925
926   pthread_mutex_lock(&noits_lock);
927   ctx = malloc(sizeof(*ctx) * noits.size);
928   while(noit_hash_next(&noits, &iter, (const char **)key_id, &klen,
929                        &vconn)) {
930     if(!strcmp(((noit_connection_ctx_t *)vconn)->remote_str, remote_str)) {
931       ctx[n] = (noit_connection_ctx_t *)vconn;
932       noit_atomic_inc32(&ctx[n]->refcnt);
933       n++;
934     }
935   }
936   pthread_mutex_unlock(&noits_lock);
937   for(i=0; i<n; i++) {
938     noit_connection_ctx_dealloc(ctx[i]); /* once for the record */
939     noit_connection_ctx_deref(ctx[i]);   /* once for the aboce inc32 */
940   }
941   free(ctx);
942   return n;
943 }
944 static int
945 rest_set_noit(noit_http_rest_closure_t *restc,
946               int npats, char **pats) {
947   noit_http_session_ctx *ctx = restc->http_ctx;
948   unsigned short port = 43191;
949   if(npats < 1 || npats > 2)
950     noit_http_response_server_error(ctx, "text/xml");
951   if(npats == 2) port = atoi(pats[1]);
952   if(stratcon_add_noit(pats[0], port))
953     noit_http_response_ok(ctx, "text/xml");
954   else
955     noit_http_response_standard(ctx, 409, "EXISTS", "text/xml");
956   if(noit_conf_write_file(NULL) != 0)
957     noitL(noit_error, "local config write failed\n");
958   noit_conf_mark_changed();
959   noit_http_response_end(ctx);
960   return 0;
961 }
962 static int
963 rest_delete_noit(noit_http_rest_closure_t *restc,
964                  int npats, char **pats) {
965   noit_http_session_ctx *ctx = restc->http_ctx;
966   unsigned short port = 43191;
967   if(npats < 1 || npats > 2)
968     noit_http_response_server_error(ctx, "text/xml");
969   if(npats == 2) port = atoi(pats[1]);
970   if(stratcon_remove_noit(pats[0], port))
971     noit_http_response_ok(ctx, "text/xml");
972   else
973     noit_http_response_not_found(ctx, "text/xml");
974   if(noit_conf_write_file(NULL) != 0)
975     noitL(noit_error, "local config write failed\n");
976   noit_conf_mark_changed();
977   noit_http_response_end(ctx);
978   return 0;
979 }
980 static int
981 stratcon_console_conf_noits(noit_console_closure_t ncct,
982                             int argc, char **argv,
983                             noit_console_state_t *dstate,
984                             void *closure) {
985   char *cp, target[128];
986   unsigned short port = 43191;
987   int adding = (int)(vpsized_int)closure;
988   if(argc != 1)
989     return -1;
990
991   cp = strchr(argv[0], ':');
992   if(cp) {
993     strlcpy(target, argv[0], MIN(sizeof(target), cp-argv[0]+1));
994     port = atoi(cp+1);
995   }
996   else strlcpy(target, argv[0], sizeof(target));
997   if(adding) {
998     if(stratcon_add_noit(target, port)) {
999       nc_printf(ncct, "Added noit at %s:%d\n", target, port);
1000     }
1001     else {
1002       nc_printf(ncct, "Failed to add noit at %s:%d\n", target, port);
1003     }
1004   }
1005   else {
1006     if(stratcon_remove_noit(target, port)) {
1007       nc_printf(ncct, "Removed noit at %s:%d\n", target, port);
1008     }
1009     else {
1010       nc_printf(ncct, "Failed to remove noit at %s:%d\n", target, port);
1011     }
1012   }
1013   return 0;
1014 }
1015
1016 static void
1017 register_console_streamer_commands() {
1018   noit_console_state_t *tl;
1019   cmd_info_t *showcmd, *confcmd, *conftcmd, *conftnocmd;
1020
1021   tl = noit_console_state_initial();
1022   showcmd = noit_console_state_get_cmd(tl, "show");
1023   assert(showcmd && showcmd->dstate);
1024   confcmd = noit_console_state_get_cmd(tl, "configure");
1025   conftcmd = noit_console_state_get_cmd(confcmd->dstate, "terminal");
1026   conftnocmd = noit_console_state_get_cmd(conftcmd->dstate, "no");
1027
1028   noit_console_state_add_cmd(conftcmd->dstate,
1029     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)1));
1030   noit_console_state_add_cmd(conftnocmd->dstate,
1031     NCSCMD("noit", stratcon_console_conf_noits, NULL, NULL, (void *)0));
1032
1033   noit_console_state_add_cmd(showcmd->dstate,
1034     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
1035 }
1036
1037 void
1038 stratcon_jlog_streamer_init(const char *toplevel) {
1039   pthread_mutex_init(&noits_lock, NULL);
1040   eventer_name_callback("noit_connection_reinitiate",
1041                         noit_connection_reinitiate);
1042   eventer_name_callback("stratcon_jlog_recv_handler",
1043                         stratcon_jlog_recv_handler);
1044   eventer_name_callback("noit_connection_ssl_upgrade",
1045                         noit_connection_ssl_upgrade);
1046   eventer_name_callback("noit_connection_complete_connect",
1047                         noit_connection_complete_connect);
1048   register_console_streamer_commands();
1049   stratcon_jlog_streamer_reload(toplevel);
1050   assert(noit_http_rest_register_auth(
1051     "GET", "/noits/", "^show$", rest_show_noits,
1052              noit_http_rest_client_cert_auth
1053   ) == 0);
1054   assert(noit_http_rest_register_auth(
1055     "PUT", "/noits/", "^set/([^/:]+)$", rest_set_noit,
1056              noit_http_rest_client_cert_auth
1057   ) == 0);
1058   assert(noit_http_rest_register_auth(
1059     "PUT", "/noits/", "^set/([^/:]+):(\\d+)$", rest_set_noit,
1060              noit_http_rest_client_cert_auth
1061   ) == 0);
1062   assert(noit_http_rest_register_auth(
1063     "DELETE", "/noits/", "^delete/([^/:]+)$", rest_delete_noit,
1064              noit_http_rest_client_cert_auth
1065   ) == 0);
1066   assert(noit_http_rest_register_auth(
1067     "DELETE", "/noits/", "^delete/([^/:]+):(\\d+)$", rest_delete_noit,
1068              noit_http_rest_client_cert_auth
1069   ) == 0);
1070 }
1071
Note: See TracBrowser for help on using the browser.