root/src/stratcon_jlog_streamer.c

Revision 8504a3bc4b5a70ad76874cc27cff73a2308a0543, 28.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 9 years ago)

export noit info over REST

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_conf.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "noit_jlog_listener.h"
39 #include "noit_rest.h"
40 #include "stratcon_datastore.h"
41 #include "stratcon_jlog_streamer.h"
42
43 #include <unistd.h>
44 #include <assert.h>
45 #include <errno.h>
46 #include <sys/types.h>
47 #include <sys/socket.h>
48 #ifdef HAVE_SYS_FILIO_H
49 #include <sys/filio.h>
50 #endif
51 #include <netinet/in.h>
52 #include <sys/un.h>
53 #include <arpa/inet.h>
54
/* Held around every store, delete and iteration on the noits table below. */
55 pthread_mutex_t noits_lock;
/* Table of connection contexts.  Each entry is keyed by the bytes of the
 * context pointer itself (see noit_connection_ctx_alloc) and maps to that
 * same context. */
56 noit_hash_table noits = NOIT_HASH_EMPTY;
57
/* Forward declaration: the reconnect timer callback needs this before its
 * definition appears later in the file. */
58 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
59
60 static int
61 remote_str_sort(const void *a, const void *b) {
62   noit_connection_ctx_t * const *actx = a;
63   noit_connection_ctx_t * const *bctx = b;
64   return strcmp((*actx)->remote_str, (*bctx)->remote_str);
65 }
66 static void
67 nc_print_noit_conn_brief(noit_console_closure_t ncct,
68                           noit_connection_ctx_t *ctx) {
69   struct timeval now, diff, session_duration;
70   gettimeofday(&now, NULL);
71   const char *lasttime = "never";
72   if(ctx->last_connect.tv_sec != 0) {
73     char cmdbuf[4096];
74     time_t r = ctx->last_connect.tv_sec;
75     struct tm tbuf, *tm;
76     tm = gmtime_r(&r, &tbuf);
77     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
78     lasttime = cmdbuf;
79   }
80   nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str,
81             ctx->timeout_event ? "disconnected" : "connected", lasttime);
82   if(ctx->timeout_event) {
83     sub_timeval(now, ctx->timeout_event->whence, &diff);
84     nc_printf(ncct, "\tNext attempet in %llu.%06us\n", diff.tv_sec, diff.tv_usec);
85   }
86   else {
87     nc_printf(ncct, "\tRemote CN: '%s'\n",
88               ctx->remote_cn ? ctx->remote_cn : "???");
89     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
90       jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
91       struct timeval last;
92       double session_duration_seconds;
93       const char *feedtype = "unknown";
94       const char *state = "unknown";
95
96       switch(ntohl(jctx->jlog_feed_cmd)) {
97         case NOIT_JLOG_DATA_FEED: feedtype = "durable/storage"; break;
98         case NOIT_JLOG_DATA_TEMP_FEED: feedtype = "transient/iep"; break;
99       }
100       switch(jctx->state) {
101         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
102         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
103         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
104         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
105         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
106         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
107       }
108       last.tv_sec = jctx->header.tv_sec;
109       last.tv_usec = jctx->header.tv_usec;
110       sub_timeval(now, last, &diff);
111       sub_timeval(now, ctx->last_connect, &session_duration);
112       session_duration_seconds = session_duration.tv_sec +
113                                  (double)session_duration.tv_usec/1000000.0;
114       nc_printf(ncct, "\tJLog event streamer [%s]\n\tState: %s\n"
115                       "\tNext checkpoint: [%08x:%08x]\n"
116                       "\tLast event: %llu.%06us ago\n"
117                       "\tEvents this session: %llu (%0.2f/s)\n"
118                       "\tOctets this session: %llu (%0.2f/s)\n",
119                 feedtype, state,
120                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
121                 diff.tv_sec, diff.tv_usec,
122                 jctx->total_events,
123                 (double)jctx->total_events/session_duration_seconds,
124                 jctx->total_bytes_read,
125                 (double)jctx->total_bytes_read/session_duration_seconds);
126     }
127     else {
128       nc_printf(ncct, "\tUnknown type.\n");
129     }
130   }
131 }
132
133 jlog_streamer_ctx_t *
134 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
135   jlog_streamer_ctx_t *ctx;
136   ctx = stratcon_jlog_streamer_ctx_alloc();
137   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
138   ctx->push = stratcon_datastore_push;
139   return ctx;
140 }
141 jlog_streamer_ctx_t *
142 stratcon_jlog_streamer_ctx_alloc(void) {
143   jlog_streamer_ctx_t *ctx;
144   ctx = calloc(1, sizeof(*ctx));
145   return ctx;
146 }
147 noit_connection_ctx_t *
148 noit_connection_ctx_alloc(void) {
149   noit_connection_ctx_t *ctx, **pctx;
150   ctx = calloc(1, sizeof(*ctx));
151   ctx->refcnt = 1;
152   pctx = malloc(sizeof(*pctx));
153   *pctx = ctx;
154   pthread_mutex_lock(&noits_lock);
155   noit_hash_store(&noits, (const char *)pctx, sizeof(*pctx), ctx);
156   pthread_mutex_unlock(&noits_lock);
157   return ctx;
158 }
159 int
160 noit_connection_reinitiate(eventer_t e, int mask, void *closure,
161                          struct timeval *now) {
162   noit_connection_ctx_t *ctx = closure;
163   ctx->timeout_event = NULL;
164   noit_connection_initiate_connection(closure);
165   return 0;
166 }
167 void
168 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx,
169                                    struct timeval *now) {
170   struct timeval __now, interval;
171   const char *v;
172   u_int32_t min_interval = 1000, max_interval = 8000;
173   if(ctx->remote_cn) {
174     free(ctx->remote_cn);
175     ctx->remote_cn = NULL;
176   }
177   if(noit_hash_retr_str(ctx->config,
178                         "reconnect_initial_interval",
179                         strlen("reconnect_initial_interval"),
180                         &v)) {
181     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
182   }
183   if(noit_hash_retr_str(ctx->config,
184                         "reconnect_maximum_interval",
185                         strlen("reconnect_maximum_interval"),
186                         &v)) {
187     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
188   }
189   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
190   else {
191     ctx->current_backoff *= 2;
192     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
193     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
194   }
195   if(!now) {
196     gettimeofday(&__now, NULL);
197     now = &__now;
198   }
199   interval.tv_sec = ctx->current_backoff / 1000;
200   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
201   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
202         ctx->current_backoff);
203   if(ctx->timeout_event)
204     eventer_remove(ctx->timeout_event);
205   else
206     ctx->timeout_event = eventer_alloc();
207   ctx->timeout_event->callback = noit_connection_reinitiate;
208   ctx->timeout_event->closure = ctx;
209   ctx->timeout_event->mask = EVENTER_TIMER;
210   add_timeval(*now, interval, &ctx->timeout_event->whence);
211   eventer_add(ctx->timeout_event);
212 }
213 static void
214 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
215   if(ctx->remote_cn) free(ctx->remote_cn);
216   if(ctx->remote_str) free(ctx->remote_str);
217   if(ctx->timeout_event) {
218     eventer_remove(ctx->timeout_event);
219     eventer_free(ctx->timeout_event);
220   }
221   ctx->consumer_free(ctx->consumer_ctx);
222   free(ctx);
223 }
224 void
225 noit_connection_ctx_deref(noit_connection_ctx_t *ctx) {
226   if(noit_atomic_dec32(&ctx->refcnt) == 0)
227     noit_connection_ctx_free(ctx);
228 }
229 void
230 noit_connection_ctx_dealloc(noit_connection_ctx_t *ctx) {
231   noit_connection_ctx_t **pctx = &ctx;
232   pthread_mutex_lock(&noits_lock);
233   noit_hash_delete(&noits, (const char *)pctx, sizeof(*pctx),
234                    free, (void (*)(void *))noit_connection_ctx_deref);
235   pthread_mutex_unlock(&noits_lock);
236 }
237 void
238 jlog_streamer_ctx_free(void *cl) {
239   jlog_streamer_ctx_t *ctx = cl;
240   if(ctx->buffer) free(ctx->buffer);
241   free(ctx);
242 }
243
244 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
245 static int
246 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
247   int len, mask;
248   while(ctx->bytes_read < ctx->bytes_expected) {
249     len = Eread(ctx->buffer + ctx->bytes_read,
250                 ctx->bytes_expected - ctx->bytes_read);
251     if(len < 0) {
252       *newmask = mask;
253       return -1;
254     }
255     /* if we get 0 inside SSL, and there was a real error, we
256      * will actually get a -1 here.
257      * if(len == 0) return ctx->bytes_read;
258      */
259     ctx->total_bytes_read += len;
260     ctx->bytes_read += len;
261   }
262   assert(ctx->bytes_read == ctx->bytes_expected);
263   return ctx->bytes_read;
264 }
/*
 * FULLREAD(e, ctx, size): read exactly `size` bytes from event `e` into a
 * freshly allocated, NUL-terminated ctx->buffer, resuming across calls.
 *
 * Requirements on the expansion site: it must be inside a function that
 * returns int, has a variable named `nctx` (used for logging) and a label
 * `socket_error`.
 *
 * Control flow:
 *   - read would block (errno == EAGAIN): returns the read mask with
 *     EVENTER_EXCEPTION from the enclosing function; progress is kept in
 *     ctx so the next expansion resumes where this one stopped;
 *   - allocation failure, read error or short read: goto socket_error;
 *   - success: falls through with ctx->buffer holding the data and
 *     bytes_read/bytes_expected reset to 0.
 *
 * `size` is evaluated exactly once; locals carry an fr_ prefix so they do
 * not shadow the caller's `mask`/`len`.
 */
