root/src/stratcon_jlog_streamer.c

Revision 7666f798ce5023fb2d514f6ea370156ce06a1102, 29.3 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

add some missing info here

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_conf.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "noit_jlog_listener.h"
39 #include "noit_rest.h"
40 #include "stratcon_datastore.h"
41 #include "stratcon_jlog_streamer.h"
42
43 #include <unistd.h>
44 #include <assert.h>
45 #include <errno.h>
46 #include <sys/types.h>
47 #include <sys/socket.h>
48 #ifdef HAVE_SYS_FILIO_H
49 #include <sys/filio.h>
50 #endif
51 #include <netinet/in.h>
52 #include <sys/un.h>
53 #include <arpa/inet.h>
54
55 pthread_mutex_t noits_lock;
56 noit_hash_table noits = NOIT_HASH_EMPTY;
57
58 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
59
60 static int
61 remote_str_sort(const void *a, const void *b) {
62   int rv;
63   noit_connection_ctx_t * const *actx = a;
64   noit_connection_ctx_t * const *bctx = b;
65   jlog_streamer_ctx_t *ajctx = (*actx)->consumer_ctx;
66   jlog_streamer_ctx_t *bjctx = (*bctx)->consumer_ctx;
67   rv = strcmp((*actx)->remote_str, (*bctx)->remote_str);
68   if(rv) return rv;
69   return (ajctx->jlog_feed_cmd < bjctx->jlog_feed_cmd) ? -1 :
70            ((ajctx->jlog_feed_cmd == bjctx->jlog_feed_cmd) ? 0 : 1);
71 }
72 static void
73 nc_print_noit_conn_brief(noit_console_closure_t ncct,
74                           noit_connection_ctx_t *ctx) {
75   struct timeval now, diff, session_duration;
76   gettimeofday(&now, NULL);
77   const char *lasttime = "never";
78   if(ctx->last_connect.tv_sec != 0) {
79     char cmdbuf[4096];
80     time_t r = ctx->last_connect.tv_sec;
81     struct tm tbuf, *tm;
82     tm = gmtime_r(&r, &tbuf);
83     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
84     lasttime = cmdbuf;
85   }
86   nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str,
87             ctx->timeout_event ? "disconnected" : "connected", lasttime);
88   if(ctx->timeout_event) {
89     sub_timeval(now, ctx->timeout_event->whence, &diff);
90     nc_printf(ncct, "\tNext attempet in %llu.%06us\n", diff.tv_sec, diff.tv_usec);
91   }
92   else {
93     nc_printf(ncct, "\tRemote CN: '%s'\n",
94               ctx->remote_cn ? ctx->remote_cn : "???");
95     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
96       jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
97       struct timeval last;
98       double session_duration_seconds;
99       const char *feedtype = "unknown";
100       const char *state = "unknown";
101
102       switch(ntohl(jctx->jlog_feed_cmd)) {
103         case NOIT_JLOG_DATA_FEED: feedtype = "durable/storage"; break;
104         case NOIT_JLOG_DATA_TEMP_FEED: feedtype = "transient/iep"; break;
105       }
106       switch(jctx->state) {
107         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
108         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
109         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
110         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
111         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
112         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
113       }
114       last.tv_sec = jctx->header.tv_sec;
115       last.tv_usec = jctx->header.tv_usec;
116       sub_timeval(now, last, &diff);
117       sub_timeval(now, ctx->last_connect, &session_duration);
118       session_duration_seconds = session_duration.tv_sec +
119                                  (double)session_duration.tv_usec/1000000.0;
120       nc_printf(ncct, "\tJLog event streamer [%s]\n\tState: %s\n"
121                       "\tNext checkpoint: [%08x:%08x]\n"
122                       "\tLast event: %llu.%06us ago\n"
123                       "\tEvents this session: %llu (%0.2f/s)\n"
124                       "\tOctets this session: %llu (%0.2f/s)\n",
125                 feedtype, state,
126                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
127                 diff.tv_sec, diff.tv_usec,
128                 jctx->total_events,
129                 (double)jctx->total_events/session_duration_seconds,
130                 jctx->total_bytes_read,
131                 (double)jctx->total_bytes_read/session_duration_seconds);
132     }
133     else {
134       nc_printf(ncct, "\tUnknown type.\n");
135     }
136   }
137 }
138
139 jlog_streamer_ctx_t *
140 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
141   jlog_streamer_ctx_t *ctx;
142   ctx = stratcon_jlog_streamer_ctx_alloc();
143   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
144   ctx->push = stratcon_datastore_push;
145   return ctx;
146 }
147 jlog_streamer_ctx_t *
148 stratcon_jlog_streamer_ctx_alloc(void) {
149   jlog_streamer_ctx_t *ctx;
150   ctx = calloc(1, sizeof(*ctx));
151   return ctx;
152 }
153 noit_connection_ctx_t *
154 noit_connection_ctx_alloc(void) {
155   noit_connection_ctx_t *ctx, **pctx;
156   ctx = calloc(1, sizeof(*ctx));
157   ctx->refcnt = 1;
158   pctx = malloc(sizeof(*pctx));
159   *pctx = ctx;
160   pthread_mutex_lock(&noits_lock);
161   noit_hash_store(&noits, (const char *)pctx, sizeof(*pctx), ctx);
162   pthread_mutex_unlock(&noits_lock);
163   return ctx;
164 }
165 int
166 noit_connection_reinitiate(eventer_t e, int mask, void *closure,
167                          struct timeval *now) {
168   noit_connection_ctx_t *ctx = closure;
169   ctx->timeout_event = NULL;
170   noit_connection_initiate_connection(closure);
171   return 0;
172 }
173 void
174 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx,
175                                    struct timeval *now) {
176   struct timeval __now, interval;
177   const char *v;
178   u_int32_t min_interval = 1000, max_interval = 8000;
179   if(ctx->remote_cn) {
180     free(ctx->remote_cn);
181     ctx->remote_cn = NULL;
182   }
183   if(noit_hash_retr_str(ctx->config,
184                         "reconnect_initial_interval",
185                         strlen("reconnect_initial_interval"),
186                         &v)) {
187     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
188   }
189   if(noit_hash_retr_str(ctx->config,
190                         "reconnect_maximum_interval",
191                         strlen("reconnect_maximum_interval"),
192                         &v)) {
193     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
194   }
195   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
196   else {
197     ctx->current_backoff *= 2;
198     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
199     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
200   }
201   if(!now) {
202     gettimeofday(&__now, NULL);
203     now = &__now;
204   }
205   interval.tv_sec = ctx->current_backoff / 1000;
206   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
207   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
208         ctx->current_backoff);
209   if(ctx->timeout_event)
210     eventer_remove(ctx->timeout_event);
211   else
212     ctx->timeout_event = eventer_alloc();
213   ctx->timeout_event->callback = noit_connection_reinitiate;
214   ctx->timeout_event->closure = ctx;
215   ctx->timeout_event->mask = EVENTER_TIMER;
216   add_timeval(*now, interval, &ctx->timeout_event->whence);
217   eventer_add(ctx->timeout_event);
218 }
219 static void
220 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
221   if(ctx->remote_cn) free(ctx->remote_cn);
222   if(ctx->remote_str) free(ctx->remote_str);
223   if(ctx->timeout_event) {
224     eventer_remove(ctx->timeout_event);
225     eventer_free(ctx->timeout_event);
226   }
227   ctx->consumer_free(ctx->consumer_ctx);
228   free(ctx);
229 }
230 void
231 noit_connection_ctx_deref(noit_connection_ctx_t *ctx) {
232   if(noit_atomic_dec32(&ctx->refcnt) == 0)
233     noit_connection_ctx_free(ctx);
234 }
235 void
236 noit_connection_ctx_dealloc(noit_connection_ctx_t *ctx) {
237   noit_connection_ctx_t **pctx = &ctx;
238   pthread_mutex_lock(&noits_lock);
239   noit_hash_delete(&noits, (const char *)pctx, sizeof(*pctx),
240                    free, (void (*)(void *))noit_connection_ctx_deref);
241   pthread_mutex_unlock(&noits_lock);
242 }
243 void
244 jlog_streamer_ctx_free(void *cl) {
245   jlog_streamer_ctx_t *ctx = cl;
246   if(ctx->buffer) free(ctx->buffer);
247   free(ctx);
248 }
249
250 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
251 static int
252 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
253   int len, mask;
254   while(ctx->bytes_read < ctx->bytes_expected) {
255     len = Eread(ctx->buffer + ctx->bytes_read,
256                 ctx->bytes_expected - ctx->bytes_read);
257     if(len < 0) {
258       *newmask = mask;
259       return -1;
260     }
261     /* if we get 0 inside SSL, and there was a real error, we
262      * will actually get a -1 here.
263      * if(len == 0) return ctx->bytes_read;
264      */
265     ctx->total_bytes_read += len;
266     ctx->bytes_read += len;
267   }
268   assert(ctx->bytes_read == ctx->bytes_expected);
269   return ctx->bytes_read;
270 }
271 #define FULLREAD(e,ctx,size) do { \
272   int mask, len; \
273   if(!ctx->bytes_expected) { \
274     ctx->bytes_expected = size; \
275     if(ctx->buffer) free(ctx->buffer); \
276     ctx->buffer = malloc(size + 1); \
277     if(ctx->buffer == NULL) { \
278       noitL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)size + 1); \
279       goto socket_error; \
280     } \
281     ctx->buffer[size] = '\0'; \
282   } \
283   len = __read_on_ctx(e, ctx, &mask); \
284   if(len < 0) { \
285     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
286     noitL(noit_error, "[%s] SSL read error: %s\n", nctx->remote_str, strerror(errno)); \
287     goto socket_error; \
288   } \
289   ctx->bytes_read = 0; \
290   ctx->bytes_expected = 0; \
291   if(len != size) { \
292     noitL(noit_error, "[%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
293           nctx->remote_str, ctx->state, len, (long unsigned int)size); \
294     goto socket_error; \
295   } \
296 } while(0)
297
298 int
299 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
300                            struct timeval *now) {
301   noit_connection_ctx_t *nctx = closure;
302   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
303   int len;
304   jlog_id n_chkpt;
305
306   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
307     if(write(e->fd, e, 0) == -1)
308       noitL(noit_error, "socket error: %s\n", strerror(errno));
309  socket_error:
310     ctx->state = JLOG_STREAMER_WANT_INITIATE;
311     ctx->count = 0;
312     ctx->bytes_read = 0;
313     ctx->bytes_expected = 0;
314     if(ctx->buffer) free(ctx->buffer);
315     ctx->buffer = NULL;
316     noit_connection_schedule_reattempt(nctx, now);
317     eventer_remove_fd(e->fd);
318     e->opset->close(e->fd, &mask, e);
319     return 0;
320   }
321
322   while(1) {
323     switch(ctx->state) {
324       case JLOG_STREAMER_WANT_INITIATE:
325         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
326                               sizeof(ctx->jlog_feed_cmd),
327                               &mask, e);
328         if(len < 0) {
329           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
330           goto socket_error;
331         }
332         if(len != sizeof(ctx->jlog_feed_cmd)) {
333           noitL(noit_error, "short write [%d/%d] on initiating stream.\n",
334                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
335           goto socket_error;
336         }
337         ctx->state = JLOG_STREAMER_WANT_COUNT;
338         break;
339
340       case JLOG_STREAMER_WANT_COUNT:
341         FULLREAD(e, ctx, sizeof(u_int32_t));
342         memcpy(&ctx->count, ctx->buffer, sizeof(u_int32_t));
343         ctx->count = ntohl(ctx->count);
344         free(ctx->buffer); ctx->buffer = NULL;
345         ctx->state = JLOG_STREAMER_WANT_HEADER;
346         break;
347
348       case JLOG_STREAMER_WANT_HEADER:
349         if(ctx->count == 0) {
350           ctx->state = JLOG_STREAMER_WANT_COUNT;
351           break;
352         }
353         FULLREAD(e, ctx, sizeof(ctx->header));
354         memcpy(&ctx->header, ctx->buffer, sizeof(ctx->header));
355         ctx->header.chkpt.log = ntohl(ctx->header.chkpt.log);
356         ctx->header.chkpt.marker = ntohl(ctx->header.chkpt.marker);
357         ctx->header.tv_sec = ntohl(ctx->header.tv_sec);
358         ctx->header.tv_usec = ntohl(ctx->header.tv_usec);
359         ctx->header.message_len = ntohl(ctx->header.message_len);
360         free(ctx->buffer); ctx->buffer = NULL;
361         ctx->state = JLOG_STREAMER_WANT_BODY;
362         break;
363
364       case JLOG_STREAMER_WANT_BODY:
365         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
366         if(ctx->header.message_len > 0)
367           ctx->push(DS_OP_INSERT, &nctx->r.remote, ctx->buffer);
368         else if(ctx->buffer)
369           free(ctx->buffer);
370         /* Don't free the buffer, it's used by the datastore process. */
371         ctx->buffer = NULL;
372         ctx->count--;
373         ctx->total_events++;
374         if(ctx->count == 0) {
375           eventer_t completion_e;
376           eventer_remove_fd(e->fd);
377           completion_e = eventer_alloc();
378           memcpy(completion_e, e, sizeof(*e));
379           completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
380           ctx->state = JLOG_STREAMER_IS_ASYNC;
381           ctx->push(DS_OP_CHKPT, &nctx->r.remote, completion_e);
382           noitL(noit_debug, "Pushing batch asynch...\n");
383           return 0;
384         } else
385           ctx->state = JLOG_STREAMER_WANT_HEADER;
386         break;
387
388       case JLOG_STREAMER_IS_ASYNC:
389         ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
390       case JLOG_STREAMER_WANT_CHKPT:
391         noitL(noit_debug, "Pushing checkpoint: [%u/%u]\n",
392               ctx->header.chkpt.log, ctx->header.chkpt.marker);
393         n_chkpt.log = htonl(ctx->header.chkpt.log);
394         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
395
396         /* screw short writes.  I'd rather die than not write my data! */
397         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
398                               &mask, e);
399         if(len < 0) {
400           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
401           goto socket_error;
402         }
403         if(len != sizeof(jlog_id)) {
404           noitL(noit_error, "short write on checkpointing stream.\n");
405           goto socket_error;
406         }
407         ctx->state = JLOG_STREAMER_WANT_COUNT;
408         break;
409     }
410   }
411   /* never get here */
412 }
413
414 int
415 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure,
416                             struct timeval *now) {
417   noit_connection_ctx_t *nctx = closure;
418   int rv;
419
420   rv = eventer_SSL_connect(e, &mask);
421   if(rv > 0) {
422     eventer_ssl_ctx_t *sslctx;
423     e->callback = nctx->consumer_callback;
424     /* We must make a copy of the acceptor_closure_t for each new
425      * connection.
426      */
427     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
428       const char *cn, *end;
429       cn = eventer_ssl_get_peer_subject(sslctx);
430       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
431         cn += 3;
432         end = cn;
433         while(*end && *end != '/') end++;
434         nctx->remote_cn = malloc(end - cn + 1);
435         memcpy(nctx->remote_cn, cn, end - cn);
436         nctx->remote_cn[end-cn] = '\0';
437       }
438     }
439     return e->callback(e, mask, e->closure, now);
440   }
441   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
442
443   noitL(noit_error, "jlog streamer SSL upgrade failed.\n");
444   eventer_remove_fd(e->fd);
445   e->opset->close(e->fd, &mask, e);
446   noit_connection_schedule_reattempt(nctx, now);
447   return 0;
448 }
449 int
450 noit_connection_complete_connect(eventer_t e, int mask, void *closure,
451                                  struct timeval *now) {
452   noit_connection_ctx_t *nctx = closure;
453   const char *cert, *key, *ca, *ciphers;
454   char remote_str[128], tmp_str[128];
455   eventer_ssl_ctx_t *sslctx;
456   int aerrno, len;
457   socklen_t aerrno_len = sizeof(aerrno);
458
459   if(getsockopt(e->fd,SOL_SOCKET,SO_ERROR, &aerrno, &aerrno_len) == 0)
460     if(aerrno != 0) goto connect_error;
461   aerrno = 0;
462
463   if(mask & EVENTER_EXCEPTION) {
464     if(aerrno == 0 && (write(e->fd, e, 0) == -1))
465       aerrno = errno;
466  connect_error:
467     switch(nctx->r.remote.sa_family) {
468       case AF_INET:
469         len = sizeof(struct sockaddr_in);
470         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in.sin_addr,
471                   tmp_str, len);
472         snprintf(remote_str, sizeof(remote_str), "%s:%d",
473                  tmp_str, ntohs(nctx->r.remote_in.sin_port));
474         break;
475       case AF_INET6:
476         len = sizeof(struct sockaddr_in6);
477         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in6.sin6_addr,
478                   tmp_str, len);
479         snprintf(remote_str, sizeof(remote_str), "%s:%d",
480                  tmp_str, ntohs(nctx->r.remote_in6.sin6_port));
481        break;
482       case AF_UNIX:
483         len = SUN_LEN(&(nctx->r.remote_un));
484         snprintf(remote_str, sizeof(remote_str), "%s", nctx->r.remote_un.sun_path);
485         break;
486       default:
487         snprintf(remote_str, sizeof(remote_str), "(unknown)");
488     }
489     noitL(noit_error, "Error connecting to %s: %s\n",
490           remote_str, strerror(aerrno));
491     eventer_remove_fd(e->fd);
492     e->opset->close(e->fd, &mask, e);
493     noit_connection_schedule_reattempt(nctx, now);
494     return 0;
495   }
496
497 #define SSLCONFGET(var,name) do { \
498   if(!noit_hash_retr_str(nctx->sslconfig, name, strlen(name), \
499                          &var)) var = NULL; } while(0)
500   SSLCONFGET(cert, "certificate_file");
501   SSLCONFGET(key, "key_file");
502   SSLCONFGET(ca, "ca_chain");
503   SSLCONFGET(ciphers, "ciphers");
504   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
505   if(!sslctx) goto connect_error;
506
507   memcpy(&nctx->last_connect, now, sizeof(*now));
508   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
509                              nctx->sslconfig);
510   EVENTER_ATTACH_SSL(e, sslctx);
511   e->callback = noit_connection_ssl_upgrade;
512   return e->callback(e, mask, closure, now);
513 }
514 static void
515 noit_connection_initiate_connection(noit_connection_ctx_t *nctx) {
516   struct timeval __now;
517   eventer_t e;
518   int rv, fd = -1;
519
520   if(nctx->wants_permanent_shutdown) {
521     noit_connection_ctx_dealloc(nctx);
522     return;
523   }
524   /* Open a socket */
525   fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0);
526   if(fd < 0) goto reschedule;
527
528   /* Make it non-blocking */
529   if(eventer_set_fd_nonblocking(fd)) goto reschedule;
530
531   /* Initiate a connection */
532   rv = connect(fd, &nctx->r.remote, nctx->remote_len);
533   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
534
535   /* Register a handler for connection completion */
536   e = eventer_alloc();
537   e->fd = fd;
538   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
539   e->callback = noit_connection_complete_connect;
540   e->closure = nctx;
541   eventer_add(e);
542   return;
543
544  reschedule:
545   if(fd >= 0) close(fd);
546   gettimeofday(&__now, NULL);
547   noit_connection_schedule_reattempt(nctx, &__now);
548   return;
549 }
550
551 int
552 initiate_noit_connection(const char *host, unsigned short port,
553                          noit_hash_table *sslconfig, noit_hash_table *config,
554                          eventer_func_t handler, void *closure,
555                          void (*freefunc)(void *)) {
556   noit_connection_ctx_t *ctx;
557
558   int8_t family;
559   int rv;
560   union {
561     struct in_addr addr4;
562     struct in6_addr addr6;
563   } a;
564
565   if(host[0] == '/') {
566     family = AF_UNIX;
567   }
568   else {
569     family = AF_INET;
570     rv = inet_pton(family, host, &a);
571     if(rv != 1) {
572       family = AF_INET6;
573       rv = inet_pton(family, host, &a);
574       if(rv != 1) {
575         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
576         return -1;
577       }
578     }
579   }
580
581   ctx = noit_connection_ctx_alloc();
582   ctx->remote_str = calloc(1, strlen(host) + 7);
583   snprintf(ctx->remote_str, strlen(host) + 7,
584            "%s:%d", host, port);
585  
586   memset(&ctx->r, 0, sizeof(ctx->r));
587   if(family == AF_UNIX) {
588     struct sockaddr_un *s = &ctx->r.remote_un;
589     s->sun_family = AF_UNIX;
590     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
591     ctx->remote_len = sizeof(*s);
592   }
593   else if(family == AF_INET) {
594     struct sockaddr_in *s = &ctx->r.remote_in;
595     s->sin_family = family;
596     s->sin_port = htons(port);
597     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
598     ctx->remote_len = sizeof(*s);
599   }
600   else {
601     struct sockaddr_in6 *s = &ctx->r.remote_in6;
602     s->sin6_family = family;
603     s->sin6_port = htons(port);
604     memcpy(&s->sin6_addr, &a, sizeof(a));
605     ctx->remote_len = sizeof(*s);
606   }
607
608   if(ctx->sslconfig)
609     noit_hash_delete_all(ctx->sslconfig, free, free);
610   else
611     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
612   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
613   if(ctx->config)
614     noit_hash_delete_all(ctx->config, free, free);
615   else
616     ctx->config = calloc(1, sizeof(noit_hash_table));
617   noit_hash_merge_as_dict(ctx->config, config);
618
619   ctx->consumer_callback = handler;
620   ctx->consumer_free = freefunc;
621   ctx->consumer_ctx = closure;
622   noit_connection_initiate_connection(ctx);
623   return 0;
624 }
625
626 void
627 stratcon_streamer_connection(const char *toplevel, const char *destination,
628                              eventer_func_t handler,
629                              void *(*handler_alloc)(void), void *handler_ctx,
630                              void (*handler_free)(void *)) {
631   int i, cnt = 0;
632   noit_conf_section_t *noit_configs;
633   char path[256];
634
635   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
636   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
637   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
638   for(i=0; i<cnt; i++) {
639     char address[256];
640     unsigned short port;
641     int portint;
642     noit_hash_table *sslconfig, *config;
643
644     if(!noit_conf_get_stringbuf(noit_configs[i],
645                                 "ancestor-or-self::node()/@address",
646                                 address, sizeof(address))) {
647       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
648       continue;
649     }
650     /* if destination is specified, exact match it */
651     if(destination && strcmp(address, destination)) continue;
652
653     if(!noit_conf_get_int(noit_configs[i],
654                           "ancestor-or-self::node()/@port", &portint))
655       portint = 0;
656     port = (unsigned short) portint;
657     if(address[0] != '/' && (portint == 0 || (port != portint))) {
658       /* UNIX sockets don't require a port (they'll ignore it if specified */
659       noitL(noit_stderr,
660             "Invalid port [%d] specified in stanza %d\n", port, i+1);
661       continue;
662     }
663     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
664     config = noit_conf_get_hash(noit_configs[i], "config");
665
666     noitL(noit_error, "initiating to %s\n", address);
667     initiate_noit_connection(address, port, sslconfig, config,
668                              handler,
669                              handler_alloc ? handler_alloc() : handler_ctx,
670                              handler_free);
671     noit_hash_destroy(sslconfig,free,free);
672     free(sslconfig);
673     noit_hash_destroy(config,free,free);
674     free(config);
675   }
676   free(noit_configs);
677 }
678 void
679 stratcon_jlog_streamer_reload(const char *toplevel) {
680   stratcon_streamer_connection(toplevel, NULL,
681                                stratcon_jlog_recv_handler,
682                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
683                                NULL,
684                                jlog_streamer_ctx_free);
685 }
686
687 static int
688 stratcon_console_show_noits(noit_console_closure_t ncct,
689                             int argc, char **argv,
690                             noit_console_state_t *dstate,
691                             void *closure) {
692   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
693   uuid_t key_id;
694   int klen, n = 0, i;
695   void *vconn;
696   noit_connection_ctx_t **ctx;
697
698   pthread_mutex_lock(&noits_lock);
699   ctx = malloc(sizeof(*ctx) * noits.size);
700   while(noit_hash_next(&noits, &iter, (const char **)key_id, &klen,
701                        &vconn)) {
702     ctx[n] = (noit_connection_ctx_t *)vconn;
703     noit_atomic_inc32(&ctx[n]->refcnt);
704     n++;
705   }
706   pthread_mutex_unlock(&noits_lock);
707   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
708   for(i=0; i<n; i++) {
709     nc_print_noit_conn_brief(ncct, ctx[i]);
710     noit_connection_ctx_deref(ctx[i]);
711   }
712   free(ctx);
713   return 0;
714 }
715
716 static void
717 register_console_streamer_commands() {
718   noit_console_state_t *tl;
719   cmd_info_t *showcmd;
720
721   tl = noit_console_state_initial();
722   showcmd = noit_console_state_get_cmd(tl, "show");
723   assert(showcmd && showcmd->dstate);
724
725   noit_console_state_add_cmd(showcmd->dstate,
726     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
727 }
728
729 static int
730 rest_show_noits(noit_http_rest_closure_t *restc,
731                 int npats, char **pats) {
732   xmlDocPtr doc;
733   xmlNodePtr root;
734   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
735   uuid_t key_id;
736   int klen, n = 0, i;
737   void *vconn;
738   noit_connection_ctx_t **ctxs;
739   struct timeval now, diff, last;
740   gettimeofday(&now, NULL);
741
742   pthread_mutex_lock(&noits_lock);
743   ctxs = malloc(sizeof(*ctxs) * noits.size);
744   while(noit_hash_next(&noits, &iter, (const char **)key_id, &klen,
745                        &vconn)) {
746     ctxs[n] = (noit_connection_ctx_t *)vconn;
747     noit_atomic_inc32(&ctxs[n]->refcnt);
748     n++;
749   }
750   pthread_mutex_unlock(&noits_lock);
751   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
752
753   doc = xmlNewDoc((xmlChar *)"1.0");
754   root = xmlNewDocNode(doc, NULL, (xmlChar *)"noits", NULL);
755   xmlDocSetRootElement(doc, root);
756   for(i=0; i<n; i++) {
757     char buff[256], *feedtype = "unknown", *state = "unknown";
758     xmlNodePtr node;
759     noit_connection_ctx_t *ctx = ctxs[i];
760     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
761
762     node = xmlNewNode(NULL, (xmlChar *)"noit");
763     snprintf(buff, sizeof(buff), "%llu.%06d",
764              (long long unsigned)ctx->last_connect.tv_sec,
765              (int)ctx->last_connect.tv_usec);
766     xmlSetProp(node, (xmlChar *)"last_connect", (xmlChar *)buff);
767     xmlSetProp(node, (xmlChar *)"state", ctx->timeout_event ?
768                (xmlChar *)"disconnected" : (xmlChar *)"connected");
769     if(ctx->timeout_event) {
770       sub_timeval(now, ctx->timeout_event->whence, &diff);
771       snprintf(buff, sizeof(buff), "%llu.%06d",
772                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
773       xmlSetProp(node, (xmlChar *)"next_attempt", (xmlChar *)buff);
774     }
775     xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)ctx->remote_str);
776     if(ctx->remote_cn)
777       xmlSetProp(node, (xmlChar *)"remote_cn", (xmlChar *)ctx->remote_cn);
778
779     switch(ntohl(jctx->jlog_feed_cmd)) {
780       case NOIT_JLOG_DATA_FEED: feedtype = "durable/storage"; break;
781       case NOIT_JLOG_DATA_TEMP_FEED: feedtype = "transient/iep"; break;
782     }
783     xmlSetProp(node, (xmlChar *)"type", (xmlChar *)feedtype);
784     switch(jctx->state) {
785       case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
786       case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
787       case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
788       case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
789       case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
790       case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
791     }
792     xmlSetProp(node, (xmlChar *)"state", (xmlChar *)state);
793     snprintf(buff, sizeof(buff), "%08x:%08x",
794              jctx->header.chkpt.log, jctx->header.chkpt.marker);
795     xmlSetProp(node, (xmlChar *)"checkpoint", (xmlChar *)buff);
796     snprintf(buff, sizeof(buff), "%llu",
797              (long long unsigned)jctx->total_events);
798     xmlSetProp(node, (xmlChar *)"session_events", (xmlChar *)buff);
799     snprintf(buff, sizeof(buff), "%llu",
800              (long long unsigned)jctx->total_bytes_read);
801     xmlSetProp(node, (xmlChar *)"session_bytes", (xmlChar *)buff);
802
803     sub_timeval(now, ctx->last_connect, &diff);
804     snprintf(buff, sizeof(buff), "%llu.%06d",
805              (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
806     xmlSetProp(node, (xmlChar *)"session_duration", (xmlChar *)buff);
807
808     if(jctx->header.tv_sec) {
809       last.tv_sec = jctx->header.tv_sec;
810       last.tv_usec = jctx->header.tv_usec;
811       snprintf(buff, sizeof(buff), "%llu.%06d",
812                (long long unsigned)last.tv_sec, (int)last.tv_usec);
813       xmlSetProp(node, (xmlChar *)"last_event", (xmlChar *)buff);
814       sub_timeval(now, last, &diff);
815       snprintf(buff, sizeof(buff), "%llu.%06d",
816                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
817       xmlSetProp(node, (xmlChar *)"last_event_age", (xmlChar *)buff);
818     }
819
820     xmlAddChild(root, node);
821     noit_connection_ctx_deref(ctx);
822   }
823   free(ctxs);
824
825   noit_http_response_ok(restc->http_ctx, "text/xml");
826   noit_http_response_xml(restc->http_ctx, doc);
827   noit_http_response_end(restc->http_ctx);
828   xmlFreeDoc(doc);
829   return 0;
830 }
831 void
832 stratcon_jlog_streamer_init(const char *toplevel) {
833   pthread_mutex_init(&noits_lock, NULL);
834   eventer_name_callback("noit_connection_reinitiate",
835                         noit_connection_reinitiate);
836   eventer_name_callback("stratcon_jlog_recv_handler",
837                         stratcon_jlog_recv_handler);
838   eventer_name_callback("noit_connection_ssl_upgrade",
839                         noit_connection_ssl_upgrade);
840   eventer_name_callback("noit_connection_complete_connect",
841                         noit_connection_complete_connect);
842   register_console_streamer_commands();
843   stratcon_jlog_streamer_reload(toplevel);
844   assert(noit_http_rest_register(
845     "GET", "/noits/", "^show$", rest_show_noits
846   ) == 0);
847 }
Note: See TracBrowser for help on using the browser.