root/src/stratcon_jlog_streamer.c

Revision d9fd356e6dcff42e03b313c5d8753a6dff82f36d, 24.9 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

const here, refs #34

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_conf.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "noit_jlog_listener.h"
39 #include "stratcon_datastore.h"
40 #include "stratcon_jlog_streamer.h"
41
42 #include <unistd.h>
43 #include <assert.h>
44 #include <errno.h>
45 #include <sys/types.h>
46 #include <sys/socket.h>
47 #ifdef HAVE_SYS_FILIO_H
48 #include <sys/filio.h>
49 #endif
50 #include <netinet/in.h>
51 #include <sys/un.h>
52 #include <arpa/inet.h>
53
54 pthread_mutex_t noits_lock;
55 noit_hash_table noits = NOIT_HASH_EMPTY;
56
57 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
58
59 static int
60 remote_str_sort(const void *a, const void *b) {
61   noit_connection_ctx_t * const *actx = a;
62   noit_connection_ctx_t * const *bctx = b;
63   return strcmp((*actx)->remote_str, (*bctx)->remote_str);
64 }
65 static void
66 nc_print_noit_conn_brief(noit_console_closure_t ncct,
67                           noit_connection_ctx_t *ctx) {
68   struct timeval now, diff, session_duration;
69   gettimeofday(&now, NULL);
70   const char *lasttime = "never";
71   if(ctx->last_connect.tv_sec != 0) {
72     char cmdbuf[4096];
73     time_t r = ctx->last_connect.tv_sec;
74     struct tm tbuf, *tm;
75     tm = gmtime_r(&r, &tbuf);
76     strftime(cmdbuf, sizeof(cmdbuf), "%Y-%m-%d %H:%M:%S UTC", tm);
77     lasttime = cmdbuf;
78   }
79   nc_printf(ncct, "%s [%s]:\n\tLast connect: %s\n", ctx->remote_str,
80             ctx->timeout_event ? "disconnected" : "connected", lasttime);
81   if(ctx->timeout_event) {
82     sub_timeval(now, ctx->timeout_event->whence, &diff);
83     nc_printf(ncct, "\tNext attempet in %llu.%06us\n", diff.tv_sec, diff.tv_usec);
84   }
85   else {
86     nc_printf(ncct, "\tRemote CN: '%s'\n",
87               ctx->remote_cn ? ctx->remote_cn : "???");
88     if(ctx->consumer_callback == stratcon_jlog_recv_handler) {
89       jlog_streamer_ctx_t *jctx = ctx->consumer_ctx;
90       struct timeval last;
91       double session_duration_seconds;
92       const char *feedtype = "unknown";
93       const char *state = "unknown";
94
95       switch(ntohl(jctx->jlog_feed_cmd)) {
96         case NOIT_JLOG_DATA_FEED: feedtype = "durable/storage"; break;
97         case NOIT_JLOG_DATA_TEMP_FEED: feedtype = "transient/iep"; break;
98       }
99       switch(jctx->state) {
100         case JLOG_STREAMER_WANT_INITIATE: state = "initiate"; break;
101         case JLOG_STREAMER_WANT_COUNT: state = "waiting for next batch"; break;
102         case JLOG_STREAMER_WANT_HEADER: state = "reading header"; break;
103         case JLOG_STREAMER_WANT_BODY: state = "reading body"; break;
104         case JLOG_STREAMER_IS_ASYNC: state = "asynchronously processing"; break;
105         case JLOG_STREAMER_WANT_CHKPT: state = "checkpointing"; break;
106       }
107       last.tv_sec = jctx->header.tv_sec;
108       last.tv_usec = jctx->header.tv_usec;
109       sub_timeval(now, last, &diff);
110       sub_timeval(now, ctx->last_connect, &session_duration);
111       session_duration_seconds = session_duration.tv_sec +
112                                  (double)session_duration.tv_usec/1000000.0;
113       nc_printf(ncct, "\tJLog event streamer [%s]\n\tState: %s\n"
114                       "\tNext checkpoint: [%08x:%08x]\n"
115                       "\tLast event: %llu.%06us ago\n"
116                       "\tEvents this session: %llu (%0.2f/s)\n"
117                       "\tOctets this session: %llu (%0.2f/s)\n",
118                 feedtype, state,
119                 jctx->header.chkpt.log, jctx->header.chkpt.marker,
120                 diff.tv_sec, diff.tv_usec,
121                 jctx->total_events,
122                 (double)jctx->total_events/session_duration_seconds,
123                 jctx->total_bytes_read,
124                 (double)jctx->total_bytes_read/session_duration_seconds);
125     }
126     else {
127       nc_printf(ncct, "\tUnknown type.\n");
128     }
129   }
130 }
131
132 jlog_streamer_ctx_t *
133 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
134   jlog_streamer_ctx_t *ctx;
135   ctx = stratcon_jlog_streamer_ctx_alloc();
136   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
137   ctx->push = stratcon_datastore_push;
138   return ctx;
139 }
140 jlog_streamer_ctx_t *
141 stratcon_jlog_streamer_ctx_alloc(void) {
142   jlog_streamer_ctx_t *ctx;
143   ctx = calloc(1, sizeof(*ctx));
144   return ctx;
145 }
146 noit_connection_ctx_t *
147 noit_connection_ctx_alloc(void) {
148   noit_connection_ctx_t *ctx, **pctx;
149   ctx = calloc(1, sizeof(*ctx));
150   ctx->refcnt = 1;
151   pctx = malloc(sizeof(*pctx));
152   *pctx = ctx;
153   pthread_mutex_lock(&noits_lock);
154   noit_hash_store(&noits, (const char *)pctx, sizeof(*pctx), ctx);
155   pthread_mutex_unlock(&noits_lock);
156   return ctx;
157 }
158 int
159 noit_connection_reinitiate(eventer_t e, int mask, void *closure,
160                          struct timeval *now) {
161   noit_connection_ctx_t *ctx = closure;
162   ctx->timeout_event = NULL;
163   noit_connection_initiate_connection(closure);
164   return 0;
165 }
166 void
167 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx,
168                                    struct timeval *now) {
169   struct timeval __now, interval;
170   const char *v;
171   u_int32_t min_interval = 1000, max_interval = 8000;
172   if(ctx->remote_cn) {
173     free(ctx->remote_cn);
174     ctx->remote_cn = NULL;
175   }
176   if(noit_hash_retr_str(ctx->config,
177                         "reconnect_initial_interval",
178                         strlen("reconnect_initial_interval"),
179                         &v)) {
180     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
181   }
182   if(noit_hash_retr_str(ctx->config,
183                         "reconnect_maximum_interval",
184                         strlen("reconnect_maximum_interval"),
185                         &v)) {
186     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
187   }
188   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
189   else {
190     ctx->current_backoff *= 2;
191     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
192     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
193   }
194   if(!now) {
195     gettimeofday(&__now, NULL);
196     now = &__now;
197   }
198   interval.tv_sec = ctx->current_backoff / 1000;
199   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
200   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
201         ctx->current_backoff);
202   if(ctx->timeout_event)
203     eventer_remove(ctx->timeout_event);
204   else
205     ctx->timeout_event = eventer_alloc();
206   ctx->timeout_event->callback = noit_connection_reinitiate;
207   ctx->timeout_event->closure = ctx;
208   ctx->timeout_event->mask = EVENTER_TIMER;
209   add_timeval(*now, interval, &ctx->timeout_event->whence);
210   eventer_add(ctx->timeout_event);
211 }
212 static void
213 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
214   if(ctx->remote_cn) free(ctx->remote_cn);
215   if(ctx->remote_str) free(ctx->remote_str);
216   if(ctx->timeout_event) {
217     eventer_remove(ctx->timeout_event);
218     eventer_free(ctx->timeout_event);
219   }
220   ctx->consumer_free(ctx->consumer_ctx);
221   free(ctx);
222 }
223 void
224 noit_connection_ctx_deref(noit_connection_ctx_t *ctx) {
225   if(noit_atomic_dec32(&ctx->refcnt) == 0)
226     noit_connection_ctx_free(ctx);
227 }
228 void
229 noit_connection_ctx_dealloc(noit_connection_ctx_t *ctx) {
230   noit_connection_ctx_t **pctx = &ctx;
231   pthread_mutex_lock(&noits_lock);
232   noit_hash_delete(&noits, (const char *)pctx, sizeof(*pctx),
233                    free, (void (*)(void *))noit_connection_ctx_deref);
234   pthread_mutex_unlock(&noits_lock);
235 }
236 void
237 jlog_streamer_ctx_free(void *cl) {
238   jlog_streamer_ctx_t *ctx = cl;
239   if(ctx->buffer) free(ctx->buffer);
240   free(ctx);
241 }
242
243 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
244 static int
245 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
246   int len, mask;
247   while(ctx->bytes_read < ctx->bytes_expected) {
248     len = Eread(ctx->buffer + ctx->bytes_read,
249                 ctx->bytes_expected - ctx->bytes_read);
250     if(len < 0) {
251       *newmask = mask;
252       return -1;
253     }
254     /* if we get 0 inside SSL, and there was a real error, we
255      * will actually get a -1 here.
256      * if(len == 0) return ctx->bytes_read;
257      */
258     ctx->total_bytes_read += len;
259     ctx->bytes_read += len;
260   }
261   assert(ctx->bytes_read == ctx->bytes_expected);
262   return ctx->bytes_read;
263 }
264 #define FULLREAD(e,ctx,size) do { \
265   int mask, len; \
266   if(!ctx->bytes_expected) { \
267     ctx->bytes_expected = size; \
268     if(ctx->buffer) free(ctx->buffer); \
269     ctx->buffer = malloc(size + 1); \
270     if(ctx->buffer == NULL) { \
271       noitL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)size + 1); \
272       goto socket_error; \
273     } \
274     ctx->buffer[size] = '\0'; \
275   } \
276   len = __read_on_ctx(e, ctx, &mask); \
277   if(len < 0) { \
278     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
279     noitL(noit_error, "[%s] SSL read error: %s\n", nctx->remote_str, strerror(errno)); \
280     goto socket_error; \
281   } \
282   ctx->bytes_read = 0; \
283   ctx->bytes_expected = 0; \
284   if(len != size) { \
285     noitL(noit_error, "[%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
286           nctx->remote_str, ctx->state, len, (long unsigned int)size); \
287     goto socket_error; \
288   } \
289 } while(0)
290
291 int
292 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
293                            struct timeval *now) {
294   noit_connection_ctx_t *nctx = closure;
295   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
296   int len;
297   jlog_id n_chkpt;
298
299   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
300     if(write(e->fd, e, 0) == -1)
301       noitL(noit_error, "socket error: %s\n", strerror(errno));
302  socket_error:
303     ctx->state = JLOG_STREAMER_WANT_INITIATE;
304     ctx->count = 0;
305     ctx->bytes_read = 0;
306     ctx->bytes_expected = 0;
307     if(ctx->buffer) free(ctx->buffer);
308     ctx->buffer = NULL;
309     noit_connection_schedule_reattempt(nctx, now);
310     eventer_remove_fd(e->fd);
311     e->opset->close(e->fd, &mask, e);
312     return 0;
313   }
314
315   while(1) {
316     switch(ctx->state) {
317       case JLOG_STREAMER_WANT_INITIATE:
318         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
319                               sizeof(ctx->jlog_feed_cmd),
320                               &mask, e);
321         if(len < 0) {
322           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
323           goto socket_error;
324         }
325         if(len != sizeof(ctx->jlog_feed_cmd)) {
326           noitL(noit_error, "short write [%d/%d] on initiating stream.\n",
327                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
328           goto socket_error;
329         }
330         ctx->state = JLOG_STREAMER_WANT_COUNT;
331         break;
332
333       case JLOG_STREAMER_WANT_COUNT:
334         FULLREAD(e, ctx, sizeof(u_int32_t));
335         memcpy(&ctx->count, ctx->buffer, sizeof(u_int32_t));
336         ctx->count = ntohl(ctx->count);
337         free(ctx->buffer); ctx->buffer = NULL;
338         ctx->state = JLOG_STREAMER_WANT_HEADER;
339         break;
340
341       case JLOG_STREAMER_WANT_HEADER:
342         if(ctx->count == 0) {
343           ctx->state = JLOG_STREAMER_WANT_COUNT;
344           break;
345         }
346         FULLREAD(e, ctx, sizeof(ctx->header));
347         memcpy(&ctx->header, ctx->buffer, sizeof(ctx->header));
348         ctx->header.chkpt.log = ntohl(ctx->header.chkpt.log);
349         ctx->header.chkpt.marker = ntohl(ctx->header.chkpt.marker);
350         ctx->header.tv_sec = ntohl(ctx->header.tv_sec);
351         ctx->header.tv_usec = ntohl(ctx->header.tv_usec);
352         ctx->header.message_len = ntohl(ctx->header.message_len);
353         free(ctx->buffer); ctx->buffer = NULL;
354         ctx->state = JLOG_STREAMER_WANT_BODY;
355         break;
356
357       case JLOG_STREAMER_WANT_BODY:
358         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
359         if(ctx->header.message_len > 0)
360           ctx->push(DS_OP_INSERT, &nctx->r.remote, ctx->buffer);
361         else if(ctx->buffer)
362           free(ctx->buffer);
363         /* Don't free the buffer, it's used by the datastore process. */
364         ctx->buffer = NULL;
365         ctx->count--;
366         ctx->total_events++;
367         if(ctx->count == 0) {
368           eventer_t completion_e;
369           eventer_remove_fd(e->fd);
370           completion_e = eventer_alloc();
371           memcpy(completion_e, e, sizeof(*e));
372           completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
373           ctx->state = JLOG_STREAMER_IS_ASYNC;
374           ctx->push(DS_OP_CHKPT, &nctx->r.remote, completion_e);
375           noitL(noit_debug, "Pushing batch asynch...\n");
376           return 0;
377         } else
378           ctx->state = JLOG_STREAMER_WANT_HEADER;
379         break;
380
381       case JLOG_STREAMER_IS_ASYNC:
382         ctx->state = JLOG_STREAMER_WANT_CHKPT; /* falls through */
383       case JLOG_STREAMER_WANT_CHKPT:
384         noitL(noit_debug, "Pushing checkpoint: [%u/%u]\n",
385               ctx->header.chkpt.log, ctx->header.chkpt.marker);
386         n_chkpt.log = htonl(ctx->header.chkpt.log);
387         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
388
389         /* screw short writes.  I'd rather die than not write my data! */
390         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
391                               &mask, e);
392         if(len < 0) {
393           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
394           goto socket_error;
395         }
396         if(len != sizeof(jlog_id)) {
397           noitL(noit_error, "short write on checkpointing stream.\n");
398           goto socket_error;
399         }
400         ctx->state = JLOG_STREAMER_WANT_COUNT;
401         break;
402     }
403   }
404   /* never get here */
405 }
406
407 int
408 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure,
409                             struct timeval *now) {
410   noit_connection_ctx_t *nctx = closure;
411   int rv;
412
413   rv = eventer_SSL_connect(e, &mask);
414   if(rv > 0) {
415     eventer_ssl_ctx_t *sslctx;
416     e->callback = nctx->consumer_callback;
417     /* We must make a copy of the acceptor_closure_t for each new
418      * connection.
419      */
420     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
421       const char *cn, *end;
422       cn = eventer_ssl_get_peer_subject(sslctx);
423       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
424         cn += 3;
425         end = cn;
426         while(*end && *end != '/') end++;
427         nctx->remote_cn = malloc(end - cn + 1);
428         memcpy(nctx->remote_cn, cn, end - cn);
429         nctx->remote_cn[end-cn] = '\0';
430       }
431     }
432     return e->callback(e, mask, e->closure, now);
433   }
434   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
435
436   noitL(noit_error, "jlog streamer SSL upgrade failed.\n");
437   eventer_remove_fd(e->fd);
438   e->opset->close(e->fd, &mask, e);
439   noit_connection_schedule_reattempt(nctx, now);
440   return 0;
441 }
442 int
443 noit_connection_complete_connect(eventer_t e, int mask, void *closure,
444                                  struct timeval *now) {
445   noit_connection_ctx_t *nctx = closure;
446   const char *cert, *key, *ca, *ciphers;
447   char remote_str[128], tmp_str[128];
448   eventer_ssl_ctx_t *sslctx;
449   int aerrno, len;
450   socklen_t aerrno_len = sizeof(aerrno);
451
452   if(getsockopt(e->fd,SOL_SOCKET,SO_ERROR, &aerrno, &aerrno_len) == 0)
453     if(aerrno != 0) goto connect_error;
454   aerrno = 0;
455
456   if(mask & EVENTER_EXCEPTION) {
457     if(aerrno == 0 && (write(e->fd, e, 0) == -1))
458       aerrno = errno;
459  connect_error:
460     switch(nctx->r.remote.sa_family) {
461       case AF_INET:
462         len = sizeof(struct sockaddr_in);
463         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in.sin_addr,
464                   tmp_str, len);
465         snprintf(remote_str, sizeof(remote_str), "%s:%d",
466                  tmp_str, ntohs(nctx->r.remote_in.sin_port));
467         break;
468       case AF_INET6:
469         len = sizeof(struct sockaddr_in6);
470         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in6.sin6_addr,
471                   tmp_str, len);
472         snprintf(remote_str, sizeof(remote_str), "%s:%d",
473                  tmp_str, ntohs(nctx->r.remote_in6.sin6_port));
474        break;
475       case AF_UNIX:
476         len = SUN_LEN(&(nctx->r.remote_un));
477         snprintf(remote_str, sizeof(remote_str), "%s", nctx->r.remote_un.sun_path);
478         break;
479       default:
480         snprintf(remote_str, sizeof(remote_str), "(unknown)");
481     }
482     noitL(noit_error, "Error connecting to %s: %s\n",
483           remote_str, strerror(aerrno));
484     eventer_remove_fd(e->fd);
485     e->opset->close(e->fd, &mask, e);
486     noit_connection_schedule_reattempt(nctx, now);
487     return 0;
488   }
489
490 #define SSLCONFGET(var,name) do { \
491   if(!noit_hash_retr_str(nctx->sslconfig, name, strlen(name), \
492                          &var)) var = NULL; } while(0)
493   SSLCONFGET(cert, "certificate_file");
494   SSLCONFGET(key, "key_file");
495   SSLCONFGET(ca, "ca_chain");
496   SSLCONFGET(ciphers, "ciphers");
497   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
498   if(!sslctx) goto connect_error;
499
500   memcpy(&nctx->last_connect, now, sizeof(*now));
501   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
502                              nctx->sslconfig);
503   EVENTER_ATTACH_SSL(e, sslctx);
504   e->callback = noit_connection_ssl_upgrade;
505   return e->callback(e, mask, closure, now);
506 }
507 static void
508 noit_connection_initiate_connection(noit_connection_ctx_t *nctx) {
509   struct timeval __now;
510   eventer_t e;
511   int rv, fd = -1;
512
513   if(nctx->wants_permanent_shutdown) {
514     noit_connection_ctx_dealloc(nctx);
515     return;
516   }
517   /* Open a socket */
518   fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0);
519   if(fd < 0) goto reschedule;
520
521   /* Make it non-blocking */
522   if(eventer_set_fd_nonblocking(fd)) goto reschedule;
523
524   /* Initiate a connection */
525   rv = connect(fd, &nctx->r.remote, nctx->remote_len);
526   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
527
528   /* Register a handler for connection completion */
529   e = eventer_alloc();
530   e->fd = fd;
531   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
532   e->callback = noit_connection_complete_connect;
533   e->closure = nctx;
534   eventer_add(e);
535   return;
536
537  reschedule:
538   if(fd >= 0) close(fd);
539   gettimeofday(&__now, NULL);
540   noit_connection_schedule_reattempt(nctx, &__now);
541   return;
542 }
543
544 int
545 initiate_noit_connection(const char *host, unsigned short port,
546                          noit_hash_table *sslconfig, noit_hash_table *config,
547                          eventer_func_t handler, void *closure,
548                          void (*freefunc)(void *)) {
549   noit_connection_ctx_t *ctx;
550
551   int8_t family;
552   int rv;
553   union {
554     struct in_addr addr4;
555     struct in6_addr addr6;
556   } a;
557
558   if(host[0] == '/') {
559     family = AF_UNIX;
560   }
561   else {
562     family = AF_INET;
563     rv = inet_pton(family, host, &a);
564     if(rv != 1) {
565       family = AF_INET6;
566       rv = inet_pton(family, host, &a);
567       if(rv != 1) {
568         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
569         return -1;
570       }
571     }
572   }
573
574   ctx = noit_connection_ctx_alloc();
575   ctx->remote_str = calloc(1, strlen(host) + 7);
576   snprintf(ctx->remote_str, strlen(host) + 7,
577            "%s:%d", host, port);
578  
579   memset(&ctx->r, 0, sizeof(ctx->r));
580   if(family == AF_UNIX) {
581     struct sockaddr_un *s = &ctx->r.remote_un;
582     s->sun_family = AF_UNIX;
583     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
584     ctx->remote_len = sizeof(*s);
585   }
586   else if(family == AF_INET) {
587     struct sockaddr_in *s = &ctx->r.remote_in;
588     s->sin_family = family;
589     s->sin_port = htons(port);
590     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
591     ctx->remote_len = sizeof(*s);
592   }
593   else {
594     struct sockaddr_in6 *s = &ctx->r.remote_in6;
595     s->sin6_family = family;
596     s->sin6_port = htons(port);
597     memcpy(&s->sin6_addr, &a, sizeof(a));
598     ctx->remote_len = sizeof(*s);
599   }
600
601   if(ctx->sslconfig)
602     noit_hash_delete_all(ctx->sslconfig, free, free);
603   else
604     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
605   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
606   if(ctx->config)
607     noit_hash_delete_all(ctx->config, free, free);
608   else
609     ctx->config = calloc(1, sizeof(noit_hash_table));
610   noit_hash_merge_as_dict(ctx->config, config);
611
612   ctx->consumer_callback = handler;
613   ctx->consumer_free = freefunc;
614   ctx->consumer_ctx = closure;
615   noit_connection_initiate_connection(ctx);
616   return 0;
617 }
618
619 void
620 stratcon_streamer_connection(const char *toplevel, const char *destination,
621                              eventer_func_t handler,
622                              void *(*handler_alloc)(void), void *handler_ctx,
623                              void (*handler_free)(void *)) {
624   int i, cnt = 0;
625   noit_conf_section_t *noit_configs;
626   char path[256];
627
628   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
629   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
630   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
631   for(i=0; i<cnt; i++) {
632     char address[256];
633     unsigned short port;
634     int portint;
635     noit_hash_table *sslconfig, *config;
636
637     if(!noit_conf_get_stringbuf(noit_configs[i],
638                                 "ancestor-or-self::node()/@address",
639                                 address, sizeof(address))) {
640       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
641       continue;
642     }
643     /* if destination is specified, exact match it */
644     if(destination && strcmp(address, destination)) continue;
645
646     if(!noit_conf_get_int(noit_configs[i],
647                           "ancestor-or-self::node()/@port", &portint))
648       portint = 0;
649     port = (unsigned short) portint;
650     if(address[0] != '/' && (portint == 0 || (port != portint))) {
651       /* UNIX sockets don't require a port (they'll ignore it if specified */
652       noitL(noit_stderr,
653             "Invalid port [%d] specified in stanza %d\n", port, i+1);
654       continue;
655     }
656     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
657     config = noit_conf_get_hash(noit_configs[i], "config");
658
659     noitL(noit_error, "initiating to %s\n", address);
660     initiate_noit_connection(address, port, sslconfig, config,
661                              handler,
662                              handler_alloc ? handler_alloc() : handler_ctx,
663                              handler_free);
664     noit_hash_destroy(sslconfig,free,free);
665     free(sslconfig);
666     noit_hash_destroy(config,free,free);
667     free(config);
668   }
669   free(noit_configs);
670 }
671 void
672 stratcon_jlog_streamer_reload(const char *toplevel) {
673   stratcon_streamer_connection(toplevel, NULL,
674                                stratcon_jlog_recv_handler,
675                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
676                                NULL,
677                                jlog_streamer_ctx_free);
678 }
679
680 static int
681 stratcon_console_show_noits(noit_console_closure_t ncct,
682                             int argc, char **argv,
683                             noit_console_state_t *dstate,
684                             void *closure) {
685   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
686   uuid_t key_id;
687   int klen, n = 0, i;
688   void *vconn;
689   noit_connection_ctx_t **ctx;
690
691   pthread_mutex_lock(&noits_lock);
692   ctx = malloc(sizeof(*ctx) * noits.size);
693   while(noit_hash_next(&noits, &iter, (const char **)key_id, &klen,
694                        &vconn)) {
695     ctx[n] = (noit_connection_ctx_t *)vconn;
696     noit_atomic_inc32(&ctx[n]->refcnt);
697     n++;
698   }
699   pthread_mutex_unlock(&noits_lock);
700   qsort(ctx, n, sizeof(*ctx), remote_str_sort);
701   for(i=0; i<n; i++) {
702     nc_print_noit_conn_brief(ncct, ctx[i]);
703     noit_connection_ctx_deref(ctx[i]);
704   }
705   return 0;
706 }
707
708 static void
709 register_console_streamer_commands() {
710   noit_console_state_t *tl;
711   cmd_info_t *showcmd;
712
713   tl = noit_console_state_initial();
714   showcmd = noit_console_state_get_cmd(tl, "show");
715   assert(showcmd && showcmd->dstate);
716
717   noit_console_state_add_cmd(showcmd->dstate,
718     NCSCMD("noits", stratcon_console_show_noits, NULL, NULL, NULL));
719 }
720
721 void
722 stratcon_jlog_streamer_init(const char *toplevel) {
723   pthread_mutex_init(&noits_lock, NULL);
724   eventer_name_callback("noit_connection_reinitiate",
725                         noit_connection_reinitiate);
726   eventer_name_callback("stratcon_jlog_recv_handler",
727                         stratcon_jlog_recv_handler);
728   eventer_name_callback("noit_connection_ssl_upgrade",
729                         noit_connection_ssl_upgrade);
730   eventer_name_callback("noit_connection_complete_connect",
731                         noit_connection_complete_connect);
732   register_console_streamer_commands();
733   stratcon_jlog_streamer_reload(toplevel);
734 }
Note: See TracBrowser for help on using the browser.