#define FULLREAD(e,ctx,size) do { \
  int fr_mask_, fr_len_; \
  unsigned long fr_size_ = (unsigned long)(size); \
  if(!(ctx)->bytes_expected) { \
    (ctx)->bytes_expected = fr_size_; \
    free((ctx)->buffer); \
    (ctx)->buffer = malloc(fr_size_ + 1); \
    if((ctx)->buffer == NULL) { \
      noitL(noit_error, "malloc(%lu) failed.\n", fr_size_ + 1); \
      goto socket_error; \
    } \
    (ctx)->buffer[fr_size_] = '\0'; \
  } \
  fr_len_ = __read_on_ctx((e), (ctx), &fr_mask_); \
  if(fr_len_ < 0) { \
    if(errno == EAGAIN) return fr_mask_ | EVENTER_EXCEPTION; \
    noitL(noit_error, "[%s] SSL read error: %s\n", nctx->remote_str, strerror(errno)); \
    goto socket_error; \
  } \
  (ctx)->bytes_read = 0; \
  (ctx)->bytes_expected = 0; \
  if((unsigned long)fr_len_ != fr_size_) { \
    noitL(noit_error, "[%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
          nctx->remote_str, (ctx)->state, fr_len_, fr_size_); \
    goto socket_error; \
  } \
} while(0)
291
292 int
293 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
294                            struct timeval *now) {
295   noit_connection_ctx_t *nctx = closure;
296   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
297   int len;
298   jlog_id n_chkpt;
299
300   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
301     if(write(e->fd, e, 0) == -1)
302       noitL(noit_error, "socket error: %s\n", strerror(errno));
303  socket_error:
304     ctx->state = JLOG_STREAMER_WANT_INITIATE;
305     ctx->count = 0;
306     ctx->bytes_read = 0;
307     ctx->bytes_expected = 0;
308     if(ctx->buffer) free(ctx->buffer);
309     ctx->buffer = NULL;
310     noit_connection_schedule_reattempt(nctx, now);
311     eventer_remove_fd(e->fd);
312     e->opset->close(e->fd, &mask, e);
313     return 0;
314   }
315
316   while(1) {
317     switch(ctx->state) {
318       case JLOG_STREAMER_WANT_INITIATE:
319         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
320                               sizeof(ctx->jlog_feed_cmd),
321                               &mask, e);
322         if(len < 0) {
323           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
324           goto socket_error;
325         }
326         if(len != sizeof(ctx->jlog_feed_cmd)) {
327           noitL(noit_error, "short write [%d/%d] on initiating stream.\n",
328                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
329           goto socket_error;
330         }
331         ctx->state = JLOG_STREAMER_WANT_COUNT;
332         break;
333
334       case JLOG_STREAMER_WANT_COUNT:
335         FULLREAD(e, ctx, sizeof(u_int32_t));
336         memcpy(&ctx->count, ctx->buffer, sizeof(u_int32_t));
337         ctx->count = ntohl(ctx->count);
338         free(ctx->buffer); ctx->buffer = NULL;
339         ctx->state = JLOG_STREAMER_WANT_HEADER;
340         break;
341
342       case JLOG_STREAMER_WANT_HEADER:
343         if(ctx->count == 0) {
344           ctx->state = JLOG_STREAMER_WANT_COUNT;
345           break;
346         }
347         FULLREAD(e, ctx, sizeof(ctx->header));
348         memcpy(&ctx->header, ctx->buffer, sizeof(ctx->header));
349         ctx->header.chkpt.log = ntohl(ctx->header.chkpt.log);
350         ctx->header.chkpt.marker = ntohl(ctx->header.chkpt.marker);
351         ctx->header.tv_sec = ntohl(ctx->header.tv_sec);
352         ctx->header.tv_usec = ntohl(ctx->header.tv_usec);
353         ctx->header.message_len = ntohl(ctx->header.message_len);
354         free(ctx->buffer); ctx->buffer = NULL;
355         ctx->state = JLOG_STREAMER_WANT_BODY;
356         break;
357
358       case JLOG_STREAMER_WANT_BODY:
359         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
360         if(ctx->header.message_len > 0)
361           ctx->push(DS_OP_INSERT, &nctx->r.remote, ctx->buffer);
362         else if(ctx->buffer)
363           free(ctx->buffer);
364         /* Don't free the buffer, it's used by the datastore process. */
365         ctx->buffer = NULL;
366         ctx->count--;
367         ctx->total_events++;
368         if(ctx->count == 0) {
369           eventer_t completion_e;
370           eventer_remove_fd(e->fd);
371           completion_e = eventer_alloc();
372           memcpy(completion_e, e, sizeof(*e));
373           completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
374           ctx->state = JLOG_STREAMER_IS_ASYNC;
375           ctx->push(DS_OP_CHKPT, &nctx->r.remote, completion_e);
376           noitL(noit_debug, "Pushing batch asynch...\n");
377           return 0;
378         } else
379           ctx->state = JLOG_STREAMER_WANT_HEADER;
380         break;
381
382       case JLOG_STREAMER_IS_ASYNC:
383         ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
384       case JLOG_STREAMER_WANT_CHKPT:
385         noitL(noit_debug, "Pushing checkpoint: [%u/%u]\n",
386               ctx->header.chkpt.log, ctx->header.chkpt.marker);
387         n_chkpt.log = htonl(ctx->header.chkpt.log);
388         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
389
390         /* screw short writes.  I'd rather die than not write my data! */
391         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
392                               &mask, e);
393         if(len < 0) {
394           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
395           goto socket_error;
396         }
397         if(len != sizeof(jlog_id)) {
398           noitL(noit_error, "short write on checkpointing stream.\n");
399           goto socket_error;
400         }
401         ctx->state = JLOG_STREAMER_WANT_COUNT;
402         break;
403     }
404   }
405   /* never get here */
406 }
407
408 int
409 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure,
410                             struct timeval *now) {
411   noit_connection_ctx_t *nctx = closure;
412   int rv;
413
414   rv = eventer_SSL_connect(e, &mask);
415   if(rv > 0) {
416     eventer_ssl_ctx_t *sslctx;
417     e->callback = nctx->consumer_callback;
418     /* We must make a copy of the acceptor_closure_t for each new
419      * connection.
420      */
421     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
422       const char *cn, *end;
423       cn = eventer_ssl_get_peer_subject(sslctx);
424       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
425         cn += 3;
426         end = cn;
427         while(*end && *end != '/') end++;
428         nctx->remote_cn = malloc(end - cn + 1);
429         memcpy(nctx->remote_cn, cn, end - cn);
430         nctx->remote_cn[end-cn] = '\0';
431       }
432     }
433     return e->callback(e, mask, e->closure, now);
434   }
435   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
436
437   noitL(noit_error, "jlog streamer SSL upgrade failed.\n");
438   eventer_remove_fd(e->fd);
439   e->opset->close(e->fd, &mask, e);
440   noit_connection_schedule_reattempt(nctx, now);
441   return 0;
442 }
443 int
444 noit_connection_complete_connect(eventer_t e, int mask, void *closure,
445                                  struct timeval *now) {
446   noit_connection_ctx_t *nctx = closure;
447   const char *cert, *key, *ca, *ciphers;
448   char remote_str[128], tmp_str[128];
449   eventer_ssl_ctx_t *sslctx;
450   int aerrno, len;
451   socklen_t aerrno_len = sizeof(aerrno);
452
453   if(getsockopt(e->fd,SOL_SOCKET,SO_ERROR, &aerrno, &aerrno_len) == 0)
454     if(aerrno != 0) goto connect_error;
455   aerrno = 0;
456
457   if(mask & EVENTER_EXCEPTION) {
458     if(aerrno == 0 && (write(e->fd, e, 0) == -1))
459       aerrno = errno;
460  connect_error:
461     switch(nctx->r.remote.sa_family) {
462       case AF_INET:
463         len = sizeof(struct sockaddr_in);
464         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in.sin_addr,
465                   tmp_str, len);
466         snprintf(remote_str, sizeof(remote_str), "%s:%d",
467                  tmp_str, ntohs(nctx->r.remote_in.sin_port));
468         break;
469       case AF_INET6:
470         len = sizeof(struct sockaddr_in6);
471         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in6.sin6_addr,
472                   tmp_str, len);
473         snprintf(remote_str, sizeof(remote_str), "%s:%d",
474                  tmp_str, ntohs(nctx->r.remote_in6.sin6_port));
475        break;
476       case AF_UNIX:
477         len = SUN_LEN(&(nctx->r.remote_un));
478         snprintf(remote_str, sizeof(remote_str), "%s", nctx->r.remote_un.sun_path);
479         break;
480       default:
481         snprintf(remote_str, sizeof(remote_str), "(unknown)");
482     }
483     noitL(noit_error, "Error connecting to %s: %s\n",
484           remote_str, strerror(aerrno));
485     eventer_remove_fd(e->fd);
486     e->opset->close(e->fd, &mask, e);
487     noit_connection_schedule_reattempt(nctx, now);
488     return 0;
489   }
490
491 #define SSLCONFGET(var,name) do { \
492   if(!noit_hash_retr_str(nctx->sslconfig, name, strlen(name), \
493                          &var)) var = NULL; } while(0)
494   SSLCONFGET(cert, "certificate_file");
495   SSLCONFGET(key, "key_file");
496   SSLCONFGET(ca, "ca_chain");
497   SSLCONFGET(ciphers, "ciphers");
498   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
499   if(!sslctx) goto connect_error;
500
501   memcpy(&nctx->last_connect, now, sizeof(*now));
502   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
503                              nctx->sslconfig);
504   EVENTER_ATTACH_SSL(e, sslctx);
505   e->callback = noit_connection_ssl_upgrade;
506   return e->callback(e, mask, closure, now);
507 }
508 static void
509 noit_connection_initiate_connection(noit_connection_ctx_t *nctx) {
510   struct timeval __now;
511   eventer_t e;
512   int rv, fd = -1;
513
514   if(nctx->wants_permanent_shutdown) {
515     noit_connection_ctx_dealloc(nctx);
516     return;
517   }
518   /* Open a socket */
519   fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0);
520   if(fd < 0) goto reschedule;
521
522   /* Make it non-blocking */
523   if(eventer_set_fd_nonblocking(fd)) goto reschedule;
524
525   /* Initiate a connection */
526   rv = connect(fd, &nctx->r.remote, nctx->remote_len);
527   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
528
529   /* Register a handler for connection completion */
530   e = eventer_alloc();
531   e->fd = fd;
532   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
533   e->callback = noit_connection_complete_connect;
534   e->closure = nctx;
535   eventer_add(e);
536   return;
537
538  reschedule:
539   if(fd >= 0) close(fd);
540   gettimeofday(&__now, NULL);
541   noit_connection_schedule_reattempt(nctx, &__now);
542   return;
543 }
544
545 int
546 initiate_noit_connection(const char *host, unsigned short port,
547                          noit_hash_table *sslconfig, noit_hash_table *config,
548                          eventer_func_t handler, void *closure,
549                          void (*freefunc)(void *)) {
550   noit_connection_ctx_t *ctx;
551
552   int8_t family;
553   int rv;
554   union {
555     struct in_addr addr4;
556     struct in6_addr addr6;
557   } a;
558
559   if(host[0] == '/') {
560     family = AF_UNIX;
561   }
562   else {
563     family = AF_INET;
564     rv = inet_pton(family, host, &a);
565     if(rv != 1) {
566       family = AF_INET6;
567       rv = inet_pton(family, host, &a);
568       if(rv != 1) {
569         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
570         return -1;
571       }
572     }
573   }
574
575   ctx = noit_connection_ctx_alloc();
576   ctx->remote_str = calloc(1, strlen(host) + 7);
577   snprintf(ctx->remote_str, strlen(host) + 7,
578            "%s:%d", host, port);
579  
580   memset(&ctx->r, 0, sizeof(ctx->r));
581   if(family == AF_UNIX) {
582     struct sockaddr_un *s = &ctx->r.remote_un;
583     s->sun_family = AF_UNIX;
584     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
585     ctx->remote_len = sizeof(*s);
586   }
587   else if(family == AF_INET) {
588     struct sockaddr_in *s = &ctx->r.remote_in;
589     s->sin_family = family;
590     s->sin_port = htons(port);
591     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
592     ctx->remote_len = sizeof(*s);
593   }
594   else {
595     struct sockaddr_in6 *s = &ctx->r.remote_in6;
596     s->sin6_family = family;
597     s->sin6_port = htons(port);
598     memcpy(&s->sin6_addr, &a, sizeof(a));
599     ctx->remote_len = sizeof(*s);
600   }
601
602   if(ctx->sslconfig)
603     noit_hash_delete_all(ctx->sslconfig, free, free);
604   else
605     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
606   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
607   if(ctx->config)
608     noit_hash_delete_all(ctx->config, free, free);
609   else
610     ctx->config = calloc(1, sizeof(noit_hash_table));
611   noit_hash_merge_as_dict(ctx->config, config);
612
613   ctx->consumer_callback = handler;
614   ctx->consumer_free = freefunc;
615   ctx->consumer_ctx = closure;
616   noit_connection_initiate_connection(ctx);
617   return 0;
618 }
619
620 void
621 stratcon_streamer_connection(const char *toplevel, const char *destination,
622                              eventer_func_t handler,
623                              void *(*handler_alloc)(void), void *handler_ctx,
624                              void (*handler_free)(void *)) {
625   int i, cnt = 0;
626   noit_conf_section_t *noit_configs;
627   char path[256];
628
629   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
630   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
631   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
632   for(i=0; i<cnt; i++) {
633     char address[256];
634     unsigned short port;
635     int portint;
636     noit_hash_table *sslconfig, *config;
637
638     if(!noit_conf_get_stringbuf(noit_configs[i],
639                                 "ancestor-or-self::node()/@address",
640                                 address, sizeof(address))) {
641       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
642       continue;
643     }
644     /* if destination is specified, exact match it */
645     if(destination && strcmp(address, destination)) continue;
646
647     if(!noit_conf_get_int(noit_configs[i],
648                           "ancestor-or-self::node()/@port", &portint))
649       portint = 0;
650     port = (unsigned short) portint;
651     if(address[0] != '/' && (portint == 0 || (port != portint))) {
652       /* UNIX sockets don't require a port (they'll ignore it if specified */
653       noitL(noit_stderr,
654             "Invalid port [%d] specified in stanza %d\n", port, i+1);
655       continue;
656     }
657     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
658     config = noit_conf_get_hash(noit_configs[i], "config");
659
660     noitL(noit_error, "initiating to %s\n", address);
661     initiate_noit_connection(address, port, sslconfig, config,
662                              handler,
663                              handler_alloc ? handler_alloc() : handler_ctx,
664                              handler_free);
665     noit_hash_destroy(sslconfig,free,free);
666     free(sslconfig);
667     noit_hash_destroy(config,free,free);
668     free(config);
669   }
670   free(noit_configs);
671 }
672 void
673 stratcon_jlog_streamer_reload(const char *toplevel) {
674   stratcon_streamer_connection(toplevel, NULL,
675                                stratcon_jlog_recv_handler,
676                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
677                                NULL,
678                                jlog_streamer_ctx_free);
679 }
680
681 static int
682 stratcon_console_show_noits(noit_console_closure_t ncct,
683                             int argc, char **argv,
684                             noit_console_state_t *dstate,
685                             void *closure) {
686   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
687   uuid_t key_id;
688   int klen, n = 0, i;
689   void *vconn;
690   noit_connection_ctx_t **ctx;
691
692   pthread_mutex_lock(&noits_lock);
693   ctx = malloc(sizeof(*ctx) * noits.size);
694   while(noit_hash_next(&noits, &iter, (const char **)key_id, &klen,
695                        &vconn)) {
696     ctx[n] = (noit_connection_ctx_t *)vconn;
697     noit_atomic_inc32(&ctx[n]->refcnt);
698     n++;
699   }
700   pthread_mutex_unlock(&noits_lock);
701   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
702   for(i=0; i<n; i++) {
703     nc_print_noit_conn_brief(ncct, ctx[i]);
704     noit_connection_ctx_deref(ctx[i]);
705   }
706   free(ctx);
707   return 0;
708 }
709
710 static void
711 register_console_streamer_commands() {
712   noit_console_state_t *tl;
713   cmd_info_t *showcmd;
714
715   tl = noit_console_state_initial();
716   showcmd = noit_console_state_get_cmd(tl, "show");
717   assert(showcmd && showcmd->dstate);
718
719   noit_console_state_add_cmd(showcmd->dstate,
720     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
721 }
722
723 static int
724 rest_show_noits(noit_http_rest_closure_t *restc,
725                 int npats, char **pats) {
726   xmlDocPtr doc;
727   xmlNodePtr root;
728   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
729   uuid_t key_id;
730   int klen, n = 0, i;
731   void *vconn;
732   noit_connection_ctx_t **ctxs;
733   struct timeval now, diff;
734   gettimeofday(&now, NULL);
735
736   pthread_mutex_lock(&noits_lock);
737   ctxs = malloc(sizeof(*ctxs) * noits.size);
738   while(noit_hash_next(&noits, &iter, (const char **)key_id, &klen,
739                        &vconn)) {
740     ctxs[n] = (noit_connection_ctx_t *)vconn;
741     noit_atomic_inc32(&ctxs[n]->refcnt);
742     n++;
743   }
744   pthread_mutex_unlock(&noits_lock);
745   qsort(ctxs, n, sizeof(*ctxs), remote_str_sort);
746
747   doc = xmlNewDoc((xmlChar *)"1.0");
748   root = xmlNewDocNode(doc, NULL, (xmlChar *)"noits", NULL);
749   xmlDocSetRootElement(doc, root);
750   for(i=0; i<n; i++) {
751     char buff[256], *feedtype = "unknown", *state = "unknown";
752     xmlNodePtr node;
753     noit_connection_ctx_t *ctx = ctxs[i];
754     jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
755
756     node = xmlNewNode(NULL, (xmlChar *)"noit");
757     snprintf(buff, sizeof(buff), "%llu.%06d",
758              (long long unsigned)ctx->last_connect.tv_sec,
759              (int)ctx->last_connect.tv_usec);
760     xmlSetProp(node, (xmlChar *)"last_connect", (xmlChar *)buff);
761     xmlSetProp(node, (xmlChar *)"state", ctx->timeout_event ?
762                (xmlChar *)"disconnected" : (xmlChar *)"connected");
763     if(ctx->timeout_event) {
764       sub_timeval(now, ctx->timeout_event->whence, &diff);
765       snprintf(buff, sizeof(buff), "%llu.%06d",
766                (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
767       xmlSetProp(node, (xmlChar *)"next_attempt", (xmlChar *)buff);
768     }
769     xmlSetProp(node, (xmlChar *)"remote", (xmlChar *)ctx->remote_str);
770     if(ctx->remote_cn)
771       xmlSetProp(node, (xmlChar *)"remote_cn", (xmlChar *)ctx->remote_cn);
772
773     switch(ntohl(jctx->jlog_feed_cmd)) {
774       case NOIT_JLOG_DATA_FEED: feedtype = "durable/storage"; break;
775       case NOIT_JLOG_DATA_TEMP_FEED: feedtype = "transient/iep"; break;
776     }
777     xmlSetProp(node, (xmlChar *)"type", (xmlChar *)feedtype);
778     switch(jctx->state) {
779       case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
780       case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
781       case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
782       case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
783       case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
784       case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
785     }
786     xmlSetProp(node, (xmlChar *)"state", (xmlChar *)state);
787     snprintf(buff, sizeof(buff), "%08x:%08x",
788              jctx->header.chkpt.log, jctx->header.chkpt.marker);
789     xmlSetProp(node, (xmlChar *)"checkpoint", (xmlChar *)buff);
790     snprintf(buff, sizeof(buff), "%lu", jctx->total_events);
791     xmlSetProp(node, (xmlChar *)"session_events", (xmlChar *)buff);
792     snprintf(buff, sizeof(buff), "%lu", jctx->total_bytes_read);
793     xmlSetProp(node, (xmlChar *)"session_bytes", (xmlChar *)buff);
794
795     sub_timeval(now, ctx->last_connect, &diff);
796     snprintf(buff, sizeof(buff), "%llu.%06d",
797              (long long unsigned)diff.tv_sec, (int)diff.tv_usec);
798     xmlSetProp(node, (xmlChar *)"session_duration", (xmlChar *)buff);
799
800     xmlAddChild(root, node);
801     noit_connection_ctx_deref(ctx);
802   }
803   free(ctxs);
804
805   noit_http_response_ok(restc->http_ctx, "text/xml");
806   noit_http_response_xml(restc->http_ctx, doc);
807   noit_http_response_end(restc->http_ctx);
808   xmlFreeDoc(doc);
809   return 0;
810 }
811 void
812 stratcon_jlog_streamer_init(const char *toplevel) {
813   pthread_mutex_init(&noits_lock, NULL);
814   eventer_name_callback("noit_connection_reinitiate",
815                         noit_connection_reinitiate);
816   eventer_name_callback("stratcon_jlog_recv_handler",
817                         stratcon_jlog_recv_handler);
818   eventer_name_callback("noit_connection_ssl_upgrade",
819                         noit_connection_ssl_upgrade);
820   eventer_name_callback("noit_connection_complete_connect",
821                         noit_connection_complete_connect);
822   register_console_streamer_commands();
823   stratcon_jlog_streamer_reload(toplevel);
824   assert(noit_http_rest_register(
825     "GET", "/noits/", "^show$", rest_show_noits
826   ) == 0);
827 }
Note: See TracBrowser for help on using the browser.