root/src/stratcon_jlog_streamer.c

Revision a50432366e0c6614924558d1aa82eaa82b967a0b, 14.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 7 years ago)

compiles on linux -- still no eventer, refs #12

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "noit_conf.h"
9 #include "utils/noit_hash.h"
10 #include "utils/noit_log.h"
11 #include "jlog/jlog.h"
12 #include "noit_jlog_listener.h"
13 #include "stratcon_datastore.h"
14
15 #include <unistd.h>
16 #include <assert.h>
17 #include <errno.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #include <netinet/in.h>
21 #include <sys/un.h>
22 #include <arpa/inet.h>
23
24 noit_hash_table noits = NOIT_HASH_EMPTY;
25
26 typedef struct jlog_streamer_ctx_t {
27   union {
28     struct sockaddr remote;
29     struct sockaddr_un remote_un;
30     struct sockaddr_in remote_in;
31     struct sockaddr_in6 remote_in6;
32   } r;
33   socklen_t remote_len;
34   char *remote_cn;
35   u_int32_t current_backoff;
36   int wants_shutdown;
37   noit_hash_table *config;
38   noit_hash_table *sslconfig;
39   int bytes_expected;
40   int bytes_read;
41   char *buffer;         /* These guys are for doing partial reads */
42
43   enum {
44     WANT_COUNT = 0,
45     WANT_HEADER = 1,
46     WANT_BODY = 2,
47     WANT_CHKPT = 3,
48   } state;
49   int count;            /* Number of jlog messages we need to read */
50   struct {
51     jlog_id   chkpt;
52     u_int32_t tv_sec;
53     u_int32_t tv_usec;
54     u_int32_t message_len;
55   } header;
56
57   eventer_t timeout_event;
58 } jlog_streamer_ctx_t;
59
60 static void jlog_streamer_initiate_connection(jlog_streamer_ctx_t *ctx);
61
62 jlog_streamer_ctx_t *
63 jlog_streamer_ctx_alloc(void) {
64   jlog_streamer_ctx_t *ctx;
65   ctx = calloc(1, sizeof(*ctx));
66   return ctx;
67 }
68 int
69 jlog_streamer_reinitiate(eventer_t e, int mask, void *closure,
70                          struct timeval *now) {
71   jlog_streamer_ctx_t *ctx = closure;
72   ctx->timeout_event = NULL;
73   jlog_streamer_initiate_connection(closure);
74   return 0;
75 }
76 void
77 jlog_streamer_schedule_reattempt(jlog_streamer_ctx_t *ctx,
78                                  struct timeval *now) {
79   struct timeval __now, interval;
80   const char *v;
81   u_int32_t min_interval = 1000, max_interval = 60000;
82   if(noit_hash_retrieve(ctx->config,
83                         "reconnect_initial_interval",
84                         strlen("reconnect_initial_interval"),
85                         (void **)&v)) {
86     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
87   }
88   if(noit_hash_retrieve(ctx->config,
89                         "reconnect_maximum_interval",
90                         strlen("reconnect_maximum_interval"),
91                         (void **)&v)) {
92     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
93   }
94   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
95   else {
96     ctx->current_backoff *= 2;
97     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
98     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
99   }
100   if(!now) {
101     gettimeofday(&__now, NULL);
102     now = &__now;
103   }
104   interval.tv_sec = ctx->current_backoff / 1000;
105   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
106   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
107         ctx->current_backoff);
108   if(ctx->timeout_event)
109     eventer_remove(ctx->timeout_event);
110   else
111     ctx->timeout_event = eventer_alloc();
112   ctx->timeout_event->callback = jlog_streamer_reinitiate;
113   ctx->timeout_event->closure = ctx;
114   ctx->timeout_event->mask = EVENTER_TIMER;
115   add_timeval(*now, interval, &ctx->timeout_event->whence);
116   eventer_add(ctx->timeout_event);
117 }
118 void
119 jlog_streamer_ctx_free(jlog_streamer_ctx_t *ctx) {
120   if(ctx->buffer) free(ctx->buffer);
121   if(ctx->remote_cn) free(ctx->remote_cn);
122   if(ctx->timeout_event) {
123     eventer_remove(ctx->timeout_event);
124     eventer_free(ctx->timeout_event);
125   }
126   free(ctx);
127 }
128
129 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
130 static int
131 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
132   int len, mask;
133   while(ctx->bytes_read < ctx->bytes_expected) {
134     len = Eread(ctx->buffer + ctx->bytes_read,
135                 ctx->bytes_expected - ctx->bytes_read);
136     if(len < 0) {
137       *newmask = mask;
138       return -1;
139     }
140     if(len == 0) return ctx->bytes_read;
141     ctx->bytes_read += len;
142   }
143   assert(ctx->bytes_read == ctx->bytes_expected);
144   return ctx->bytes_read;
145 }
/*
 * FULLREAD(e, ctx, size): read exactly `size` bytes into ctx->buffer,
 * resuming across callbacks.
 *
 * Must be expanded inside an eventer callback that returns int and has a
 * `socket_error` label.  On the first call for a chunk it allocates
 * ctx->buffer (size + 1 bytes, NUL terminated).  If the read fails with
 * EAGAIN the enclosing function returns the mask to wait on; on any other
 * failure, or on a short read, control jumps to socket_error.
 *
 * `size` is evaluated once and handled as unsigned long.  Requests that do
 * not fit in the int byte counters of the context are rejected, so a length
 * supplied by the peer cannot overflow bytes_expected.
 */
