root/src/stratcon_jlog_streamer.c

Revision fb5f8f9daa6e4df8e9acc3c11501205c21866b72, 14.4 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 10 years ago)

fix up SSL layer to handle termination and make the jlog rcpt side completely reset on error, refs #41

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "noit_conf.h"
9 #include "utils/noit_hash.h"
10 #include "utils/noit_log.h"
11 #include "jlog/jlog.h"
12 #include "noit_jlog_listener.h"
13 #include "stratcon_datastore.h"
14
15 #include <unistd.h>
16 #include <assert.h>
17 #include <errno.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #ifdef HAVE_SYS_FILIO_H
21 #include <sys/filio.h>
22 #endif
23 #include <netinet/in.h>
24 #include <sys/un.h>
25 #include <arpa/inet.h>
26
/* Global hash table named `noits`, statically initialized empty.
 * Nothing in this file reads or writes it.
 * NOTE(review): presumably shared with other stratcon modules via an
 * extern declaration — confirm before removing or making it static. */
noit_hash_table noits = NOIT_HASH_EMPTY;
28
/*
 * Per-noit connection state for the jlog streamer: where to connect,
 * reconnect backoff, private copies of the stanza's configuration, and the
 * incremental read state machine driven by stratcon_jlog_recv_handler().
 */
typedef struct jlog_streamer_ctx_t {
  /* Remote address in whichever family initiate_jlog_streamer() chose;
   * `remote` is the generic view handed to socket()/connect(). */
  union {
    struct sockaddr remote;
    struct sockaddr_un remote_un;
    struct sockaddr_in remote_in;
    struct sockaddr_in6 remote_in6;
  } r;
  socklen_t remote_len;         /* length of the active member of `r` */
  char *remote_cn;              /* peer certificate CN, or NULL */
  u_int32_t current_backoff;    /* next reconnect delay in ms; 0 = not yet */
  int wants_shutdown;           /* nonzero: recv handler tears down the
                                 * connection (not set in this file) */
  noit_hash_table *config;      /* private copy of the <config> stanza */
  noit_hash_table *sslconfig;   /* private copy of the <sslconfig> stanza */
  int bytes_expected;
  int bytes_read;
  char *buffer;         /* These guys are for doing partial reads */

  /* Which part of the stream the recv handler is waiting for. */
  enum {
    WANT_COUNT = 0,
    WANT_HEADER = 1,
    WANT_BODY = 2,
    WANT_CHKPT = 3,
  } state;
  int count;            /* Number of jlog messages we need to read */
  /* Per-message header, memcpy'd straight from the wire and then converted
   * from network byte order field by field.  Field order and types define
   * the wire format: do not reorder or resize.
   * NOTE(review): assumes jlog_id is two u_int32_t (log, marker) with no
   * padding anywhere in this struct — confirm against jlog.h. */
  struct {
    jlog_id   chkpt;
    u_int32_t tv_sec;
    u_int32_t tv_usec;
    u_int32_t message_len;
  } header;

  eventer_t timeout_event;      /* pending reconnect timer, or NULL */
} jlog_streamer_ctx_t;
62
/* Forward declaration: the reconnect timer callback below needs to start a
 * new connection attempt before the function is defined. */
