root/src/stratcon_jlog_streamer.c

Revision 23cfce8f39a09276852fa5e6455f3de80ab85b87, 19.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

Since this is over SSL, it is possible that we need write to do our read.
We should register the completion with all possible trigger types and then
"try" and set the mask we need upon return of first call.

This is an edge case and I'm convinced this isn't the actual fix to the problem
we're seeing.

refs #58

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_conf.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "noit_jlog_listener.h"
39 #include "stratcon_datastore.h"
40 #include "stratcon_jlog_streamer.h"
41
42 #include <unistd.h>
43 #include <assert.h>
44 #include <errno.h>
45 #include <sys/types.h>
46 #include <sys/socket.h>
47 #ifdef HAVE_SYS_FILIO_H
48 #include <sys/filio.h>
49 #endif
50 #include <netinet/in.h>
51 #include <sys/un.h>
52 #include <arpa/inet.h>
53
54 noit_hash_table noits = NOIT_HASH_EMPTY;
55
56 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
57
58 jlog_streamer_ctx_t *
59 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
60   jlog_streamer_ctx_t *ctx;
61   ctx = stratcon_jlog_streamer_ctx_alloc();
62   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
63   ctx->push = stratcon_datastore_push;
64   return ctx;
65 }
66 jlog_streamer_ctx_t *
67 stratcon_jlog_streamer_ctx_alloc(void) {
68   jlog_streamer_ctx_t *ctx;
69   ctx = calloc(1, sizeof(*ctx));
70   return ctx;
71 }
72 noit_connection_ctx_t *
73 noit_connection_ctx_alloc(void) {
74   noit_connection_ctx_t *ctx;
75   ctx = calloc(1, sizeof(*ctx));
76   return ctx;
77 }
78 int
79 noit_connection_reinitiate(eventer_t e, int mask, void *closure,
80                          struct timeval *now) {
81   noit_connection_ctx_t *ctx = closure;
82   ctx->timeout_event = NULL;
83   noit_connection_initiate_connection(closure);
84   return 0;
85 }
86 void
87 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx,
88                                    struct timeval *now) {
89   struct timeval __now, interval;
90   const char *v;
91   u_int32_t min_interval = 1000, max_interval = 8000;
92   if(noit_hash_retr_str(ctx->config,
93                         "reconnect_initial_interval",
94                         strlen("reconnect_initial_interval"),
95                         &v)) {
96     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
97   }
98   if(noit_hash_retr_str(ctx->config,
99                         "reconnect_maximum_interval",
100                         strlen("reconnect_maximum_interval"),
101                         &v)) {
102     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
103   }
104   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
105   else {
106     ctx->current_backoff *= 2;
107     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
108     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
109   }
110   if(!now) {
111     gettimeofday(&__now, NULL);
112     now = &__now;
113   }
114   interval.tv_sec = ctx->current_backoff / 1000;
115   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
116   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
117         ctx->current_backoff);
118   if(ctx->timeout_event)
119     eventer_remove(ctx->timeout_event);
120   else
121     ctx->timeout_event = eventer_alloc();
122   ctx->timeout_event->callback = noit_connection_reinitiate;
123   ctx->timeout_event->closure = ctx;
124   ctx->timeout_event->mask = EVENTER_TIMER;
125   add_timeval(*now, interval, &ctx->timeout_event->whence);
126   eventer_add(ctx->timeout_event);
127 }
128 void
129 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
130   if(ctx->remote_cn) free(ctx->remote_cn);
131   if(ctx->remote_str) free(ctx->remote_str);
132   if(ctx->timeout_event) {
133     eventer_remove(ctx->timeout_event);
134     eventer_free(ctx->timeout_event);
135   }
136   ctx->consumer_free(ctx->consumer_ctx);
137   free(ctx);
138 }
139 void
140 jlog_streamer_ctx_free(void *cl) {
141   jlog_streamer_ctx_t *ctx = cl;
142   if(ctx->buffer) free(ctx->buffer);
143   free(ctx);
144 }
145
146 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
147 static int
148 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
149   int len, mask;
150   while(ctx->bytes_read < ctx->bytes_expected) {
151     len = Eread(ctx->buffer + ctx->bytes_read,
152                 ctx->bytes_expected - ctx->bytes_read);
153     if(len < 0) {
154       *newmask = mask;
155       return -1;
156     }
157     /* if we get 0 inside SSL, and there was a real error, we
158      * will actually get a -1 here.
159      * if(len == 0) return ctx->bytes_read;
160      */
161     ctx->bytes_read += len;
162   }
163   assert(ctx->bytes_read == ctx->bytes_expected);
164   return ctx->bytes_read;
165 }
166 #define FULLREAD(e,ctx,size) do { \
167   int mask, len; \
168   if(!ctx->bytes_expected) { \
169     ctx->bytes_expected = size; \
170     if(ctx->buffer) free(ctx->buffer); \
171     ctx->buffer = malloc(size + 1); \
172     if(ctx->buffer == NULL) { \
173       noitL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)size + 1); \
174       goto socket_error; \
175     } \
176     ctx->buffer[size] = '\0'; \
177   } \
178   len = __read_on_ctx(e, ctx, &mask); \
179   if(len < 0) { \
180     if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
181     noitL(noit_error, "[%s] SSL read error: %s\n", nctx->remote_str, strerror(errno)); \
182     goto socket_error; \
183   } \
184   ctx->bytes_read = 0; \
185   ctx->bytes_expected = 0; \
186   if(len != size) { \
187     noitL(noit_error, "[%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
188           nctx->remote_str, ctx->state, len, (long unsigned int)size); \
189     goto socket_error; \
190   } \
191 } while(0)
192
193 int
194 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
195                            struct timeval *now) {
196   noit_connection_ctx_t *nctx = closure;
197   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
198   int len;
199   jlog_id n_chkpt;
200
201   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
202     if(write(e->fd, e, 0) == -1)
203       noitL(noit_error, "socket error: %s\n", strerror(errno));
204  socket_error:
205     ctx->state = JLOG_STREAMER_WANT_INITIATE;
206     ctx->count = 0;
207     ctx->bytes_read = 0;
208     ctx->bytes_expected = 0;
209     if(ctx->buffer) free(ctx->buffer);
210     ctx->buffer = NULL;
211     noit_connection_schedule_reattempt(nctx, now);
212     eventer_remove_fd(e->fd);
213     e->opset->close(e->fd, &mask, e);
214     return 0;
215   }
216
217   while(1) {
218     switch(ctx->state) {
219       case JLOG_STREAMER_WANT_INITIATE:
220         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
221                               sizeof(ctx->jlog_feed_cmd),
222                               &mask, e);
223         if(len < 0) {
224           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
225           goto socket_error;
226         }
227         if(len != sizeof(ctx->jlog_feed_cmd)) {
228           noitL(noit_error, "short write [%d/%d] on initiating stream.\n",
229                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
230           goto socket_error;
231         }
232         ctx->state = JLOG_STREAMER_WANT_COUNT;
233         break;
234
235       case JLOG_STREAMER_WANT_COUNT:
236         FULLREAD(e, ctx, sizeof(u_int32_t));
237         memcpy(&ctx->count, ctx->buffer, sizeof(u_int32_t));
238         ctx->count = ntohl(ctx->count);
239         free(ctx->buffer); ctx->buffer = NULL;
240         ctx->state = JLOG_STREAMER_WANT_HEADER;
241         break;
242
243       case JLOG_STREAMER_WANT_HEADER:
244         if(ctx->count == 0) {
245           ctx->state = JLOG_STREAMER_WANT_COUNT;
246           break;
247         }
248         FULLREAD(e, ctx, sizeof(ctx->header));
249         memcpy(&ctx->header, ctx->buffer, sizeof(ctx->header));
250         ctx->header.chkpt.log = ntohl(ctx->header.chkpt.log);
251         ctx->header.chkpt.marker = ntohl(ctx->header.chkpt.marker);
252         ctx->header.tv_sec = ntohl(ctx->header.tv_sec);
253         ctx->header.tv_usec = ntohl(ctx->header.tv_usec);
254         ctx->header.message_len = ntohl(ctx->header.message_len);
255         free(ctx->buffer); ctx->buffer = NULL;
256         ctx->state = JLOG_STREAMER_WANT_BODY;
257         break;
258
259       case JLOG_STREAMER_WANT_BODY:
260         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
261         if(ctx->header.message_len > 0)
262           ctx->push(DS_OP_INSERT, &nctx->r.remote, ctx->buffer);
263         else if(ctx->buffer)
264           free(ctx->buffer);
265         /* Don't free the buffer, it's used by the datastore process. */
266         ctx->buffer = NULL;
267         ctx->count--;
268         if(ctx->count == 0) {
269           eventer_t completion_e;
270           eventer_remove_fd(e->fd);
271           completion_e = eventer_alloc();
272           memcpy(completion_e, e, sizeof(*e));
273           completion_e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
274           ctx->state = JLOG_STREAMER_WANT_CHKPT;
275           ctx->push(DS_OP_CHKPT, &nctx->r.remote, completion_e);
276           noitL(noit_debug, "Pushing batch asynch...\n");
277           return 0;
278         } else
279           ctx->state = JLOG_STREAMER_WANT_HEADER;
280         break;
281
282       case JLOG_STREAMER_WANT_CHKPT:
283         noitL(noit_debug, "Pushing checkpoint: [%u/%u]\n",
284               ctx->header.chkpt.log, ctx->header.chkpt.marker);
285         n_chkpt.log = htonl(ctx->header.chkpt.log);
286         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
287
288         /* screw short writes.  I'd rather die than not write my data! */
289         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
290                               &mask, e);
291         if(len < 0) {
292           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
293           goto socket_error;
294         }
295         if(len != sizeof(jlog_id)) {
296           noitL(noit_error, "short write on checkpointing stream.\n");
297           goto socket_error;
298         }
299         ctx->state = JLOG_STREAMER_WANT_COUNT;
300         break;
301     }
302   }
303   /* never get here */
304 }
305
306 int
307 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure,
308                             struct timeval *now) {
309   noit_connection_ctx_t *nctx = closure;
310   int rv;
311
312   rv = eventer_SSL_connect(e, &mask);
313   if(rv > 0) {
314     eventer_ssl_ctx_t *sslctx;
315     e->callback = nctx->consumer_callback;
316     /* We must make a copy of the acceptor_closure_t for each new
317      * connection.
318      */
319     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
320       char *cn, *end;
321       cn = eventer_ssl_get_peer_subject(sslctx);
322       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
323         cn += 3;
324         end = cn;
325         while(*end && *end != '/') end++;
326         nctx->remote_cn = malloc(end - cn + 1);
327         memcpy(nctx->remote_cn, cn, end - cn);
328         nctx->remote_cn[end-cn] = '\0';
329       }
330     }
331     return e->callback(e, mask, e->closure, now);
332   }
333   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
334
335   noitL(noit_error, "jlog streamer SSL upgrade failed.\n");
336   eventer_remove_fd(e->fd);
337   e->opset->close(e->fd, &mask, e);
338   noit_connection_schedule_reattempt(nctx, now);
339   return 0;
340 }
341 int
342 noit_connection_complete_connect(eventer_t e, int mask, void *closure,
343                                  struct timeval *now) {
344   noit_connection_ctx_t *nctx = closure;
345   const char *cert, *key, *ca, *ciphers;
346   char remote_str[128], tmp_str[128];
347   eventer_ssl_ctx_t *sslctx;
348   int aerrno, len;
349   socklen_t aerrno_len = sizeof(aerrno);
350
351   if(getsockopt(e->fd,SOL_SOCKET,SO_ERROR, &aerrno, &aerrno_len) == 0)
352     if(aerrno != 0) goto connect_error;
353   aerrno = 0;
354
355   if(mask & EVENTER_EXCEPTION) {
356     if(aerrno == 0 && (write(e->fd, e, 0) == -1))
357       aerrno = errno;
358  connect_error:
359     switch(nctx->r.remote.sa_family) {
360       case AF_INET:
361         len = sizeof(struct sockaddr_in);
362         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in.sin_addr,
363                   tmp_str, len);
364         snprintf(remote_str, sizeof(remote_str), "%s:%d",
365                  tmp_str, ntohs(nctx->r.remote_in.sin_port));
366         break;
367       case AF_INET6:
368         len = sizeof(struct sockaddr_in6);
369         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in6.sin6_addr,
370                   tmp_str, len);
371         snprintf(remote_str, sizeof(remote_str), "%s:%d",
372                  tmp_str, ntohs(nctx->r.remote_in6.sin6_port));
373        break;
374       case AF_UNIX:
375         len = SUN_LEN(&(nctx->r.remote_un));
376         snprintf(remote_str, sizeof(remote_str), "%s", nctx->r.remote_un.sun_path);
377         break;
378       default:
379         snprintf(remote_str, sizeof(remote_str), "(unknown)");
380     }
381     noitL(noit_error, "Error connecting to %s: %s\n",
382           remote_str, strerror(aerrno));
383     eventer_remove_fd(e->fd);
384     e->opset->close(e->fd, &mask, e);
385     noit_connection_schedule_reattempt(nctx, now);
386     return 0;
387   }
388
389 #define SSLCONFGET(var,name) do { \
390   if(!noit_hash_retr_str(nctx->sslconfig, name, strlen(name), \
391                          &var)) var = NULL; } while(0)
392   SSLCONFGET(cert, "certificate_file");
393   SSLCONFGET(key, "key_file");
394   SSLCONFGET(ca, "ca_chain");
395   SSLCONFGET(ciphers, "ciphers");
396   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
397   if(!sslctx) goto connect_error;
398
399   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
400                              nctx->sslconfig);
401   EVENTER_ATTACH_SSL(e, sslctx);
402   e->callback = noit_connection_ssl_upgrade;
403   return e->callback(e, mask, closure, now);
404 }
405 static void
406 noit_connection_initiate_connection(noit_connection_ctx_t *nctx) {
407   struct timeval __now;
408   eventer_t e;
409   int rv, fd = -1;
410
411   /* Open a socket */
412   fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0);
413   if(fd < 0) goto reschedule;
414
415   /* Make it non-blocking */
416   if(eventer_set_fd_nonblocking(fd)) goto reschedule;
417
418   /* Initiate a connection */
419   rv = connect(fd, &nctx->r.remote, nctx->remote_len);
420   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
421
422   /* Register a handler for connection completion */
423   e = eventer_alloc();
424   e->fd = fd;
425   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
426   e->callback = noit_connection_complete_connect;
427   e->closure = nctx;
428   eventer_add(e);
429   return;
430
431  reschedule:
432   if(fd >= 0) close(fd);
433   gettimeofday(&__now, NULL);
434   noit_connection_schedule_reattempt(nctx, &__now);
435   return;
436 }
437
438 int
439 initiate_noit_connection(const char *host, unsigned short port,
440                          noit_hash_table *sslconfig, noit_hash_table *config,
441                          eventer_func_t handler, void *closure,
442                          void (*freefunc)(void *)) {
443   noit_connection_ctx_t *ctx;
444
445   int8_t family;
446   int rv;
447   union {
448     struct in_addr addr4;
449     struct in6_addr addr6;
450   } a;
451
452   if(host[0] == '/') {
453     family = AF_UNIX;
454   }
455   else {
456     family = AF_INET;
457     rv = inet_pton(family, host, &a);
458     if(rv != 1) {
459       family = AF_INET6;
460       rv = inet_pton(family, host, &a);
461       if(rv != 1) {
462         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
463         return -1;
464       }
465     }
466   }
467
468   ctx = noit_connection_ctx_alloc();
469   ctx->remote_str = calloc(1, strlen(host) + 7);
470   snprintf(ctx->remote_str, strlen(host) + 7,
471            "%s:%d", host, port);
472  
473   memset(&ctx->r, 0, sizeof(ctx->r));
474   if(family == AF_UNIX) {
475     struct sockaddr_un *s = &ctx->r.remote_un;
476     s->sun_family = AF_UNIX;
477     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
478     ctx->remote_len = sizeof(*s);
479   }
480   else if(family == AF_INET) {
481     struct sockaddr_in *s = &ctx->r.remote_in;
482     s->sin_family = family;
483     s->sin_port = htons(port);
484     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
485     ctx->remote_len = sizeof(*s);
486   }
487   else {
488     struct sockaddr_in6 *s = &ctx->r.remote_in6;
489     s->sin6_family = family;
490     s->sin6_port = htons(port);
491     memcpy(&s->sin6_addr, &a, sizeof(a));
492     ctx->remote_len = sizeof(*s);
493   }
494
495   if(ctx->sslconfig)
496     noit_hash_delete_all(ctx->sslconfig, free, free);
497   else
498     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
499   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
500   if(ctx->config)
501     noit_hash_delete_all(ctx->config, free, free);
502   else
503     ctx->config = calloc(1, sizeof(noit_hash_table));
504   noit_hash_merge_as_dict(ctx->config, config);
505
506   ctx->consumer_callback = handler;
507   ctx->consumer_free = freefunc;
508   ctx->consumer_ctx = closure;
509   noit_connection_initiate_connection(ctx);
510   return 0;
511 }
512
513 void
514 stratcon_streamer_connection(const char *toplevel, const char *destination,
515                              eventer_func_t handler,
516                              void *(*handler_alloc)(void), void *handler_ctx,
517                              void (*handler_free)(void *)) {
518   int i, cnt = 0;
519   noit_conf_section_t *noit_configs;
520   char path[256];
521
522   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
523   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
524   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
525   for(i=0; i<cnt; i++) {
526     char address[256];
527     unsigned short port;
528     int portint;
529     noit_hash_table *sslconfig, *config;
530
531     if(!noit_conf_get_stringbuf(noit_configs[i],
532                                 "ancestor-or-self::node()/@address",
533                                 address, sizeof(address))) {
534       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
535       continue;
536     }
537     /* if destination is specified, exact match it */
538     if(destination && strcmp(address, destination)) continue;
539
540     if(!noit_conf_get_int(noit_configs[i],
541                           "ancestor-or-self::node()/@port", &portint))
542       portint = 0;
543     port = (unsigned short) portint;
544     if(address[0] != '/' && (portint == 0 || (port != portint))) {
545       /* UNIX sockets don't require a port (they'll ignore it if specified */
546       noitL(noit_stderr,
547             "Invalid port [%d] specified in stanza %d\n", port, i+1);
548       continue;
549     }
550     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
551     config = noit_conf_get_hash(noit_configs[i], "config");
552
553     noitL(noit_error, "initiating to %s\n", address);
554     initiate_noit_connection(address, port, sslconfig, config,
555                              handler,
556                              handler_alloc ? handler_alloc() : handler_ctx,
557                              handler_free);
558     noit_hash_destroy(sslconfig,free,free);
559     free(sslconfig);
560     noit_hash_destroy(config,free,free);
561     free(config);
562   }
563   free(noit_configs);
564 }
565 void
566 stratcon_jlog_streamer_reload(const char *toplevel) {
567   stratcon_streamer_connection(toplevel, NULL,
568                                stratcon_jlog_recv_handler,
569                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
570                                NULL,
571                                jlog_streamer_ctx_free);
572 }
573
574 void
575 stratcon_jlog_streamer_init(const char *toplevel) {
576   eventer_name_callback("noit_connection_reinitiate",
577                         noit_connection_reinitiate);
578   eventer_name_callback("stratcon_jlog_recv_handler",
579                         stratcon_jlog_recv_handler);
580   eventer_name_callback("noit_connection_ssl_upgrade",
581                         noit_connection_ssl_upgrade);
582   eventer_name_callback("noit_connection_complete_connect",
583                         noit_connection_complete_connect);
584   stratcon_jlog_streamer_reload(toplevel);
585 }
Note: See TracBrowser for help on using the browser.