root/src/stratcon_jlog_streamer.c

Revision 1cd669dc031d6aa16971e3895924d5507e3b76bd, 16.3 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

fixes #127

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "noit_conf.h"
9 #include "utils/noit_hash.h"
10 #include "utils/noit_log.h"
11 #include "noit_jlog_listener.h"
12 #include "stratcon_datastore.h"
13 #include "stratcon_jlog_streamer.h"
14
15 #include <unistd.h>
16 #include <assert.h>
17 #include <errno.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #ifdef HAVE_SYS_FILIO_H
21 #include <sys/filio.h>
22 #endif
23 #include <netinet/in.h>
24 #include <sys/un.h>
25 #include <arpa/inet.h>
26
27 noit_hash_table noits = NOIT_HASH_EMPTY;
28
29 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
30
31 jlog_streamer_ctx_t *
32 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
33   jlog_streamer_ctx_t *ctx;
34   ctx = stratcon_jlog_streamer_ctx_alloc();
35   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
36   ctx->push = stratcon_datastore_push;
37   return ctx;
38 }
39 jlog_streamer_ctx_t *
40 stratcon_jlog_streamer_ctx_alloc(void) {
41   jlog_streamer_ctx_t *ctx;
42   ctx = calloc(1, sizeof(*ctx));
43   return ctx;
44 }
45 noit_connection_ctx_t *
46 noit_connection_ctx_alloc(void) {
47   noit_connection_ctx_t *ctx;
48   ctx = calloc(1, sizeof(*ctx));
49   return ctx;
50 }
51 int
52 noit_connection_reinitiate(eventer_t e, int mask, void *closure,
53                          struct timeval *now) {
54   noit_connection_ctx_t *ctx = closure;
55   ctx->timeout_event = NULL;
56   noit_connection_initiate_connection(closure);
57   return 0;
58 }
59 void
60 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx,
61                                    struct timeval *now) {
62   struct timeval __now, interval;
63   const char *v;
64   u_int32_t min_interval = 1000, max_interval = 8000;
65   if(noit_hash_retr_str(ctx->config,
66                         "reconnect_initial_interval",
67                         strlen("reconnect_initial_interval"),
68                         &v)) {
69     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
70   }
71   if(noit_hash_retr_str(ctx->config,
72                         "reconnect_maximum_interval",
73                         strlen("reconnect_maximum_interval"),
74                         &v)) {
75     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
76   }
77   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
78   else {
79     ctx->current_backoff *= 2;
80     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
81     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
82   }
83   if(!now) {
84     gettimeofday(&__now, NULL);
85     now = &__now;
86   }
87   interval.tv_sec = ctx->current_backoff / 1000;
88   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
89   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
90         ctx->current_backoff);
91   if(ctx->timeout_event)
92     eventer_remove(ctx->timeout_event);
93   else
94     ctx->timeout_event = eventer_alloc();
95   ctx->timeout_event->callback = noit_connection_reinitiate;
96   ctx->timeout_event->closure = ctx;
97   ctx->timeout_event->mask = EVENTER_TIMER;
98   add_timeval(*now, interval, &ctx->timeout_event->whence);
99   eventer_add(ctx->timeout_event);
100 }
101 void
102 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
103   if(ctx->remote_cn) free(ctx->remote_cn);
104   if(ctx->timeout_event) {
105     eventer_remove(ctx->timeout_event);
106     eventer_free(ctx->timeout_event);
107   }
108   ctx->consumer_free(ctx->consumer_ctx);
109   free(ctx);
110 }
111 void
112 jlog_streamer_ctx_free(void *cl) {
113   jlog_streamer_ctx_t *ctx = cl;
114   if(ctx->buffer) free(ctx->buffer);
115   free(ctx);
116 }
117
118 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
119 static int
120 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
121   int len, mask;
122   while(ctx->bytes_read < ctx->bytes_expected) {
123     len = Eread(ctx->buffer + ctx->bytes_read,
124                 ctx->bytes_expected - ctx->bytes_read);
125     if(len < 0) {
126       *newmask = mask;
127       return -1;
128     }
129     /* if we get 0 inside SSL, and there was a real error, we
130      * will actually get a -1 here.
131      * if(len == 0) return ctx->bytes_read;
132      */
133     ctx->bytes_read += len;
134   }
135   assert(ctx->bytes_read == ctx->bytes_expected);
136   return ctx->bytes_read;
137 }
/*
 * FULLREAD(e, ctx, size): read exactly `size` bytes into ctx->buffer.
 *
 * On first entry (ctx->bytes_expected == 0) a fresh buffer of size+1
 * bytes is allocated and NUL-terminated at offset `size`.  The read is
 * then driven through __read_on_ctx:
 *   - on EAGAIN the ENCLOSING FUNCTION returns the mask to wait on
 *     (OR'ed with EVENTER_EXCEPTION); the partial state is kept in ctx
 *     so a later invocation resumes the same read;
 *   - on any other failure control jumps to the enclosing function's
 *     `socket_error` label.
 * On success ctx->buffer holds the data and the read counters are reset.
 *
 * The macro's locals carry an fr_ prefix so they cannot shadow the
 * caller's own `mask`/`len`, and `size` is evaluated exactly once.
 */