static void jlog_streamer_initiate_connection(jlog_streamer_ctx_t *ctx);
64
65 jlog_streamer_ctx_t *
66 jlog_streamer_ctx_alloc(void) {
67   jlog_streamer_ctx_t *ctx;
68   ctx = calloc(1, sizeof(*ctx));
69   return ctx;
70 }
71 int
72 jlog_streamer_reinitiate(eventer_t e, int mask, void *closure,
73                          struct timeval *now) {
74   jlog_streamer_ctx_t *ctx = closure;
75   ctx->timeout_event = NULL;
76   jlog_streamer_initiate_connection(closure);
77   return 0;
78 }
79 void
80 jlog_streamer_schedule_reattempt(jlog_streamer_ctx_t *ctx,
81                                  struct timeval *now) {
82   struct timeval __now, interval;
83   const char *v;
84   u_int32_t min_interval = 1000, max_interval = 60000;
85   if(noit_hash_retrieve(ctx->config,
86                         "reconnect_initial_interval",
87                         strlen("reconnect_initial_interval"),
88                         (void **)&v)) {
89     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
90   }
91   if(noit_hash_retrieve(ctx->config,
92                         "reconnect_maximum_interval",
93                         strlen("reconnect_maximum_interval"),
94                         (void **)&v)) {
95     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
96   }
97   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
98   else {
99     ctx->current_backoff *= 2;
100     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
101     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
102   }
103   if(!now) {
104     gettimeofday(&__now, NULL);
105     now = &__now;
106   }
107   interval.tv_sec = ctx->current_backoff / 1000;
108   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
109   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
110         ctx->current_backoff);
111   if(ctx->timeout_event)
112     eventer_remove(ctx->timeout_event);
113   else
114     ctx->timeout_event = eventer_alloc();
115   ctx->timeout_event->callback = jlog_streamer_reinitiate;
116   ctx->timeout_event->closure = ctx;
117   ctx->timeout_event->mask = EVENTER_TIMER;
118   add_timeval(*now, interval, &ctx->timeout_event->whence);
119   eventer_add(ctx->timeout_event);
120 }
121 void
122 jlog_streamer_ctx_free(jlog_streamer_ctx_t *ctx) {
123   if(ctx->buffer) free(ctx->buffer);
124   if(ctx->remote_cn) free(ctx->remote_cn);
125   if(ctx->timeout_event) {
126     eventer_remove(ctx->timeout_event);
127     eventer_free(ctx->timeout_event);
128   }
129   free(ctx);
130 }
131
132 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
133 static int
134 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
135   int len, mask;
136   while(ctx->bytes_read < ctx->bytes_expected) {
137     len = Eread(ctx->buffer + ctx->bytes_read,
138                 ctx->bytes_expected - ctx->bytes_read);
139     if(len < 0) {
140       *newmask = mask;
141       return -1;
142     }
143     /* if we get 0 inside SSL, and there was a real error, we
144      * will actually get a -1 here.
145      * if(len == 0) return ctx->bytes_read;
146      */
147     ctx->bytes_read += len;
148   }
149   assert(ctx->bytes_read == ctx->bytes_expected);
150   return ctx->bytes_read;
151 }
/*
 * FULLREAD(e, ctx, size): read exactly `size` bytes into a freshly
 * allocated, NUL-terminated ctx->buffer, resuming a previous partial read
 * if one is in progress (ctx->bytes_expected != 0).
 *
 * Only usable inside stratcon_jlog_recv_handler():
 *   - it `return`s from the enclosing function (mask | EVENTER_EXCEPTION)
 *     when the read would block;
 *   - it jumps to the caller's `socket_error` label on allocation failure,
 *     read error, or short read.
 * On success ctx->buffer holds `size` bytes, and bytes_read/bytes_expected
 * are reset to 0.
 *
 * `size` is evaluated once and widened to unsigned long so it matches the
 * %lu conversions.  The locals are prefixed so they do not shadow the
 * caller's `mask`/`len`.
 */
#define FULLREAD(e,ctx,size) do { \
  unsigned long fr_size = (unsigned long)(size); \
  int fr_mask, fr_len; \
  if(!(ctx)->bytes_expected) { \
    (ctx)->bytes_expected = fr_size; \
    free((ctx)->buffer); \
    (ctx)->buffer = malloc(fr_size + 1); \
    if((ctx)->buffer == NULL) { \
      noitL(noit_error, "malloc(%lu) failed.\n", fr_size + 1); \
      goto socket_error; \
    } \
    (ctx)->buffer[fr_size] = '\0'; \
  } \
  fr_len = __read_on_ctx((e), (ctx), &fr_mask); \
  if(fr_len < 0) { \
    if(errno == EAGAIN) return fr_mask | EVENTER_EXCEPTION; \
    noitL(noit_error, "SSL read error: %s\n", strerror(errno)); \
    goto socket_error; \
  } \
  (ctx)->bytes_read = 0; \
  (ctx)->bytes_expected = 0; \
  if((unsigned long)fr_len != fr_size) { \
    noitL(noit_error, "SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
          (int)(ctx)->state, fr_len, fr_size); \
    goto socket_error; \
  } \
} while(0)
178
/*
 * Event callback for an established streamer connection (installed by
 * jlog_streamer_ssl_upgrade).  Runs a state machine over ctx->state:
 *
 *   WANT_COUNT  - read a 4-byte big-endian count of messages in the batch
 *   WANT_HEADER - read one per-message header into ctx->header
 *   WANT_BODY   - read header.message_len bytes and push them to the
 *                 datastore; after the last message, hand off to the
 *                 datastore and wait for the checkpoint step
 *   WANT_CHKPT  - write the last header's checkpoint back to the peer
 *
 * Returns mask | EVENTER_EXCEPTION when a read/write would block (EAGAIN),
 * so the eventer calls back later with the partial progress preserved in
 * ctx.  Returns 0 after tearing down the connection, or after handing the
 * batch to the datastore.
 */