#define FULLREAD(e,ctx,size) do { \
  int mask, len; \
  const unsigned long fr_size = (unsigned long)(size); \
  if(!(ctx)->bytes_expected) { \
    if(fr_size >= (unsigned long)(~0U >> 1)) { \
      noitL(noit_error, "refusing oversized read (%lu bytes).\n", fr_size); \
      goto socket_error; \
    } \
    (ctx)->bytes_expected = (int)fr_size; \
    if((ctx)->buffer) free((ctx)->buffer); \
    (ctx)->buffer = malloc(fr_size + 1); \
    if((ctx)->buffer == NULL) { \
      noitL(noit_error, "malloc(%lu) failed.\n", fr_size + 1); \
      goto socket_error; \
    } \
    (ctx)->buffer[fr_size] = '\0'; \
  } \
  len = __read_on_ctx((e), (ctx), &mask); \
  if(len < 0) { \
    if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
    goto socket_error; \
  } \
  (ctx)->bytes_read = 0; \
  (ctx)->bytes_expected = 0; \
  if((unsigned long)len != fr_size) { \
    noitL(noit_error, "SSL short read (%d/%lu).  Reseting connection.\n", \
          len, fr_size); \
    goto socket_error; \
  } \
} while(0)
171
172 int
173 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
174                            struct timeval *now) {
175   jlog_streamer_ctx_t *ctx = closure;
176   int len;
177   jlog_id n_chkpt;
178
179   if(mask & EVENTER_EXCEPTION || ctx->wants_shutdown) {
180  socket_error:
181     jlog_streamer_schedule_reattempt(ctx, now);
182     eventer_remove_fd(e->fd);
183     e->opset->close(e->fd, &mask, e);
184     return 0;
185   }
186
187   while(1) {
188     switch(ctx->state) {
189       case WANT_COUNT:
190         FULLREAD(e, ctx, sizeof(u_int32_t));
191         memcpy(&ctx->count, ctx->buffer, sizeof(u_int32_t));
192         ctx->count = ntohl(ctx->count);
193         free(ctx->buffer); ctx->buffer = NULL;
194         ctx->state = WANT_HEADER;
195         break;
196
197       case WANT_HEADER:
198         if(ctx->count == 0) {
199           ctx->state = WANT_COUNT;
200           break;
201         }
202         FULLREAD(e, ctx, sizeof(ctx->header));
203         memcpy(&ctx->header, ctx->buffer, sizeof(ctx->header));
204         ctx->header.chkpt.log = ntohl(ctx->header.chkpt.log);
205         ctx->header.chkpt.marker = ntohl(ctx->header.chkpt.marker);
206         ctx->header.tv_sec = ntohl(ctx->header.tv_sec);
207         ctx->header.tv_usec = ntohl(ctx->header.tv_usec);
208         ctx->header.message_len = ntohl(ctx->header.message_len);
209         free(ctx->buffer); ctx->buffer = NULL;
210         ctx->state = WANT_BODY;
211         break;
212
213       case WANT_BODY:
214         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
215         stratcon_datastore_push(DS_OP_INSERT, &ctx->r.remote, ctx->buffer);
216         /* Don't free the buffer, it's used by the datastore process. */
217         ctx->buffer = NULL;
218         ctx->count--;
219         if(ctx->count == 0) {
220           eventer_t completion_e;
221           eventer_remove_fd(e->fd);
222           completion_e = eventer_alloc();
223           memcpy(completion_e, e, sizeof(*e));
224           completion_e->mask = EVENTER_WRITE | EVENTER_EXCEPTION;
225           ctx->state = WANT_CHKPT;
226           stratcon_datastore_push(DS_OP_CHKPT, &ctx->r.remote, completion_e);
227           noitL(noit_debug, "Pushing batch asynch...\n");
228           return 0;
229         } else
230           ctx->state = WANT_HEADER;
231         break;
232
233       case WANT_CHKPT:
234         noitL(noit_debug, "Pushing checkpoint: [%u/%u]\n",
235               ctx->header.chkpt.log, ctx->header.chkpt.marker);
236         n_chkpt.log = htonl(ctx->header.chkpt.log);
237         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
238
239         /* screw short writes.  I'd rather die than not write my data! */
240         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
241                               &mask, e);
242         if(len < 0) {
243           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
244           goto socket_error;
245         }
246         if(len != sizeof(jlog_id)) {
247           noitL(noit_error, "short write on checkpointing stream.\n");
248           goto socket_error;
249         }
250         ctx->state = WANT_COUNT;
251         break;
252     }
253   }
254   /* never get here */
255 }
256
257 int
258 jlog_streamer_ssl_upgrade(eventer_t e, int mask, void *closure,
259                           struct timeval *now) {
260   jlog_streamer_ctx_t *ctx = closure;
261   int rv;
262
263   rv = eventer_SSL_connect(e, &mask);
264   if(rv > 0) {
265     eventer_ssl_ctx_t *sslctx;
266     e->callback = stratcon_jlog_recv_handler;
267     /* We must make a copy of the acceptor_closure_t for each new
268      * connection.
269      */
270     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
271       char *cn, *end;
272       cn = eventer_ssl_get_peer_subject(sslctx);
273       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
274         cn += 3;
275         end = cn;
276         while(*end && *end != '/') end++;
277         ctx->remote_cn = malloc(end - cn + 1);
278         memcpy(ctx->remote_cn, cn, end - cn);
279         ctx->remote_cn[end-cn] = '\0';
280       }
281     }
282     return e->callback(e, mask, e->closure, now);
283   }
284   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
285
286   eventer_remove_fd(e->fd);
287   e->opset->close(e->fd, &mask, e);
288   jlog_streamer_schedule_reattempt(ctx, now);
289   return 0;
290 }
291 int
292 jlog_streamer_complete_connect(eventer_t e, int mask, void *closure,
293                                struct timeval *now) {
294   jlog_streamer_ctx_t *ctx = closure;
295   char *cert, *key, *ca, *ciphers;
296   eventer_ssl_ctx_t *sslctx;
297
298   if(mask & EVENTER_EXCEPTION) {
299  connect_error:
300     eventer_remove_fd(e->fd);
301     e->opset->close(e->fd, &mask, e);
302     jlog_streamer_schedule_reattempt(ctx, now);
303     return 0;
304   }
305
306 #define SSLCONFGET(var,name) do { \
307   if(!noit_hash_retrieve(ctx->sslconfig, name, strlen(name), \
308                          (void **)&var)) var = NULL; } while(0)
309   SSLCONFGET(cert, "certificate_file");
310   SSLCONFGET(key, "key_file");
311   SSLCONFGET(ca, "ca_chain");
312   SSLCONFGET(ciphers, "ciphers");
313   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
314   if(!sslctx) goto connect_error;
315
316   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
317                              ctx->sslconfig);
318   EVENTER_ATTACH_SSL(e, sslctx);
319   e->callback = jlog_streamer_ssl_upgrade;
320   return e->callback(e, mask, closure, now);
321 }
322 static void
323 jlog_streamer_initiate_connection(jlog_streamer_ctx_t *ctx) {
324   struct timeval __now;
325   eventer_t e;
326   int rv, fd = -1;
327   long on;
328
329   /* Open a socket */
330   fd = socket(ctx->r.remote.sa_family, SOCK_STREAM, 0);
331   if(fd < 0) goto reschedule;
332
333   /* Make it non-blocking */
334   on = 1;
335   if(ioctl(fd, FIONBIO, &on)) goto reschedule;
336
337   /* Initiate a connection */
338   rv = connect(fd, &ctx->r.remote, ctx->remote_len);
339   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
340
341   /* Register a handler for connection completion */
342   e = eventer_alloc();
343   e->fd = fd;
344   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
345   e->callback = jlog_streamer_complete_connect;
346   e->closure = ctx;
347   eventer_add(e);
348   return;
349
350  reschedule:
351   if(fd >= 0) close(fd);
352   gettimeofday(&__now, NULL);
353   jlog_streamer_schedule_reattempt(ctx, &__now);
354   return;
355 }
356
357 int
358 initiate_jlog_streamer(const char *host, unsigned short port,
359                        noit_hash_table *sslconfig, noit_hash_table *config) {
360   jlog_streamer_ctx_t *ctx;
361
362   int8_t family;
363   int rv;
364   union {
365     struct in_addr addr4;
366     struct in6_addr addr6;
367   } a;
368
369   if(host[0] == '/') {
370     family = AF_UNIX;
371   }
372   else {
373     family = AF_INET;
374     rv = inet_pton(family, host, &a);
375     if(rv != 1) {
376       family = AF_INET6;
377       rv = inet_pton(family, host, &a);
378       if(rv != 1) {
379         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
380         return -1;
381       }
382     }
383   }
384
385   ctx = jlog_streamer_ctx_alloc();
386  
387   memset(&ctx->r, 0, sizeof(ctx->r));
388   if(family == AF_UNIX) {
389     struct sockaddr_un *s = &ctx->r.remote_un;
390     s->sun_family = AF_UNIX;
391     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
392     ctx->remote_len = sizeof(*s);
393   }
394   else if(family == AF_INET) {
395     struct sockaddr_in *s = &ctx->r.remote_in;
396     s->sin_family = family;
397     s->sin_port = htons(port);
398     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
399     ctx->remote_len = sizeof(*s);
400   }
401   else {
402     struct sockaddr_in6 *s = &ctx->r.remote_in6;
403     s->sin6_family = family;
404     s->sin6_port = htons(port);
405     memcpy(&s->sin6_addr, &a, sizeof(a));
406     ctx->remote_len = sizeof(*s);
407   }
408
409   if(ctx->sslconfig)
410     noit_hash_delete_all(ctx->sslconfig, free, free);
411   else
412     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
413   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
414   if(ctx->config)
415     noit_hash_delete_all(ctx->config, free, free);
416   else
417     ctx->config = calloc(1, sizeof(noit_hash_table));
418   noit_hash_merge_as_dict(ctx->config, config);
419
420   jlog_streamer_initiate_connection(ctx);
421   return 0;
422 }
423
424 void
425 stratcon_jlog_streamer_reload(const char *toplevel) {
426   int i, cnt = 0;
427   noit_conf_section_t *noit_configs;
428   char path[256];
429
430   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
431   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
432   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
433   for(i=0; i<cnt; i++) {
434     char address[256];
435     unsigned short port;
436     int portint;
437     noit_hash_table *sslconfig, *config;
438
439     if(!noit_conf_get_stringbuf(noit_configs[i],
440                                 "ancestor-or-self::node()/@address",
441                                 address, sizeof(address))) {
442       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
443       continue;
444     }
445     if(!noit_conf_get_int(noit_configs[i],
446                           "ancestor-or-self::node()/@port", &portint))
447       portint = 0;
448     port = (unsigned short) portint;
449     if(address[0] != '/' && (portint == 0 || (port != portint))) {
450       /* UNIX sockets don't require a port (they'll ignore it if specified */
451       noitL(noit_stderr,
452             "Invalid port [%d] specified in stanza %d\n", port, i+1);
453       continue;
454     }
455     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
456     config = noit_conf_get_hash(noit_configs[i], "config");
457
458     initiate_jlog_streamer(address, port, sslconfig, config);
459   }
460 }
461
462 void
463 stratcon_jlog_streamer_init(const char *toplevel) {
464   eventer_name_callback("jlog_streamer_reinitiate",
465                         jlog_streamer_reinitiate);
466   eventer_name_callback("stratcon_jlog_recv_handler",
467                         stratcon_jlog_recv_handler);
468   eventer_name_callback("jlog_streamer_ssl_upgrade",
469                         jlog_streamer_ssl_upgrade);
470   eventer_name_callback("jlog_streamer_complete_connect",
471                         jlog_streamer_complete_connect);
472   stratcon_jlog_streamer_reload(toplevel);
473 }
Note: See TracBrowser for help on using the browser.