#define FULLREAD(e,ctx,size) do { \
  int fr_mask_, fr_len_; \
  const size_t fr_size_ = (size_t)(size); \
  if(!(ctx)->bytes_expected) { \
    if(fr_size_ + 1 == 0) { /* size+1 would wrap to zero */ \
      noitL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)fr_size_); \
      goto socket_error; \
    } \
    (ctx)->bytes_expected = fr_size_; \
    if((ctx)->buffer) free((ctx)->buffer); \
    (ctx)->buffer = malloc(fr_size_ + 1); \
    if((ctx)->buffer == NULL) { \
      noitL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)fr_size_ + 1); \
      goto socket_error; \
    } \
    (ctx)->buffer[fr_size_] = '\0'; \
  } \
  fr_len_ = __read_on_ctx((e), (ctx), &fr_mask_); \
  if(fr_len_ < 0) { \
    if(errno == EAGAIN) return fr_mask_ | EVENTER_EXCEPTION; \
    noitL(noit_error, "SSL read error: %s\n", strerror(errno)); \
    goto socket_error; \
  } \
  (ctx)->bytes_read = 0; \
  (ctx)->bytes_expected = 0; \
  if((size_t)fr_len_ != fr_size_) { \
    noitL(noit_error, "SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
          (ctx)->state, fr_len_, (long unsigned int)fr_size_); \
    goto socket_error; \
  } \
} while(0)
164
165 int
166 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
167                            struct timeval *now) {
168   noit_connection_ctx_t *nctx = closure;
169   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
170   int len;
171   jlog_id n_chkpt;
172
173   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
174     if(write(e->fd, e, 0) == -1)
175       noitL(noit_error, "socket error: %s\n", strerror(errno));
176  socket_error:
177     ctx->state = JLOG_STREAMER_WANT_INITIATE;
178     ctx->count = 0;
179     ctx->bytes_read = 0;
180     ctx->bytes_expected = 0;
181     if(ctx->buffer) free(ctx->buffer);
182     ctx->buffer = NULL;
183     noit_connection_schedule_reattempt(nctx, now);
184     eventer_remove_fd(e->fd);
185     e->opset->close(e->fd, &mask, e);
186     return 0;
187   }
188
189   while(1) {
190     switch(ctx->state) {
191       case JLOG_STREAMER_WANT_INITIATE:
192         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
193                               sizeof(&ctx->jlog_feed_cmd),
194                               &mask, e);
195         if(len < 0) {
196           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
197           goto socket_error;
198         }
199         if(len != sizeof(ctx->jlog_feed_cmd)) {
200           noitL(noit_error, "short write on initiating stream.\n");
201           goto socket_error;
202         }
203         ctx->state = JLOG_STREAMER_WANT_COUNT;
204         break;
205
206       case JLOG_STREAMER_WANT_COUNT:
207         FULLREAD(e, ctx, sizeof(u_int32_t));
208         memcpy(&ctx->count, ctx->buffer, sizeof(u_int32_t));
209         ctx->count = ntohl(ctx->count);
210         free(ctx->buffer); ctx->buffer = NULL;
211         ctx->state = JLOG_STREAMER_WANT_HEADER;
212         break;
213
214       case JLOG_STREAMER_WANT_HEADER:
215         if(ctx->count == 0) {
216           ctx->state = JLOG_STREAMER_WANT_COUNT;
217           break;
218         }
219         FULLREAD(e, ctx, sizeof(ctx->header));
220         memcpy(&ctx->header, ctx->buffer, sizeof(ctx->header));
221         ctx->header.chkpt.log = ntohl(ctx->header.chkpt.log);
222         ctx->header.chkpt.marker = ntohl(ctx->header.chkpt.marker);
223         ctx->header.tv_sec = ntohl(ctx->header.tv_sec);
224         ctx->header.tv_usec = ntohl(ctx->header.tv_usec);
225         ctx->header.message_len = ntohl(ctx->header.message_len);
226         free(ctx->buffer); ctx->buffer = NULL;
227         ctx->state = JLOG_STREAMER_WANT_BODY;
228         break;
229
230       case JLOG_STREAMER_WANT_BODY:
231         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
232         ctx->push(DS_OP_INSERT, &nctx->r.remote, ctx->buffer);
233         /* Don't free the buffer, it's used by the datastore process. */
234         ctx->buffer = NULL;
235         ctx->count--;
236         if(ctx->count == 0) {
237           eventer_t completion_e;
238           eventer_remove_fd(e->fd);
239           completion_e = eventer_alloc();
240           memcpy(completion_e, e, sizeof(*e));
241           completion_e->mask = EVENTER_WRITE | EVENTER_EXCEPTION;
242           ctx->state = JLOG_STREAMER_WANT_CHKPT;
243           ctx->push(DS_OP_CHKPT, &nctx->r.remote, completion_e);
244           noitL(noit_debug, "Pushing batch asynch...\n");
245           return 0;
246         } else
247           ctx->state = JLOG_STREAMER_WANT_HEADER;
248         break;
249
250       case JLOG_STREAMER_WANT_CHKPT:
251         noitL(noit_debug, "Pushing checkpoint: [%u/%u]\n",
252               ctx->header.chkpt.log, ctx->header.chkpt.marker);
253         n_chkpt.log = htonl(ctx->header.chkpt.log);
254         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
255
256         /* screw short writes.  I'd rather die than not write my data! */
257         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
258                               &mask, e);
259         if(len < 0) {
260           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
261           goto socket_error;
262         }
263         if(len != sizeof(jlog_id)) {
264           noitL(noit_error, "short write on checkpointing stream.\n");
265           goto socket_error;
266         }
267         ctx->state = JLOG_STREAMER_WANT_COUNT;
268         break;
269     }
270   }
271   /* never get here */
272 }
273
274 int
275 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure,
276                             struct timeval *now) {
277   noit_connection_ctx_t *nctx = closure;
278   int rv;
279
280   rv = eventer_SSL_connect(e, &mask);
281   if(rv > 0) {
282     eventer_ssl_ctx_t *sslctx;
283     e->callback = nctx->consumer_callback;
284     /* We must make a copy of the acceptor_closure_t for each new
285      * connection.
286      */
287     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
288       char *cn, *end;
289       cn = eventer_ssl_get_peer_subject(sslctx);
290       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
291         cn += 3;
292         end = cn;
293         while(*end && *end != '/') end++;
294         nctx->remote_cn = malloc(end - cn + 1);
295         memcpy(nctx->remote_cn, cn, end - cn);
296         nctx->remote_cn[end-cn] = '\0';
297       }
298     }
299     return e->callback(e, mask, e->closure, now);
300   }
301   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
302
303   noitL(noit_error, "jlog streamer SSL upgrade failed.\n");
304   eventer_remove_fd(e->fd);
305   e->opset->close(e->fd, &mask, e);
306   noit_connection_schedule_reattempt(nctx, now);
307   return 0;
308 }
309 int
310 noit_connection_complete_connect(eventer_t e, int mask, void *closure,
311                                  struct timeval *now) {
312   noit_connection_ctx_t *nctx = closure;
313   const char *cert, *key, *ca, *ciphers;
314   eventer_ssl_ctx_t *sslctx;
315
316   if(mask & EVENTER_EXCEPTION) {
317     if(write(e->fd, e, 0) == -1)
318       noitL(noit_error, "socket error: %s\n", strerror(errno));
319  connect_error:
320     eventer_remove_fd(e->fd);
321     e->opset->close(e->fd, &mask, e);
322     noit_connection_schedule_reattempt(nctx, now);
323     return 0;
324   }
325
326 #define SSLCONFGET(var,name) do { \
327   if(!noit_hash_retr_str(nctx->sslconfig, name, strlen(name), \
328                          &var)) var = NULL; } while(0)
329   SSLCONFGET(cert, "certificate_file");
330   SSLCONFGET(key, "key_file");
331   SSLCONFGET(ca, "ca_chain");
332   SSLCONFGET(ciphers, "ciphers");
333   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
334   if(!sslctx) goto connect_error;
335
336   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
337                              nctx->sslconfig);
338   EVENTER_ATTACH_SSL(e, sslctx);
339   e->callback = noit_connection_ssl_upgrade;
340   return e->callback(e, mask, closure, now);
341 }
342 static void
343 noit_connection_initiate_connection(noit_connection_ctx_t *nctx) {
344   struct timeval __now;
345   eventer_t e;
346   int rv, fd = -1;
347   long on;
348
349   /* Open a socket */
350   fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0);
351   if(fd < 0) goto reschedule;
352
353   /* Make it non-blocking */
354   on = 1;
355   if(ioctl(fd, FIONBIO, &on)) goto reschedule;
356
357   /* Initiate a connection */
358   rv = connect(fd, &nctx->r.remote, nctx->remote_len);
359   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
360
361   /* Register a handler for connection completion */
362   e = eventer_alloc();
363   e->fd = fd;
364   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
365   e->callback = noit_connection_complete_connect;
366   e->closure = nctx;
367   eventer_add(e);
368   return;
369
370  reschedule:
371   if(fd >= 0) close(fd);
372   gettimeofday(&__now, NULL);
373   noit_connection_schedule_reattempt(nctx, &__now);
374   return;
375 }
376
377 int
378 initiate_noit_connection(const char *host, unsigned short port,
379                          noit_hash_table *sslconfig, noit_hash_table *config,
380                          eventer_func_t handler, void *closure,
381                          void (*freefunc)(void *)) {
382   noit_connection_ctx_t *ctx;
383
384   int8_t family;
385   int rv;
386   union {
387     struct in_addr addr4;
388     struct in6_addr addr6;
389   } a;
390
391   if(host[0] == '/') {
392     family = AF_UNIX;
393   }
394   else {
395     family = AF_INET;
396     rv = inet_pton(family, host, &a);
397     if(rv != 1) {
398       family = AF_INET6;
399       rv = inet_pton(family, host, &a);
400       if(rv != 1) {
401         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
402         return -1;
403       }
404     }
405   }
406
407   ctx = noit_connection_ctx_alloc();
408  
409   memset(&ctx->r, 0, sizeof(ctx->r));
410   if(family == AF_UNIX) {
411     struct sockaddr_un *s = &ctx->r.remote_un;
412     s->sun_family = AF_UNIX;
413     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
414     ctx->remote_len = sizeof(*s);
415   }
416   else if(family == AF_INET) {
417     struct sockaddr_in *s = &ctx->r.remote_in;
418     s->sin_family = family;
419     s->sin_port = htons(port);
420     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
421     ctx->remote_len = sizeof(*s);
422   }
423   else {
424     struct sockaddr_in6 *s = &ctx->r.remote_in6;
425     s->sin6_family = family;
426     s->sin6_port = htons(port);
427     memcpy(&s->sin6_addr, &a, sizeof(a));
428     ctx->remote_len = sizeof(*s);
429   }
430
431   if(ctx->sslconfig)
432     noit_hash_delete_all(ctx->sslconfig, free, free);
433   else
434     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
435   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
436   if(ctx->config)
437     noit_hash_delete_all(ctx->config, free, free);
438   else
439     ctx->config = calloc(1, sizeof(noit_hash_table));
440   noit_hash_merge_as_dict(ctx->config, config);
441
442   ctx->consumer_callback = handler;
443   ctx->consumer_free = freefunc;
444   ctx->consumer_ctx = closure;
445   noit_connection_initiate_connection(ctx);
446   return 0;
447 }
448
449 void
450 stratcon_streamer_connection(const char *toplevel, const char *destination,
451                              eventer_func_t handler,
452                              void *(*handler_alloc)(void), void *handler_ctx,
453                              void (*handler_free)(void *)) {
454   int i, cnt = 0;
455   noit_conf_section_t *noit_configs;
456   char path[256];
457
458   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
459   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
460   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
461   for(i=0; i<cnt; i++) {
462     char address[256];
463     unsigned short port;
464     int portint;
465     noit_hash_table *sslconfig, *config;
466
467     if(!noit_conf_get_stringbuf(noit_configs[i],
468                                 "ancestor-or-self::node()/@address",
469                                 address, sizeof(address))) {
470       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
471       continue;
472     }
473     /* if destination is specified, exact match it */
474     if(destination && strcmp(address, destination)) continue;
475
476     if(!noit_conf_get_int(noit_configs[i],
477                           "ancestor-or-self::node()/@port", &portint))
478       portint = 0;
479     port = (unsigned short) portint;
480     if(address[0] != '/' && (portint == 0 || (port != portint))) {
481       /* UNIX sockets don't require a port (they'll ignore it if specified */
482       noitL(noit_stderr,
483             "Invalid port [%d] specified in stanza %d\n", port, i+1);
484       continue;
485     }
486     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
487     config = noit_conf_get_hash(noit_configs[i], "config");
488
489     initiate_noit_connection(address, port, sslconfig, config,
490                              handler,
491                              handler_alloc ? handler_alloc() : handler_ctx,
492                              handler_free);
493   }
494 }
495 void
496 stratcon_jlog_streamer_reload(const char *toplevel) {
497   stratcon_streamer_connection(toplevel, NULL,
498                                stratcon_jlog_recv_handler,
499                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
500                                NULL,
501                                jlog_streamer_ctx_free);
502 }
503
504 void
505 stratcon_jlog_streamer_init(const char *toplevel) {
506   eventer_name_callback("noit_connection_reinitiate",
507                         noit_connection_reinitiate);
508   eventer_name_callback("stratcon_jlog_recv_handler",
509                         stratcon_jlog_recv_handler);
510   eventer_name_callback("noit_connection_ssl_upgrade",
511                         noit_connection_ssl_upgrade);
512   eventer_name_callback("noit_connection_complete_connect",
513                         noit_connection_complete_connect);
514   stratcon_jlog_streamer_reload(toplevel);
515 }
Note: See TracBrowser for help on using the browser.