root/src/stratcon_jlog_streamer.c

Revision e7ae97b8a3932bf5cb12dbfe71a21caa328d49af, 16.4 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

refs #119, scaffolding is there. I don't like how it is a slave to the postgres stuff. Perhaps separate streams would be good.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "noit_conf.h"
9 #include "utils/noit_hash.h"
10 #include "utils/noit_log.h"
11 #include "jlog/jlog.h"
12 #include "noit_jlog_listener.h"
13 #include "stratcon_datastore.h"
14 #include "stratcon_jlog_streamer.h"
15
16 #include <unistd.h>
17 #include <assert.h>
18 #include <errno.h>
19 #include <sys/types.h>
20 #include <sys/socket.h>
21 #ifdef HAVE_SYS_FILIO_H
22 #include <sys/filio.h>
23 #endif
24 #include <netinet/in.h>
25 #include <sys/un.h>
26 #include <arpa/inet.h>
27
28 noit_hash_table noits = NOIT_HASH_EMPTY;
29
30 typedef struct jlog_streamer_ctx_t {
31   int bytes_expected;
32   int bytes_read;
33   char *buffer;         /* These guys are for doing partial reads */
34
35   enum {
36     WANT_INITIATE = 0,
37     WANT_COUNT = 1,
38     WANT_HEADER = 2,
39     WANT_BODY = 3,
40     WANT_CHKPT = 4,
41   } state;
42   int count;            /* Number of jlog messages we need to read */
43   struct {
44     jlog_id   chkpt;
45     u_int32_t tv_sec;
46     u_int32_t tv_usec;
47     u_int32_t message_len;
48   } header;
49 } jlog_streamer_ctx_t;
50
51 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
52
53 jlog_streamer_ctx_t *
54 jlog_streamer_ctx_alloc(void) {
55   jlog_streamer_ctx_t *ctx;
56   ctx = calloc(1, sizeof(*ctx));
57   return ctx;
58 }
59 noit_connection_ctx_t *
60 noit_connection_ctx_alloc(void) {
61   noit_connection_ctx_t *ctx;
62   ctx = calloc(1, sizeof(*ctx));
63   return ctx;
64 }
65 int
66 noit_connection_reinitiate(eventer_t e, int mask, void *closure,
67                          struct timeval *now) {
68   noit_connection_ctx_t *ctx = closure;
69   ctx->timeout_event = NULL;
70   noit_connection_initiate_connection(closure);
71   return 0;
72 }
73 void
74 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx,
75                                    struct timeval *now) {
76   struct timeval __now, interval;
77   const char *v;
78   u_int32_t min_interval = 1000, max_interval = 8000;
79   if(noit_hash_retr_str(ctx->config,
80                         "reconnect_initial_interval",
81                         strlen("reconnect_initial_interval"),
82                         &v)) {
83     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
84   }
85   if(noit_hash_retr_str(ctx->config,
86                         "reconnect_maximum_interval",
87                         strlen("reconnect_maximum_interval"),
88                         &v)) {
89     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
90   }
91   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
92   else {
93     ctx->current_backoff *= 2;
94     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
95     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
96   }
97   if(!now) {
98     gettimeofday(&__now, NULL);
99     now = &__now;
100   }
101   interval.tv_sec = ctx->current_backoff / 1000;
102   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
103   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
104         ctx->current_backoff);
105   if(ctx->timeout_event)
106     eventer_remove(ctx->timeout_event);
107   else
108     ctx->timeout_event = eventer_alloc();
109   ctx->timeout_event->callback = noit_connection_reinitiate;
110   ctx->timeout_event->closure = ctx;
111   ctx->timeout_event->mask = EVENTER_TIMER;
112   add_timeval(*now, interval, &ctx->timeout_event->whence);
113   eventer_add(ctx->timeout_event);
114 }
115 void
116 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
117   if(ctx->remote_cn) free(ctx->remote_cn);
118   if(ctx->timeout_event) {
119     eventer_remove(ctx->timeout_event);
120     eventer_free(ctx->timeout_event);
121   }
122   ctx->consumer_free(ctx->consumer_ctx);
123   free(ctx);
124 }
125 void
126 jlog_streamer_ctx_free(void *cl) {
127   jlog_streamer_ctx_t *ctx = cl;
128   if(ctx->buffer) free(ctx->buffer);
129   free(ctx);
130 }
131
132 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
133 static int
134 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
135   int len, mask;
136   while(ctx->bytes_read < ctx->bytes_expected) {
137     len = Eread(ctx->buffer + ctx->bytes_read,
138                 ctx->bytes_expected - ctx->bytes_read);
139     if(len < 0) {
140       *newmask = mask;
141       return -1;
142     }
143     /* if we get 0 inside SSL, and there was a real error, we
144      * will actually get a -1 here.
145      * if(len == 0) return ctx->bytes_read;
146      */
147     ctx->bytes_read += len;
148   }
149   assert(ctx->bytes_read == ctx->bytes_expected);
150   return ctx->bytes_read;
151 }
/*
 * FULLREAD(e, ctx, size): read exactly `size` bytes into ctx->buffer,
 * resuming across invocations of the enclosing event callback.
 *
 * On first use for a given read (bytes_expected == 0) a fresh buffer of
 * size + 1 bytes is allocated and NUL-terminated at offset `size`.
 * If the read would block (errno == EAGAIN) the macro RETURNS from the
 * enclosing function with the reported mask plus EVENTER_EXCEPTION.
 * Any other failure jumps to a `socket_error` label that the enclosing
 * function must provide.  On success the byte counters are reset and the
 * data is left in ctx->buffer.
 */
