root/src/stratcon_jlog_streamer.c

Revision 6bb9ef8a16723394cc4e5c511f06fe2173d78a11, 15.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 10 years ago)

closes #52. flag-day(noitd,stratcond)

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "noit_conf.h"
9 #include "utils/noit_hash.h"
10 #include "utils/noit_log.h"
11 #include "jlog/jlog.h"
12 #include "noit_jlog_listener.h"
13 #include "stratcon_datastore.h"
14
15 #include <unistd.h>
16 #include <assert.h>
17 #include <errno.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #ifdef HAVE_SYS_FILIO_H
21 #include <sys/filio.h>
22 #endif
23 #include <netinet/in.h>
24 #include <sys/un.h>
25 #include <arpa/inet.h>
26
27 noit_hash_table noits = NOIT_HASH_EMPTY;
28
29 typedef struct jlog_streamer_ctx_t {
30   union {
31     struct sockaddr remote;
32     struct sockaddr_un remote_un;
33     struct sockaddr_in remote_in;
34     struct sockaddr_in6 remote_in6;
35   } r;
36   socklen_t remote_len;
37   char *remote_cn;
38   u_int32_t current_backoff;
39   int wants_shutdown;
40   noit_hash_table *config;
41   noit_hash_table *sslconfig;
42   int bytes_expected;
43   int bytes_read;
44   char *buffer;         /* These guys are for doing partial reads */
45
46   enum {
47     WANT_INITIATE = 0,
48     WANT_COUNT = 1,
49     WANT_HEADER = 2,
50     WANT_BODY = 3,
51     WANT_CHKPT = 4,
52   } state;
53   int count;            /* Number of jlog messages we need to read */
54   struct {
55     jlog_id   chkpt;
56     u_int32_t tv_sec;
57     u_int32_t tv_usec;
58     u_int32_t message_len;
59   } header;
60
61   eventer_t timeout_event;
62 } jlog_streamer_ctx_t;
63
64 static void jlog_streamer_initiate_connection(jlog_streamer_ctx_t *ctx);
65
66 jlog_streamer_ctx_t *
67 jlog_streamer_ctx_alloc(void) {
68   jlog_streamer_ctx_t *ctx;
69   ctx = calloc(1, sizeof(*ctx));
70   return ctx;
71 }
72 int
73 jlog_streamer_reinitiate(eventer_t e, int mask, void *closure,
74                          struct timeval *now) {
75   jlog_streamer_ctx_t *ctx = closure;
76   ctx->timeout_event = NULL;
77   jlog_streamer_initiate_connection(closure);
78   return 0;
79 }
80 void
81 jlog_streamer_schedule_reattempt(jlog_streamer_ctx_t *ctx,
82                                  struct timeval *now) {
83   struct timeval __now, interval;
84   const char *v;
85   u_int32_t min_interval = 1000, max_interval = 60000;
86   if(noit_hash_retrieve(ctx->config,
87                         "reconnect_initial_interval",
88                         strlen("reconnect_initial_interval"),
89                         (void **)&v)) {
90     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
91   }
92   if(noit_hash_retrieve(ctx->config,
93                         "reconnect_maximum_interval",
94                         strlen("reconnect_maximum_interval"),
95                         (void **)&v)) {
96     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
97   }
98   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
99   else {
100     ctx->current_backoff *= 2;
101     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
102     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
103   }
104   if(!now) {
105     gettimeofday(&__now, NULL);
106     now = &__now;
107   }
108   interval.tv_sec = ctx->current_backoff / 1000;
109   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
110   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
111         ctx->current_backoff);
112   if(ctx->timeout_event)
113     eventer_remove(ctx->timeout_event);
114   else
115     ctx->timeout_event = eventer_alloc();
116   ctx->timeout_event->callback = jlog_streamer_reinitiate;
117   ctx->timeout_event->closure = ctx;
118   ctx->timeout_event->mask = EVENTER_TIMER;
119   add_timeval(*now, interval, &ctx->timeout_event->whence);
120   eventer_add(ctx->timeout_event);
121 }
122 void
123 jlog_streamer_ctx_free(jlog_streamer_ctx_t *ctx) {
124   if(ctx->buffer) free(ctx->buffer);
125   if(ctx->remote_cn) free(ctx->remote_cn);
126   if(ctx->timeout_event) {
127     eventer_remove(ctx->timeout_event);
128     eventer_free(ctx->timeout_event);
129   }
130   free(ctx);
131 }
132
133 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
134 static int
135 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
136   int len, mask;
137   while(ctx->bytes_read < ctx->bytes_expected) {
138     len = Eread(ctx->buffer + ctx->bytes_read,
139                 ctx->bytes_expected - ctx->bytes_read);
140     if(len < 0) {
141       *newmask = mask;
142       return -1;
143     }
144     /* if we get 0 inside SSL, and there was a real error, we
145      * will actually get a -1 here.
146      * if(len == 0) return ctx->bytes_read;
147      */
148     ctx->bytes_read += len;
149   }
150   assert(ctx->bytes_read == ctx->bytes_expected);
151   return ctx->bytes_read;
152 }
153 #define FULLREAD(e,ctx,size) do { \
154   int mask, len; \
155   if(!ctx->bytes_expected) { \
156     ctx->bytes_expected = size; \
157     if(ctx->buffer) free(ctx->buffer); \
158     ctx->buffer = malloc(size + 1); \
159     if(ctx->buffer == NULL) { \
160       noitL(noit_error, "malloc(%lu) failed.\n", size + 1); \
161       goto socket_error; \
162     } \
163     ctx->buffer[size] = '\0'; \
164   } \
165   len = __read_on_ctx(e, ctx, &mask); \
166   if(len < 0) { \
167     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
168     noitL(noit_error, "SSL read error: %s\n", strerror(errno)); \
169     goto socket_error; \
170   } \
171   ctx->bytes_read = 0; \
172   ctx->bytes_expected = 0; \
173   if(len != size) { \
174     noitL(noit_error, "SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
175           ctx->state, len, size); \
176     goto socket_error; \
177   } \
178 } while(0)
179
180 int
181 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
182                            struct timeval *now) {
183   static u_int32_t jlog_feed_cmd = 0;
184   jlog_streamer_ctx_t *ctx = closure;
185   int len;
186   jlog_id n_chkpt;
187
188   if(!jlog_feed_cmd) jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
189
190   if(mask & EVENTER_EXCEPTION || ctx->wants_shutdown) {
191  socket_error:
192     ctx->state = WANT_INITIATE;
193     ctx->count = 0;
194     ctx->bytes_read = 0;
195     ctx->bytes_expected = 0;
196     if(ctx->buffer) free(ctx->buffer);
197     ctx->buffer = NULL;
198     jlog_streamer_schedule_reattempt(ctx, now);
199     eventer_remove_fd(e->fd);
200     e->opset->close(e->fd, &mask, e);
201     return 0;
202   }
203
204   while(1) {
205     switch(ctx->state) {
206       case WANT_INITIATE:
207         len = e->opset->write(e->fd, &jlog_feed_cmd, sizeof(&jlog_feed_cmd),
208                               &mask, e);
209         if(len < 0) {
210           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
211           goto socket_error;
212         }
213         if(len != sizeof(jlog_feed_cmd)) {
214           noitL(noit_error, "short write on initiating stream.\n");
215           goto socket_error;
216         }
217         ctx->state = WANT_COUNT;
218         break;
219
220       case WANT_COUNT:
221         FULLREAD(e, ctx, sizeof(u_int32_t));
222         memcpy(&ctx->count, ctx->buffer, sizeof(u_int32_t));
223         ctx->count = ntohl(ctx->count);
224         free(ctx->buffer); ctx->buffer = NULL;
225         ctx->state = WANT_HEADER;
226         break;
227
228       case WANT_HEADER:
229         if(ctx->count == 0) {
230           ctx->state = WANT_COUNT;
231           break;
232         }
233         FULLREAD(e, ctx, sizeof(ctx->header));
234         memcpy(&ctx->header, ctx->buffer, sizeof(ctx->header));
235         ctx->header.chkpt.log = ntohl(ctx->header.chkpt.log);
236         ctx->header.chkpt.marker = ntohl(ctx->header.chkpt.marker);
237         ctx->header.tv_sec = ntohl(ctx->header.tv_sec);
238         ctx->header.tv_usec = ntohl(ctx->header.tv_usec);
239         ctx->header.message_len = ntohl(ctx->header.message_len);
240         free(ctx->buffer); ctx->buffer = NULL;
241         ctx->state = WANT_BODY;
242         break;
243
244       case WANT_BODY:
245         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
246         stratcon_datastore_push(DS_OP_INSERT, &ctx->r.remote, ctx->buffer);
247         /* Don't free the buffer, it's used by the datastore process. */
248         ctx->buffer = NULL;
249         ctx->count--;
250         if(ctx->count == 0) {
251           eventer_t completion_e;
252           eventer_remove_fd(e->fd);
253           completion_e = eventer_alloc();
254           memcpy(completion_e, e, sizeof(*e));
255           completion_e->mask = EVENTER_WRITE | EVENTER_EXCEPTION;
256           ctx->state = WANT_CHKPT;
257           stratcon_datastore_push(DS_OP_CHKPT, &ctx->r.remote, completion_e);
258           noitL(noit_debug, "Pushing batch asynch...\n");
259           return 0;
260         } else
261           ctx->state = WANT_HEADER;
262         break;
263
264       case WANT_CHKPT:
265         noitL(noit_debug, "Pushing checkpoint: [%u/%u]\n",
266               ctx->header.chkpt.log, ctx->header.chkpt.marker);
267         n_chkpt.log = htonl(ctx->header.chkpt.log);
268         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
269
270         /* screw short writes.  I'd rather die than not write my data! */
271         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
272                               &mask, e);
273         if(len < 0) {
274           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
275           goto socket_error;
276         }
277         if(len != sizeof(jlog_id)) {
278           noitL(noit_error, "short write on checkpointing stream.\n");
279           goto socket_error;
280         }
281         ctx->state = WANT_COUNT;
282         break;
283     }
284   }
285   /* never get here */
286 }
287
288 int
289 jlog_streamer_ssl_upgrade(eventer_t e, int mask, void *closure,
290                           struct timeval *now) {
291   jlog_streamer_ctx_t *ctx = closure;
292   int rv;
293
294   rv = eventer_SSL_connect(e, &mask);
295   if(rv > 0) {
296     eventer_ssl_ctx_t *sslctx;
297     e->callback = stratcon_jlog_recv_handler;
298     /* We must make a copy of the acceptor_closure_t for each new
299      * connection.
300      */
301     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
302       char *cn, *end;
303       cn = eventer_ssl_get_peer_subject(sslctx);
304       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
305         cn += 3;
306         end = cn;
307         while(*end && *end != '/') end++;
308         ctx->remote_cn = malloc(end - cn + 1);
309         memcpy(ctx->remote_cn, cn, end - cn);
310         ctx->remote_cn[end-cn] = '\0';
311       }
312     }
313     return e->callback(e, mask, e->closure, now);
314   }
315   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
316
317   eventer_remove_fd(e->fd);
318   e->opset->close(e->fd, &mask, e);
319   jlog_streamer_schedule_reattempt(ctx, now);
320   return 0;
321 }
322 int
323 jlog_streamer_complete_connect(eventer_t e, int mask, void *closure,
324                                struct timeval *now) {
325   jlog_streamer_ctx_t *ctx = closure;
326   char *cert, *key, *ca, *ciphers;
327   eventer_ssl_ctx_t *sslctx;
328
329   if(mask & EVENTER_EXCEPTION) {
330  connect_error:
331     eventer_remove_fd(e->fd);
332     e->opset->close(e->fd, &mask, e);
333     jlog_streamer_schedule_reattempt(ctx, now);
334     return 0;
335   }
336
337 #define SSLCONFGET(var,name) do { \
338   if(!noit_hash_retrieve(ctx->sslconfig, name, strlen(name), \
339                          (void **)&var)) var = NULL; } while(0)
340   SSLCONFGET(cert, "certificate_file");
341   SSLCONFGET(key, "key_file");
342   SSLCONFGET(ca, "ca_chain");
343   SSLCONFGET(ciphers, "ciphers");
344   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
345   if(!sslctx) goto connect_error;
346
347   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
348                              ctx->sslconfig);
349   EVENTER_ATTACH_SSL(e, sslctx);
350   e->callback = jlog_streamer_ssl_upgrade;
351   return e->callback(e, mask, closure, now);
352 }
353 static void
354 jlog_streamer_initiate_connection(jlog_streamer_ctx_t *ctx) {
355   struct timeval __now;
356   eventer_t e;
357   int rv, fd = -1;
358   long on;
359
360   /* Open a socket */
361   fd = socket(ctx->r.remote.sa_family, SOCK_STREAM, 0);
362   if(fd < 0) goto reschedule;
363
364   /* Make it non-blocking */
365   on = 1;
366   if(ioctl(fd, FIONBIO, &on)) goto reschedule;
367
368   /* Initiate a connection */
369   rv = connect(fd, &ctx->r.remote, ctx->remote_len);
370   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
371
372   /* Register a handler for connection completion */
373   e = eventer_alloc();
374   e->fd = fd;
375   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
376   e->callback = jlog_streamer_complete_connect;
377   e->closure = ctx;
378   eventer_add(e);
379   return;
380
381  reschedule:
382   if(fd >= 0) close(fd);
383   gettimeofday(&__now, NULL);
384   jlog_streamer_schedule_reattempt(ctx, &__now);
385   return;
386 }
387
388 int
389 initiate_jlog_streamer(const char *host, unsigned short port,
390                        noit_hash_table *sslconfig, noit_hash_table *config) {
391   jlog_streamer_ctx_t *ctx;
392
393   int8_t family;
394   int rv;
395   union {
396     struct in_addr addr4;
397     struct in6_addr addr6;
398   } a;
399
400   if(host[0] == '/') {
401     family = AF_UNIX;
402   }
403   else {
404     family = AF_INET;
405     rv = inet_pton(family, host, &a);
406     if(rv != 1) {
407       family = AF_INET6;
408       rv = inet_pton(family, host, &a);
409       if(rv != 1) {
410         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
411         return -1;
412       }
413     }
414   }
415
416   ctx = jlog_streamer_ctx_alloc();
417  
418   memset(&ctx->r, 0, sizeof(ctx->r));
419   if(family == AF_UNIX) {
420     struct sockaddr_un *s = &ctx->r.remote_un;
421     s->sun_family = AF_UNIX;
422     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
423     ctx->remote_len = sizeof(*s);
424   }
425   else if(family == AF_INET) {
426     struct sockaddr_in *s = &ctx->r.remote_in;
427     s->sin_family = family;
428     s->sin_port = htons(port);
429     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
430     ctx->remote_len = sizeof(*s);
431   }
432   else {
433     struct sockaddr_in6 *s = &ctx->r.remote_in6;
434     s->sin6_family = family;
435     s->sin6_port = htons(port);
436     memcpy(&s->sin6_addr, &a, sizeof(a));
437     ctx->remote_len = sizeof(*s);
438   }
439
440   if(ctx->sslconfig)
441     noit_hash_delete_all(ctx->sslconfig, free, free);
442   else
443     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
444   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
445   if(ctx->config)
446     noit_hash_delete_all(ctx->config, free, free);
447   else
448     ctx->config = calloc(1, sizeof(noit_hash_table));
449   noit_hash_merge_as_dict(ctx->config, config);
450
451   jlog_streamer_initiate_connection(ctx);
452   return 0;
453 }
454
455 void
456 stratcon_jlog_streamer_reload(const char *toplevel) {
457   int i, cnt = 0;
458   noit_conf_section_t *noit_configs;
459   char path[256];
460
461   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
462   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
463   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
464   for(i=0; i<cnt; i++) {
465     char address[256];
466     unsigned short port;
467     int portint;
468     noit_hash_table *sslconfig, *config;
469
470     if(!noit_conf_get_stringbuf(noit_configs[i],
471                                 "ancestor-or-self::node()/@address",
472                                 address, sizeof(address))) {
473       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
474       continue;
475     }
476     if(!noit_conf_get_int(noit_configs[i],
477                           "ancestor-or-self::node()/@port", &portint))
478       portint = 0;
479     port = (unsigned short) portint;
480     if(address[0] != '/' && (portint == 0 || (port != portint))) {
481       /* UNIX sockets don't require a port (they'll ignore it if specified */
482       noitL(noit_stderr,
483             "Invalid port [%d] specified in stanza %d\n", port, i+1);
484       continue;
485     }
486     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
487     config = noit_conf_get_hash(noit_configs[i], "config");
488
489     initiate_jlog_streamer(address, port, sslconfig, config);
490   }
491 }
492
493 void
494 stratcon_jlog_streamer_init(const char *toplevel) {
495   eventer_name_callback("jlog_streamer_reinitiate",
496                         jlog_streamer_reinitiate);
497   eventer_name_callback("stratcon_jlog_recv_handler",
498                         stratcon_jlog_recv_handler);
499   eventer_name_callback("jlog_streamer_ssl_upgrade",
500                         jlog_streamer_ssl_upgrade);
501   eventer_name_callback("jlog_streamer_complete_connect",
502                         jlog_streamer_complete_connect);
503   stratcon_jlog_streamer_reload(toplevel);
504 }
Note: See TracBrowser for help on using the browser.