root/src/stratcon_jlog_streamer.c

Revision a7304b5c52e33dfefad9852596a92dc99ff37669, 13.4 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

jlog pulling

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "noit_conf.h"
9 #include "utils/noit_hash.h"
10 #include "utils/noit_log.h"
11 #include "jlog/jlog.h"
12 #include "noit_jlog_listener.h"
13 #include "stratcon_datastore.h"
14
15 #include <unistd.h>
16 #include <assert.h>
17 #include <errno.h>
18 #include <sys/types.h>
19 #include <sys/socket.h>
20 #include <netinet/in.h>
21 #include <sys/un.h>
22 #include <arpa/inet.h>
23
24 noit_hash_table noits = NOIT_HASH_EMPTY;
25
26 typedef struct jlog_streamer_ctx_t {
27   union {
28     struct sockaddr remote;
29     struct sockaddr_un remote_un;
30     struct sockaddr_in remote_in;
31     struct sockaddr_in6 remote_in6;
32   } r;
33   char *remote_cn;
34   u_int32_t current_backoff;
35   int wants_shutdown;
36   noit_hash_table *config;
37   noit_hash_table *sslconfig;
38   int bytes_expected;
39   int bytes_read;
40   char *buffer;         /* These guys are for doing partial reads */
41
42   enum {
43     WANT_COUNT = 0,
44     WANT_HEADER = 1,
45     WANT_BODY = 2,
46     WANT_CHKPT = 3,
47   } state;
48   int count;            /* Number of jlog messages we need to read */
49   struct {
50     jlog_id   chkpt;
51     u_int32_t tv_sec;
52     u_int32_t tv_usec;
53     u_int32_t message_len;
54   } header;
55
56   eventer_t timeout_event;
57 } jlog_streamer_ctx_t;
58
59 static void jlog_streamer_initiate_connection(jlog_streamer_ctx_t *ctx);
60
61 jlog_streamer_ctx_t *
62 jlog_streamer_ctx_alloc(void) {
63   jlog_streamer_ctx_t *ctx;
64   ctx = calloc(1, sizeof(*ctx));
65   return ctx;
66 }
67 int
68 jlog_streamer_reinitiate(eventer_t e, int mask, void *closure,
69                          struct timeval *now) {
70   jlog_streamer_ctx_t *ctx = closure;
71   ctx->timeout_event = NULL;
72   jlog_streamer_initiate_connection(closure);
73   return 0;
74 }
75 void
76 jlog_streamer_schedule_reattempt(jlog_streamer_ctx_t *ctx,
77                                  struct timeval *now) {
78   struct timeval __now, interval;
79   const char *v;
80   u_int32_t min_interval = 1000, max_interval = 60000;
81   if(noit_hash_retrieve(ctx->config,
82                         "reconnect_initial_interval",
83                         strlen("reconnect_initial_interval"),
84                         (void **)&v)) {
85     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
86   }
87   if(noit_hash_retrieve(ctx->config,
88                         "reconnect_maximum_interval",
89                         strlen("reconnect_maximum_interval"),
90                         (void **)&v)) {
91     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
92   }
93   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
94   else {
95     ctx->current_backoff *= 2;
96     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
97     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
98   }
99   if(!now) {
100     gettimeofday(&__now, NULL);
101     now = &__now;
102   }
103   interval.tv_sec = ctx->current_backoff / 1000;
104   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
105   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
106         ctx->current_backoff);
107   if(ctx->timeout_event)
108     eventer_remove(ctx->timeout_event);
109   else
110     ctx->timeout_event = eventer_alloc();
111   ctx->timeout_event->callback = jlog_streamer_reinitiate;
112   ctx->timeout_event->closure = ctx;
113   ctx->timeout_event->mask = EVENTER_TIMER;
114   add_timeval(*now, interval, &ctx->timeout_event->whence);
115   eventer_add(ctx->timeout_event);
116 }
117 void
118 jlog_streamer_ctx_free(jlog_streamer_ctx_t *ctx) {
119   if(ctx->buffer) free(ctx->buffer);
120   if(ctx->remote_cn) free(ctx->remote_cn);
121   if(ctx->timeout_event) {
122     eventer_remove(ctx->timeout_event);
123     eventer_free(ctx->timeout_event);
124   }
125   free(ctx);
126 }
127
128 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
129 static int
130 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
131   int len, mask;
132   while(ctx->bytes_read < ctx->bytes_expected) {
133     len = Eread(ctx->buffer + ctx->bytes_read,
134                 ctx->bytes_expected - ctx->bytes_read);
135     if(len < 0) {
136       *newmask = mask;
137       return -1;
138     }
139     ctx->bytes_read += len;
140   }
141   assert(ctx->bytes_read == ctx->bytes_expected);
142   return ctx->bytes_read;
143 }
144 #define FULLREAD(e,ctx,size) do { \
145   int mask, len; \
146   if(!ctx->bytes_expected) { \
147     ctx->bytes_expected = size; \
148     if(ctx->buffer) free(ctx->buffer); \
149     ctx->buffer = malloc(size + 1); \
150     ctx->buffer[size] = '\0'; \
151   } \
152   len = __read_on_ctx(e, ctx, &mask); \
153   if(len < 0) { \
154     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
155     goto socket_error; \
156   } \
157   ctx->bytes_read = 0; \
158   ctx->bytes_expected = 0; \
159 } while(0)
160
161 int
162 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
163                            struct timeval *now) {
164   jlog_streamer_ctx_t *ctx = closure;
165   int len;
166   jlog_id n_chkpt;
167
168   if(mask & EVENTER_EXCEPTION || ctx->wants_shutdown) {
169  socket_error:
170     jlog_streamer_schedule_reattempt(ctx, now);
171     eventer_remove_fd(e->fd);
172     return 0;
173   }
174
175   while(1) {
176     switch(ctx->state) {
177       case WANT_COUNT:
178         FULLREAD(e, ctx, sizeof(u_int32_t));
179         memcpy(&ctx->count, ctx->buffer, sizeof(u_int32_t));
180         ctx->count = ntohl(ctx->count);
181         free(ctx->buffer); ctx->buffer = NULL;
182         ctx->state = WANT_HEADER;
183         break;
184
185       case WANT_HEADER:
186         if(ctx->count == 0) {
187           ctx->state = WANT_COUNT;
188           break;
189         }
190         FULLREAD(e, ctx, sizeof(ctx->header));
191         memcpy(&ctx->header, ctx->buffer, sizeof(ctx->header));
192         ctx->header.chkpt.log = ntohl(ctx->header.chkpt.log);
193         ctx->header.chkpt.marker = ntohl(ctx->header.chkpt.marker);
194         ctx->header.tv_sec = ntohl(ctx->header.tv_sec);
195         ctx->header.tv_usec = ntohl(ctx->header.tv_usec);
196         ctx->header.message_len = ntohl(ctx->header.message_len);
197         free(ctx->buffer); ctx->buffer = NULL;
198         ctx->state = WANT_BODY;
199         break;
200
201       case WANT_BODY:
202         FULLREAD(e, ctx, ctx->header.message_len);
203         stratcon_datastore_push(DS_OP_INSERT, &ctx->r.remote, ctx->buffer);
204         /* Don't free the buffer, it's used by the datastore process. */
205         ctx->buffer = NULL;
206         ctx->count--;
207         if(ctx->count == 0) {
208           eventer_t completion_e;
209           eventer_remove_fd(e->fd);
210           completion_e = eventer_alloc();
211           memcpy(completion_e, e, sizeof(*e));
212           ctx->state = WANT_CHKPT;
213           stratcon_datastore_push(DS_OP_CHKPT, &ctx->r.remote, completion_e);
214           return 0;
215         } else
216           ctx->state = WANT_HEADER;
217         break;
218
219       case WANT_CHKPT:
220         noitL(noit_debug, "Pushing checkpoint: [%u/%u]\n",
221               ctx->header.chkpt.log, ctx->header.chkpt.marker);
222         n_chkpt.log = htonl(ctx->header.chkpt.log);
223         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
224
225         /* screw short writes.  I'd rather die than not write my data! */
226         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
227                               &mask, e);
228         if(len < 0) {
229           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
230           goto socket_error;
231         }
232         if(len != sizeof(jlog_id)) {
233           noitL(noit_error, "short write on checkpointing stream.\n");
234           goto socket_error;
235         }
236         ctx->state = WANT_COUNT;
237         break;
238     }
239   }
240   /* never get here */
241 }
242
243 int
244 jlog_streamer_ssl_upgrade(eventer_t e, int mask, void *closure,
245                           struct timeval *now) {
246   jlog_streamer_ctx_t *ctx = closure;
247   int rv;
248
249   rv = eventer_SSL_connect(e, &mask);
250   if(rv > 0) {
251     eventer_ssl_ctx_t *sslctx;
252     e->callback = stratcon_jlog_recv_handler;
253     /* We must make a copy of the acceptor_closure_t for each new
254      * connection.
255      */
256     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
257       char *cn, *end;
258       cn = eventer_ssl_get_peer_subject(sslctx);
259       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
260         cn += 3;
261         end = cn;
262         while(*end && *end != '/') end++;
263         ctx->remote_cn = malloc(end - cn + 1);
264         memcpy(ctx->remote_cn, cn, end - cn);
265         ctx->remote_cn[end-cn] = '\0';
266       }
267     }
268     return e->callback(e, mask, e->closure, now);
269   }
270   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
271
272   eventer_remove_fd(e->fd);
273   e->opset->close(e->fd, &mask, e);
274   jlog_streamer_schedule_reattempt(ctx, now);
275   return 0;
276 }
277 int
278 jlog_streamer_complete_connect(eventer_t e, int mask, void *closure,
279                                struct timeval *now) {
280   jlog_streamer_ctx_t *ctx = closure;
281   char *cert, *key, *ca, *ciphers;
282   eventer_ssl_ctx_t *sslctx;
283
284   if(mask & EVENTER_EXCEPTION) {
285  connect_error:
286     eventer_remove_fd(e->fd);
287     e->opset->close(e->fd, &mask, e);
288     jlog_streamer_schedule_reattempt(ctx, now);
289     return 0;
290   }
291
292 #define SSLCONFGET(var,name) do { \
293   if(!noit_hash_retrieve(ctx->sslconfig, name, strlen(name), \
294                          (void **)&var)) var = NULL; } while(0)
295   SSLCONFGET(cert, "certificate_file");
296   SSLCONFGET(key, "key_file");
297   SSLCONFGET(ca, "ca_chain");
298   SSLCONFGET(ciphers, "ciphers");
299   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
300   if(!sslctx) goto connect_error;
301
302   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
303                              ctx->sslconfig);
304   EVENTER_ATTACH_SSL(e, sslctx);
305   e->callback = jlog_streamer_ssl_upgrade;
306   return e->callback(e, mask, closure, now);
307 }
308 static void
309 jlog_streamer_initiate_connection(jlog_streamer_ctx_t *ctx) {
310   struct timeval __now;
311   eventer_t e;
312   int rv, fd = -1;
313   long on;
314
315   /* Open a socket */
316   fd = socket(ctx->r.remote.sa_family, SOCK_STREAM, 0);
317   if(fd < 0) goto reschedule;
318
319   /* Make it non-blocking */
320   on = 1;
321   if(ioctl(fd, FIONBIO, &on)) goto reschedule;
322
323   /* Initiate a connection */
324   rv = connect(fd, &ctx->r.remote, ctx->r.remote.sa_len);
325   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
326
327   /* Register a handler for connection completion */
328   e = eventer_alloc();
329   e->fd = fd;
330   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
331   e->callback = jlog_streamer_complete_connect;
332   e->closure = ctx;
333   eventer_add(e);
334   return;
335
336  reschedule:
337   if(fd >= 0) close(fd);
338   gettimeofday(&__now, NULL);
339   jlog_streamer_schedule_reattempt(ctx, &__now);
340   return;
341 }
342
343 int
344 initiate_jlog_streamer(const char *host, unsigned short port,
345                        noit_hash_table *sslconfig, noit_hash_table *config) {
346   jlog_streamer_ctx_t *ctx;
347
348   int8_t family;
349   int rv;
350   union {
351     struct in_addr addr4;
352     struct in6_addr addr6;
353   } a;
354
355   if(host[0] == '/') {
356     family = AF_UNIX;
357   }
358   else {
359     family = AF_INET;
360     rv = inet_pton(family, host, &a);
361     if(rv != 1) {
362       family = AF_INET6;
363       rv = inet_pton(family, host, &a);
364       if(rv != 1) {
365         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
366         return -1;
367       }
368     }
369   }
370
371   ctx = jlog_streamer_ctx_alloc();
372  
373   memset(&ctx->r, 0, sizeof(ctx->r));
374   if(family == AF_UNIX) {
375     struct sockaddr_un *s = &ctx->r.remote_un;
376     s->sun_family = AF_UNIX;
377     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
378     s->sun_len = sizeof(*s);
379   }
380   else {
381     struct sockaddr_in6 *s = &ctx->r.remote_in6;
382     s->sin6_family = family;
383     s->sin6_port = htons(port);
384     memcpy(&s->sin6_addr, &a, sizeof(a));
385     s->sin6_len = (family == AF_INET) ?
386                     sizeof(struct sockaddr_in) : sizeof(*s);
387   }
388
389   if(ctx->sslconfig)
390     noit_hash_delete_all(ctx->sslconfig, free, free);
391   else
392     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
393   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
394   if(ctx->config)
395     noit_hash_delete_all(ctx->config, free, free);
396   else
397     ctx->config = calloc(1, sizeof(noit_hash_table));
398   noit_hash_merge_as_dict(ctx->config, config);
399
400   jlog_streamer_initiate_connection(ctx);
401   return 0;
402 }
403
404 void
405 stratcon_jlog_streamer_reload(const char *toplevel) {
406   int i, cnt = 0;
407   noit_conf_section_t *noit_configs;
408   char path[256];
409
410   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
411   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
412   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
413   for(i=0; i<cnt; i++) {
414     char address[256];
415     unsigned short port;
416     int portint;
417     noit_hash_table *sslconfig, *config;
418
419     if(!noit_conf_get_stringbuf(noit_configs[i],
420                                 "ancestor-or-self::node()/@address",
421                                 address, sizeof(address))) {
422       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
423       continue;
424     }
425     if(!noit_conf_get_int(noit_configs[i],
426                           "ancestor-or-self::node()/@port", &portint))
427       portint = 0;
428     port = (unsigned short) portint;
429     if(address[0] != '/' && (portint == 0 || (port != portint))) {
430       /* UNIX sockets don't require a port (they'll ignore it if specified */
431       noitL(noit_stderr,
432             "Invalid port [%d] specified in stanza %d\n", port, i+1);
433       continue;
434     }
435     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
436     config = noit_conf_get_hash(noit_configs[i], "config");
437
438     initiate_jlog_streamer(address, port, sslconfig, config);
439   }
440 }
441
442 void
443 stratcon_jlog_streamer_init(const char *toplevel) {
444   eventer_name_callback("jlog_streamer_reinitiate",
445                         jlog_streamer_reinitiate);
446   eventer_name_callback("stratcon_jlog_recv_handler",
447                         stratcon_jlog_recv_handler);
448   eventer_name_callback("jlog_streamer_ssl_upgrade",
449                         jlog_streamer_ssl_upgrade);
450   eventer_name_callback("jlog_streamer_complete_connect",
451                         jlog_streamer_complete_connect);
452   stratcon_jlog_streamer_reload(toplevel);
453 }
Note: See TracBrowser for help on using the browser.