int
stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
                           struct timeval *now) {
  jlog_streamer_ctx_t *ctx = closure;
  int len;
  jlog_id n_chkpt;

  if(mask & EVENTER_EXCEPTION || ctx->wants_shutdown) {
    /* Every error path (including FULLREAD's gotos) lands here: reset the
     * read state completely, drop any partial buffer, schedule a reconnect,
     * and close this socket. */
 socket_error:
    ctx->state = WANT_COUNT;
    ctx->count = 0;
    ctx->bytes_read = 0;
    ctx->bytes_expected = 0;
    if(ctx->buffer) free(ctx->buffer);
    ctx->buffer = NULL;
    jlog_streamer_schedule_reattempt(ctx, now);
    eventer_remove_fd(e->fd);
    e->opset->close(e->fd, &mask, e);
    return 0;
  }

  while(1) {
    switch(ctx->state) {
      case WANT_COUNT:
        FULLREAD(e, ctx, sizeof(u_int32_t));
        memcpy(&ctx->count, ctx->buffer, sizeof(u_int32_t));
        ctx->count = ntohl(ctx->count);
        free(ctx->buffer); ctx->buffer = NULL;
        ctx->state = WANT_HEADER;
        break;

      case WANT_HEADER:
        /* An empty batch: go straight back to waiting for the next count. */
        if(ctx->count == 0) {
          ctx->state = WANT_COUNT;
          break;
        }
        FULLREAD(e, ctx, sizeof(ctx->header));
        memcpy(&ctx->header, ctx->buffer, sizeof(ctx->header));
        ctx->header.chkpt.log = ntohl(ctx->header.chkpt.log);
        ctx->header.chkpt.marker = ntohl(ctx->header.chkpt.marker);
        ctx->header.tv_sec = ntohl(ctx->header.tv_sec);
        ctx->header.tv_usec = ntohl(ctx->header.tv_usec);
        ctx->header.message_len = ntohl(ctx->header.message_len);
        free(ctx->buffer); ctx->buffer = NULL;
        ctx->state = WANT_BODY;
        break;

      case WANT_BODY:
        FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
        stratcon_datastore_push(DS_OP_INSERT, &ctx->r.remote, ctx->buffer);
        /* Don't free the buffer, it's used by the datastore process. */
        ctx->buffer = NULL;
        ctx->count--;
        if(ctx->count == 0) {
          /* Last message of the batch: stop watching the fd, and give the
           * datastore a copy of this event armed for write with state
           * WANT_CHKPT.
           * NOTE(review): presumably the datastore re-adds completion_e
           * once the batch is committed, which re-enters this handler to
           * send the checkpoint — confirm in stratcon_datastore.c. */
          eventer_t completion_e;
          eventer_remove_fd(e->fd);
          completion_e = eventer_alloc();
          memcpy(completion_e, e, sizeof(*e));
          completion_e->mask = EVENTER_WRITE | EVENTER_EXCEPTION;
          ctx->state = WANT_CHKPT;
          stratcon_datastore_push(DS_OP_CHKPT, &ctx->r.remote, completion_e);
          noitL(noit_debug, "Pushing batch asynch...\n");
          return 0;
        } else
          ctx->state = WANT_HEADER;
        break;

      case WANT_CHKPT:
        /* Acknowledge the batch by echoing the last header's checkpoint
         * back to the peer in network byte order. */
        noitL(noit_debug, "Pushing checkpoint: [%u/%u]\n",
              ctx->header.chkpt.log, ctx->header.chkpt.marker);
        n_chkpt.log = htonl(ctx->header.chkpt.log);
        n_chkpt.marker = htonl(ctx->header.chkpt.marker);

        /* screw short writes.  I'd rather die than not write my data! */
        len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
                              &mask, e);
        if(len < 0) {
          if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
          goto socket_error;
        }
        if(len != sizeof(jlog_id)) {
          noitL(noit_error, "short write on checkpointing stream.\n");
          goto socket_error;
        }
        ctx->state = WANT_COUNT;
        break;
    }
  }
  /* never get here */
}
269
270 int
271 jlog_streamer_ssl_upgrade(eventer_t e, int mask, void *closure,
272                           struct timeval *now) {
273   jlog_streamer_ctx_t *ctx = closure;
274   int rv;
275
276   rv = eventer_SSL_connect(e, &mask);
277   if(rv > 0) {
278     eventer_ssl_ctx_t *sslctx;
279     e->callback = stratcon_jlog_recv_handler;
280     /* We must make a copy of the acceptor_closure_t for each new
281      * connection.
282      */
283     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
284       char *cn, *end;
285       cn = eventer_ssl_get_peer_subject(sslctx);
286       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
287         cn += 3;
288         end = cn;
289         while(*end && *end != '/') end++;
290         ctx->remote_cn = malloc(end - cn + 1);
291         memcpy(ctx->remote_cn, cn, end - cn);
292         ctx->remote_cn[end-cn] = '\0';
293       }
294     }
295     return e->callback(e, mask, e->closure, now);
296   }
297   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
298
299   eventer_remove_fd(e->fd);
300   e->opset->close(e->fd, &mask, e);
301   jlog_streamer_schedule_reattempt(ctx, now);
302   return 0;
303 }
304 int
305 jlog_streamer_complete_connect(eventer_t e, int mask, void *closure,
306                                struct timeval *now) {
307   jlog_streamer_ctx_t *ctx = closure;
308   char *cert, *key, *ca, *ciphers;
309   eventer_ssl_ctx_t *sslctx;
310
311   if(mask & EVENTER_EXCEPTION) {
312  connect_error:
313     eventer_remove_fd(e->fd);
314     e->opset->close(e->fd, &mask, e);
315     jlog_streamer_schedule_reattempt(ctx, now);
316     return 0;
317   }
318
319 #define SSLCONFGET(var,name) do { \
320   if(!noit_hash_retrieve(ctx->sslconfig, name, strlen(name), \
321                          (void **)&var)) var = NULL; } while(0)
322   SSLCONFGET(cert, "certificate_file");
323   SSLCONFGET(key, "key_file");
324   SSLCONFGET(ca, "ca_chain");
325   SSLCONFGET(ciphers, "ciphers");
326   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
327   if(!sslctx) goto connect_error;
328
329   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
330                              ctx->sslconfig);
331   EVENTER_ATTACH_SSL(e, sslctx);
332   e->callback = jlog_streamer_ssl_upgrade;
333   return e->callback(e, mask, closure, now);
334 }
335 static void
336 jlog_streamer_initiate_connection(jlog_streamer_ctx_t *ctx) {
337   struct timeval __now;
338   eventer_t e;
339   int rv, fd = -1;
340   long on;
341
342   /* Open a socket */
343   fd = socket(ctx->r.remote.sa_family, SOCK_STREAM, 0);
344   if(fd < 0) goto reschedule;
345
346   /* Make it non-blocking */
347   on = 1;
348   if(ioctl(fd, FIONBIO, &on)) goto reschedule;
349
350   /* Initiate a connection */
351   rv = connect(fd, &ctx->r.remote, ctx->remote_len);
352   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
353
354   /* Register a handler for connection completion */
355   e = eventer_alloc();
356   e->fd = fd;
357   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
358   e->callback = jlog_streamer_complete_connect;
359   e->closure = ctx;
360   eventer_add(e);
361   return;
362
363  reschedule:
364   if(fd >= 0) close(fd);
365   gettimeofday(&__now, NULL);
366   jlog_streamer_schedule_reattempt(ctx, &__now);
367   return;
368 }
369
370 int
371 initiate_jlog_streamer(const char *host, unsigned short port,
372                        noit_hash_table *sslconfig, noit_hash_table *config) {
373   jlog_streamer_ctx_t *ctx;
374
375   int8_t family;
376   int rv;
377   union {
378     struct in_addr addr4;
379     struct in6_addr addr6;
380   } a;
381
382   if(host[0] == '/') {
383     family = AF_UNIX;
384   }
385   else {
386     family = AF_INET;
387     rv = inet_pton(family, host, &a);
388     if(rv != 1) {
389       family = AF_INET6;
390       rv = inet_pton(family, host, &a);
391       if(rv != 1) {
392         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
393         return -1;
394       }
395     }
396   }
397
398   ctx = jlog_streamer_ctx_alloc();
399  
400   memset(&ctx->r, 0, sizeof(ctx->r));
401   if(family == AF_UNIX) {
402     struct sockaddr_un *s = &ctx->r.remote_un;
403     s->sun_family = AF_UNIX;
404     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
405     ctx->remote_len = sizeof(*s);
406   }
407   else if(family == AF_INET) {
408     struct sockaddr_in *s = &ctx->r.remote_in;
409     s->sin_family = family;
410     s->sin_port = htons(port);
411     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
412     ctx->remote_len = sizeof(*s);
413   }
414   else {
415     struct sockaddr_in6 *s = &ctx->r.remote_in6;
416     s->sin6_family = family;
417     s->sin6_port = htons(port);
418     memcpy(&s->sin6_addr, &a, sizeof(a));
419     ctx->remote_len = sizeof(*s);
420   }
421
422   if(ctx->sslconfig)
423     noit_hash_delete_all(ctx->sslconfig, free, free);
424   else
425     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
426   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
427   if(ctx->config)
428     noit_hash_delete_all(ctx->config, free, free);
429   else
430     ctx->config = calloc(1, sizeof(noit_hash_table));
431   noit_hash_merge_as_dict(ctx->config, config);
432
433   jlog_streamer_initiate_connection(ctx);
434   return 0;
435 }
436
437 void
438 stratcon_jlog_streamer_reload(const char *toplevel) {
439   int i, cnt = 0;
440   noit_conf_section_t *noit_configs;
441   char path[256];
442
443   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
444   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
445   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
446   for(i=0; i<cnt; i++) {
447     char address[256];
448     unsigned short port;
449     int portint;
450     noit_hash_table *sslconfig, *config;
451
452     if(!noit_conf_get_stringbuf(noit_configs[i],
453                                 "ancestor-or-self::node()/@address",
454                                 address, sizeof(address))) {
455       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
456       continue;
457     }
458     if(!noit_conf_get_int(noit_configs[i],
459                           "ancestor-or-self::node()/@port", &portint))
460       portint = 0;
461     port = (unsigned short) portint;
462     if(address[0] != '/' && (portint == 0 || (port != portint))) {
463       /* UNIX sockets don't require a port (they'll ignore it if specified */
464       noitL(noit_stderr,
465             "Invalid port [%d] specified in stanza %d\n", port, i+1);
466       continue;
467     }
468     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
469     config = noit_conf_get_hash(noit_configs[i], "config");
470
471     initiate_jlog_streamer(address, port, sslconfig, config);
472   }
473 }
474
475 void
476 stratcon_jlog_streamer_init(const char *toplevel) {
477   eventer_name_callback("jlog_streamer_reinitiate",
478                         jlog_streamer_reinitiate);
479   eventer_name_callback("stratcon_jlog_recv_handler",
480                         stratcon_jlog_recv_handler);
481   eventer_name_callback("jlog_streamer_ssl_upgrade",
482                         jlog_streamer_ssl_upgrade);
483   eventer_name_callback("jlog_streamer_complete_connect",
484                         jlog_streamer_complete_connect);
485   stratcon_jlog_streamer_reload(toplevel);
486 }
Note: See TracBrowser for help on using the browser.