#define FULLREAD(e,ctx,size) do { \
  int fr_mask, fr_len; \
  if((ctx)->bytes_expected == 0) { \
    (ctx)->bytes_expected = (size); \
    free((ctx)->buffer); \
    (ctx)->buffer = malloc((size) + 1); \
    if((ctx)->buffer == NULL) { \
      noitL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)(size) + 1); \
      goto socket_error; \
    } \
    (ctx)->buffer[(size)] = '\0'; \
  } \
  fr_len = __read_on_ctx((e), (ctx), &fr_mask); \
  if(fr_len < 0) { \
    if(errno == EAGAIN) return fr_mask | EVENTER_EXCEPTION; \
    noitL(noit_error, "SSL read error: %s\n", strerror(errno)); \
    goto socket_error; \
  } \
  (ctx)->bytes_read = 0; \
  (ctx)->bytes_expected = 0; \
  if(fr_len != (size)) { \
    noitL(noit_error, "SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
          (ctx)->state, fr_len, (long unsigned int)(size)); \
    goto socket_error; \
  } \
} while(0)
178
179 int
180 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
181                            struct timeval *now) {
182   static u_int32_t jlog_feed_cmd = 0;
183   noit_connection_ctx_t *nctx = closure;
184   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
185   int len;
186   jlog_id n_chkpt;
187
188   if(!jlog_feed_cmd) jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
189
190   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
191     if(write(e->fd, e, 0) == -1)
192       noitL(noit_error, "socket error: %s\n", strerror(errno));
193  socket_error:
194     ctx->state = WANT_INITIATE;
195     ctx->count = 0;
196     ctx->bytes_read = 0;
197     ctx->bytes_expected = 0;
198     if(ctx->buffer) free(ctx->buffer);
199     ctx->buffer = NULL;
200     noit_connection_schedule_reattempt(nctx, now);
201     eventer_remove_fd(e->fd);
202     e->opset->close(e->fd, &mask, e);
203     return 0;
204   }
205
206   while(1) {
207     switch(ctx->state) {
208       case WANT_INITIATE:
209         len = e->opset->write(e->fd, &jlog_feed_cmd, sizeof(&jlog_feed_cmd),
210                               &mask, e);
211         if(len < 0) {
212           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
213           goto socket_error;
214         }
215         if(len != sizeof(jlog_feed_cmd)) {
216           noitL(noit_error, "short write on initiating stream.\n");
217           goto socket_error;
218         }
219         ctx->state = WANT_COUNT;
220         break;
221
222       case WANT_COUNT:
223         FULLREAD(e, ctx, sizeof(u_int32_t));
224         memcpy(&ctx->count, ctx->buffer, sizeof(u_int32_t));
225         ctx->count = ntohl(ctx->count);
226         free(ctx->buffer); ctx->buffer = NULL;
227         ctx->state = WANT_HEADER;
228         break;
229
230       case WANT_HEADER:
231         if(ctx->count == 0) {
232           ctx->state = WANT_COUNT;
233           break;
234         }
235         FULLREAD(e, ctx, sizeof(ctx->header));
236         memcpy(&ctx->header, ctx->buffer, sizeof(ctx->header));
237         ctx->header.chkpt.log = ntohl(ctx->header.chkpt.log);
238         ctx->header.chkpt.marker = ntohl(ctx->header.chkpt.marker);
239         ctx->header.tv_sec = ntohl(ctx->header.tv_sec);
240         ctx->header.tv_usec = ntohl(ctx->header.tv_usec);
241         ctx->header.message_len = ntohl(ctx->header.message_len);
242         free(ctx->buffer); ctx->buffer = NULL;
243         ctx->state = WANT_BODY;
244         break;
245
246       case WANT_BODY:
247         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
248         stratcon_datastore_push(DS_OP_INSERT, &nctx->r.remote, ctx->buffer);
249         /* Don't free the buffer, it's used by the datastore process. */
250         ctx->buffer = NULL;
251         ctx->count--;
252         if(ctx->count == 0) {
253           eventer_t completion_e;
254           eventer_remove_fd(e->fd);
255           completion_e = eventer_alloc();
256           memcpy(completion_e, e, sizeof(*e));
257           completion_e->mask = EVENTER_WRITE | EVENTER_EXCEPTION;
258           ctx->state = WANT_CHKPT;
259           stratcon_datastore_push(DS_OP_CHKPT, &nctx->r.remote, completion_e);
260           noitL(noit_debug, "Pushing batch asynch...\n");
261           return 0;
262         } else
263           ctx->state = WANT_HEADER;
264         break;
265
266       case WANT_CHKPT:
267         noitL(noit_debug, "Pushing checkpoint: [%u/%u]\n",
268               ctx->header.chkpt.log, ctx->header.chkpt.marker);
269         n_chkpt.log = htonl(ctx->header.chkpt.log);
270         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
271
272         /* screw short writes.  I'd rather die than not write my data! */
273         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
274                               &mask, e);
275         if(len < 0) {
276           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
277           goto socket_error;
278         }
279         if(len != sizeof(jlog_id)) {
280           noitL(noit_error, "short write on checkpointing stream.\n");
281           goto socket_error;
282         }
283         ctx->state = WANT_COUNT;
284         break;
285     }
286   }
287   /* never get here */
288 }
289
290 int
291 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure,
292                             struct timeval *now) {
293   noit_connection_ctx_t *nctx = closure;
294   int rv;
295
296   rv = eventer_SSL_connect(e, &mask);
297   if(rv > 0) {
298     eventer_ssl_ctx_t *sslctx;
299     e->callback = nctx->consumer_callback;
300     /* We must make a copy of the acceptor_closure_t for each new
301      * connection.
302      */
303     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
304       char *cn, *end;
305       cn = eventer_ssl_get_peer_subject(sslctx);
306       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
307         cn += 3;
308         end = cn;
309         while(*end && *end != '/') end++;
310         nctx->remote_cn = malloc(end - cn + 1);
311         memcpy(nctx->remote_cn, cn, end - cn);
312         nctx->remote_cn[end-cn] = '\0';
313       }
314     }
315     return e->callback(e, mask, e->closure, now);
316   }
317   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
318
319   noitL(noit_error, "jlog streamer SSL upgrade failed.\n");
320   eventer_remove_fd(e->fd);
321   e->opset->close(e->fd, &mask, e);
322   noit_connection_schedule_reattempt(nctx, now);
323   return 0;
324 }
325 int
326 noit_connection_complete_connect(eventer_t e, int mask, void *closure,
327                                  struct timeval *now) {
328   noit_connection_ctx_t *nctx = closure;
329   const char *cert, *key, *ca, *ciphers;
330   eventer_ssl_ctx_t *sslctx;
331
332   if(mask & EVENTER_EXCEPTION) {
333     if(write(e->fd, e, 0) == -1)
334       noitL(noit_error, "socket error: %s\n", strerror(errno));
335  connect_error:
336     eventer_remove_fd(e->fd);
337     e->opset->close(e->fd, &mask, e);
338     noit_connection_schedule_reattempt(nctx, now);
339     return 0;
340   }
341
342 #define SSLCONFGET(var,name) do { \
343   if(!noit_hash_retr_str(nctx->sslconfig, name, strlen(name), \
344                          &var)) var = NULL; } while(0)
345   SSLCONFGET(cert, "certificate_file");
346   SSLCONFGET(key, "key_file");
347   SSLCONFGET(ca, "ca_chain");
348   SSLCONFGET(ciphers, "ciphers");
349   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
350   if(!sslctx) goto connect_error;
351
352   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
353                              nctx->sslconfig);
354   EVENTER_ATTACH_SSL(e, sslctx);
355   e->callback = noit_connection_ssl_upgrade;
356   return e->callback(e, mask, closure, now);
357 }
358 static void
359 noit_connection_initiate_connection(noit_connection_ctx_t *nctx) {
360   struct timeval __now;
361   eventer_t e;
362   int rv, fd = -1;
363   long on;
364
365   /* Open a socket */
366   fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0);
367   if(fd < 0) goto reschedule;
368
369   /* Make it non-blocking */
370   on = 1;
371   if(ioctl(fd, FIONBIO, &on)) goto reschedule;
372
373   /* Initiate a connection */
374   rv = connect(fd, &nctx->r.remote, nctx->remote_len);
375   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
376
377   /* Register a handler for connection completion */
378   e = eventer_alloc();
379   e->fd = fd;
380   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
381   e->callback = noit_connection_complete_connect;
382   e->closure = nctx;
383   eventer_add(e);
384   return;
385
386  reschedule:
387   if(fd >= 0) close(fd);
388   gettimeofday(&__now, NULL);
389   noit_connection_schedule_reattempt(nctx, &__now);
390   return;
391 }
392
393 int
394 initiate_noit_connection(const char *host, unsigned short port,
395                          noit_hash_table *sslconfig, noit_hash_table *config,
396                          eventer_func_t handler, void *closure,
397                          void (*freefunc)(void *)) {
398   noit_connection_ctx_t *ctx;
399
400   int8_t family;
401   int rv;
402   union {
403     struct in_addr addr4;
404     struct in6_addr addr6;
405   } a;
406
407   if(host[0] == '/') {
408     family = AF_UNIX;
409   }
410   else {
411     family = AF_INET;
412     rv = inet_pton(family, host, &a);
413     if(rv != 1) {
414       family = AF_INET6;
415       rv = inet_pton(family, host, &a);
416       if(rv != 1) {
417         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
418         return -1;
419       }
420     }
421   }
422
423   ctx = noit_connection_ctx_alloc();
424  
425   memset(&ctx->r, 0, sizeof(ctx->r));
426   if(family == AF_UNIX) {
427     struct sockaddr_un *s = &ctx->r.remote_un;
428     s->sun_family = AF_UNIX;
429     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
430     ctx->remote_len = sizeof(*s);
431   }
432   else if(family == AF_INET) {
433     struct sockaddr_in *s = &ctx->r.remote_in;
434     s->sin_family = family;
435     s->sin_port = htons(port);
436     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
437     ctx->remote_len = sizeof(*s);
438   }
439   else {
440     struct sockaddr_in6 *s = &ctx->r.remote_in6;
441     s->sin6_family = family;
442     s->sin6_port = htons(port);
443     memcpy(&s->sin6_addr, &a, sizeof(a));
444     ctx->remote_len = sizeof(*s);
445   }
446
447   if(ctx->sslconfig)
448     noit_hash_delete_all(ctx->sslconfig, free, free);
449   else
450     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
451   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
452   if(ctx->config)
453     noit_hash_delete_all(ctx->config, free, free);
454   else
455     ctx->config = calloc(1, sizeof(noit_hash_table));
456   noit_hash_merge_as_dict(ctx->config, config);
457
458   ctx->consumer_callback = handler;
459   ctx->consumer_free = freefunc;
460   ctx->consumer_ctx = closure;
461   noit_connection_initiate_connection(ctx);
462   return 0;
463 }
464
465 void
466 stratcon_streamer_connection(const char *toplevel, const char *destination,
467                              eventer_func_t handler,
468                              void *(*handler_alloc)(void), void *handler_ctx,
469                              void (*handler_free)(void *)) {
470   int i, cnt = 0;
471   noit_conf_section_t *noit_configs;
472   char path[256];
473
474   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
475   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
476   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
477   for(i=0; i<cnt; i++) {
478     char address[256];
479     unsigned short port;
480     int portint;
481     noit_hash_table *sslconfig, *config;
482
483     if(!noit_conf_get_stringbuf(noit_configs[i],
484                                 "ancestor-or-self::node()/@address",
485                                 address, sizeof(address))) {
486       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
487       continue;
488     }
489     /* if destination is specified, exact match it */
490     if(destination && strcmp(address, destination)) continue;
491
492     if(!noit_conf_get_int(noit_configs[i],
493                           "ancestor-or-self::node()/@port", &portint))
494       portint = 0;
495     port = (unsigned short) portint;
496     if(address[0] != '/' && (portint == 0 || (port != portint))) {
497       /* UNIX sockets don't require a port (they'll ignore it if specified */
498       noitL(noit_stderr,
499             "Invalid port [%d] specified in stanza %d\n", port, i+1);
500       continue;
501     }
502     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
503     config = noit_conf_get_hash(noit_configs[i], "config");
504
505     initiate_noit_connection(address, port, sslconfig, config,
506                              handler,
507                              handler_alloc ? handler_alloc() : handler_ctx,
508                              handler_free);
509   }
510 }
511 void
512 stratcon_jlog_streamer_reload(const char *toplevel) {
513   stratcon_streamer_connection(toplevel, NULL,
514                                stratcon_jlog_recv_handler,
515                                (void *(*)())jlog_streamer_ctx_alloc, NULL,
516                                jlog_streamer_ctx_free);
517 }
518
519 void
520 stratcon_jlog_streamer_init(const char *toplevel) {
521   eventer_name_callback("noit_connection_reinitiate",
522                         noit_connection_reinitiate);
523   eventer_name_callback("stratcon_jlog_recv_handler",
524                         stratcon_jlog_recv_handler);
525   eventer_name_callback("noit_connection_ssl_upgrade",
526                         noit_connection_ssl_upgrade);
527   eventer_name_callback("noit_connection_complete_connect",
528                         noit_connection_complete_connect);
529   stratcon_jlog_streamer_reload(toplevel);
530 }
Note: See TracBrowser for help on using the browser.