root/src/stratcon_jlog_streamer.c

Revision 76d9e48076935daad9912f8ada1c5976db8f1e85, 19.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

first whack at good error messages on failed connections. refs #164

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_conf.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "noit_jlog_listener.h"
39 #include "stratcon_datastore.h"
40 #include "stratcon_jlog_streamer.h"
41
42 #include <unistd.h>
43 #include <assert.h>
44 #include <errno.h>
45 #include <sys/types.h>
46 #include <sys/socket.h>
47 #ifdef HAVE_SYS_FILIO_H
48 #include <sys/filio.h>
49 #endif
50 #include <netinet/in.h>
51 #include <sys/un.h>
52 #include <arpa/inet.h>
53
/* File-scope hash table named "noits", statically initialized to the empty
 * state.  It is not referenced by any other code visible in this file. */
54 noit_hash_table noits = NOIT_HASH_EMPTY;
55
/* Forward declaration: the function is defined further down in this file but
 * is called earlier, from noit_connection_reinitiate(). */
56 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx);
57
58 jlog_streamer_ctx_t *
59 stratcon_jlog_streamer_datastore_ctx_alloc(void) {
60   jlog_streamer_ctx_t *ctx;
61   ctx = stratcon_jlog_streamer_ctx_alloc();
62   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED);
63   ctx->push = stratcon_datastore_push;
64   return ctx;
65 }
66 jlog_streamer_ctx_t *
67 stratcon_jlog_streamer_ctx_alloc(void) {
68   jlog_streamer_ctx_t *ctx;
69   ctx = calloc(1, sizeof(*ctx));
70   return ctx;
71 }
72 noit_connection_ctx_t *
73 noit_connection_ctx_alloc(void) {
74   noit_connection_ctx_t *ctx;
75   ctx = calloc(1, sizeof(*ctx));
76   return ctx;
77 }
78 int
79 noit_connection_reinitiate(eventer_t e, int mask, void *closure,
80                          struct timeval *now) {
81   noit_connection_ctx_t *ctx = closure;
82   ctx->timeout_event = NULL;
83   noit_connection_initiate_connection(closure);
84   return 0;
85 }
86 void
87 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx,
88                                    struct timeval *now) {
89   struct timeval __now, interval;
90   const char *v;
91   u_int32_t min_interval = 1000, max_interval = 8000;
92   if(noit_hash_retr_str(ctx->config,
93                         "reconnect_initial_interval",
94                         strlen("reconnect_initial_interval"),
95                         &v)) {
96     min_interval = MAX(atoi(v), 100); /* .1 second minimum */
97   }
98   if(noit_hash_retr_str(ctx->config,
99                         "reconnect_maximum_interval",
100                         strlen("reconnect_maximum_interval"),
101                         &v)) {
102     max_interval = MIN(atoi(v), 3600*1000); /* 1 hour maximum */
103   }
104   if(ctx->current_backoff == 0) ctx->current_backoff = min_interval;
105   else {
106     ctx->current_backoff *= 2;
107     ctx->current_backoff = MAX(min_interval, ctx->current_backoff);
108     ctx->current_backoff = MIN(max_interval, ctx->current_backoff);
109   }
110   if(!now) {
111     gettimeofday(&__now, NULL);
112     now = &__now;
113   }
114   interval.tv_sec = ctx->current_backoff / 1000;
115   interval.tv_usec = (ctx->current_backoff % 1000) * 1000;
116   noitL(noit_debug, "Next jlog_streamer attempt in %ums\n",
117         ctx->current_backoff);
118   if(ctx->timeout_event)
119     eventer_remove(ctx->timeout_event);
120   else
121     ctx->timeout_event = eventer_alloc();
122   ctx->timeout_event->callback = noit_connection_reinitiate;
123   ctx->timeout_event->closure = ctx;
124   ctx->timeout_event->mask = EVENTER_TIMER;
125   add_timeval(*now, interval, &ctx->timeout_event->whence);
126   eventer_add(ctx->timeout_event);
127 }
128 void
129 noit_connection_ctx_free(noit_connection_ctx_t *ctx) {
130   if(ctx->remote_cn) free(ctx->remote_cn);
131   if(ctx->remote_str) free(ctx->remote_str);
132   if(ctx->timeout_event) {
133     eventer_remove(ctx->timeout_event);
134     eventer_free(ctx->timeout_event);
135   }
136   ctx->consumer_free(ctx->consumer_ctx);
137   free(ctx);
138 }
139 void
140 jlog_streamer_ctx_free(void *cl) {
141   jlog_streamer_ctx_t *ctx = cl;
142   if(ctx->buffer) free(ctx->buffer);
143   free(ctx);
144 }
145
146 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e)
147 static int
148 __read_on_ctx(eventer_t e, jlog_streamer_ctx_t *ctx, int *newmask) {
149   int len, mask;
150   while(ctx->bytes_read < ctx->bytes_expected) {
151     len = Eread(ctx->buffer + ctx->bytes_read,
152                 ctx->bytes_expected - ctx->bytes_read);
153     if(len < 0) {
154       *newmask = mask;
155       return -1;
156     }
157     /* if we get 0 inside SSL, and there was a real error, we
158      * will actually get a -1 here.
159      * if(len == 0) return ctx->bytes_read;
160      */
161     ctx->bytes_read += len;
162   }
163   assert(ctx->bytes_read == ctx->bytes_expected);
164   return ctx->bytes_read;
165 }
/*
 * FULLREAD(e, ctx, size): read exactly `size` bytes into ctx->buffer.
 *
 * On the first call of a read (ctx->bytes_expected == 0) the previous buffer
 * is released and a new one of size+1 bytes is allocated; the extra byte is
 * set to NUL.  The read itself is done by __read_on_ctx() and may be resumed
 * across calls.
 *
 * Control flow performed inside the ENCLOSING function:
 *   - returns (mask | EVENTER_EXCEPTION) when the read fails with EAGAIN;
 *   - jumps to the `socket_error` label on allocation failure, on any other
 *     read error and on a short read.
 * The enclosing scope must therefore provide that label as well as a
 * variable named `nctx` (its remote_str is used in log messages).
 *
 * `size` is evaluated exactly once.  A size whose +1 would wrap around to 0
 * is handled like an allocation failure, so the terminating NUL can never be
 * written outside the buffer.
 */
#define FULLREAD(e,ctx,size) do { \
  int mask, len; \
  const size_t fullread_size = (size_t)(size); \
  if(!(ctx)->bytes_expected) { \
    (ctx)->bytes_expected = fullread_size; \
    free((ctx)->buffer); \
    (ctx)->buffer = (fullread_size + 1 != 0) ? malloc(fullread_size + 1) : NULL; \
    if((ctx)->buffer == NULL) { \
      noitL(noit_error, "malloc(%lu) failed.\n", (long unsigned int)fullread_size + 1); \
      goto socket_error; \
    } \
    (ctx)->buffer[fullread_size] = '\0'; \
  } \
  len = __read_on_ctx((e), (ctx), &mask); \
  if(len < 0) { \
    if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \
    noitL(noit_error, "[%s] SSL read error: %s\n", nctx->remote_str, strerror(errno)); \
    goto socket_error; \
  } \
  (ctx)->bytes_read = 0; \
  (ctx)->bytes_expected = 0; \
  if((size_t)len != fullread_size) { \
    noitL(noit_error, "[%s] SSL short read [%d] (%d/%lu).  Reseting connection.\n", \
          nctx->remote_str, (ctx)->state, len, (long unsigned int)fullread_size); \
    goto socket_error; \
  } \
} while(0)
192
193 int
194 stratcon_jlog_recv_handler(eventer_t e, int mask, void *closure,
195                            struct timeval *now) {
196   noit_connection_ctx_t *nctx = closure;
197   jlog_streamer_ctx_t *ctx = nctx->consumer_ctx;
198   int len;
199   jlog_id n_chkpt;
200
201   if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) {
202     if(write(e->fd, e, 0) == -1)
203       noitL(noit_error, "socket error: %s\n", strerror(errno));
204  socket_error:
205     ctx->state = JLOG_STREAMER_WANT_INITIATE;
206     ctx->count = 0;
207     ctx->bytes_read = 0;
208     ctx->bytes_expected = 0;
209     if(ctx->buffer) free(ctx->buffer);
210     ctx->buffer = NULL;
211     noit_connection_schedule_reattempt(nctx, now);
212     eventer_remove_fd(e->fd);
213     e->opset->close(e->fd, &mask, e);
214     return 0;
215   }
216
217   while(1) {
218     switch(ctx->state) {
219       case JLOG_STREAMER_WANT_INITIATE:
220         len = e->opset->write(e->fd, &ctx->jlog_feed_cmd,
221                               sizeof(ctx->jlog_feed_cmd),
222                               &mask, e);
223         if(len < 0) {
224           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
225           goto socket_error;
226         }
227         if(len != sizeof(ctx->jlog_feed_cmd)) {
228           noitL(noit_error, "short write [%d/%d] on initiating stream.\n",
229                 (int)len, (int)sizeof(ctx->jlog_feed_cmd));
230           goto socket_error;
231         }
232         ctx->state = JLOG_STREAMER_WANT_COUNT;
233         break;
234
235       case JLOG_STREAMER_WANT_COUNT:
236         FULLREAD(e, ctx, sizeof(u_int32_t));
237         memcpy(&ctx->count, ctx->buffer, sizeof(u_int32_t));
238         ctx->count = ntohl(ctx->count);
239         free(ctx->buffer); ctx->buffer = NULL;
240         ctx->state = JLOG_STREAMER_WANT_HEADER;
241         break;
242
243       case JLOG_STREAMER_WANT_HEADER:
244         if(ctx->count == 0) {
245           ctx->state = JLOG_STREAMER_WANT_COUNT;
246           break;
247         }
248         FULLREAD(e, ctx, sizeof(ctx->header));
249         memcpy(&ctx->header, ctx->buffer, sizeof(ctx->header));
250         ctx->header.chkpt.log = ntohl(ctx->header.chkpt.log);
251         ctx->header.chkpt.marker = ntohl(ctx->header.chkpt.marker);
252         ctx->header.tv_sec = ntohl(ctx->header.tv_sec);
253         ctx->header.tv_usec = ntohl(ctx->header.tv_usec);
254         ctx->header.message_len = ntohl(ctx->header.message_len);
255         free(ctx->buffer); ctx->buffer = NULL;
256         ctx->state = JLOG_STREAMER_WANT_BODY;
257         break;
258
259       case JLOG_STREAMER_WANT_BODY:
260         FULLREAD(e, ctx, (unsigned long)ctx->header.message_len);
261         if(ctx->header.message_len > 0)
262           ctx->push(DS_OP_INSERT, &nctx->r.remote, ctx->buffer);
263         else if(ctx->buffer)
264           free(ctx->buffer);
265         /* Don't free the buffer, it's used by the datastore process. */
266         ctx->buffer = NULL;
267         ctx->count--;
268         if(ctx->count == 0) {
269           eventer_t completion_e;
270           eventer_remove_fd(e->fd);
271           completion_e = eventer_alloc();
272           memcpy(completion_e, e, sizeof(*e));
273           completion_e->mask = EVENTER_WRITE | EVENTER_EXCEPTION;
274           ctx->state = JLOG_STREAMER_WANT_CHKPT;
275           ctx->push(DS_OP_CHKPT, &nctx->r.remote, completion_e);
276           noitL(noit_debug, "Pushing batch asynch...\n");
277           return 0;
278         } else
279           ctx->state = JLOG_STREAMER_WANT_HEADER;
280         break;
281
282       case JLOG_STREAMER_WANT_CHKPT:
283         noitL(noit_debug, "Pushing checkpoint: [%u/%u]\n",
284               ctx->header.chkpt.log, ctx->header.chkpt.marker);
285         n_chkpt.log = htonl(ctx->header.chkpt.log);
286         n_chkpt.marker = htonl(ctx->header.chkpt.marker);
287
288         /* screw short writes.  I'd rather die than not write my data! */
289         len = e->opset->write(e->fd, &n_chkpt, sizeof(jlog_id),
290                               &mask, e);
291         if(len < 0) {
292           if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
293           goto socket_error;
294         }
295         if(len != sizeof(jlog_id)) {
296           noitL(noit_error, "short write on checkpointing stream.\n");
297           goto socket_error;
298         }
299         ctx->state = JLOG_STREAMER_WANT_COUNT;
300         break;
301     }
302   }
303   /* never get here */
304 }
305
306 int
307 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure,
308                             struct timeval *now) {
309   noit_connection_ctx_t *nctx = closure;
310   int rv;
311
312   rv = eventer_SSL_connect(e, &mask);
313   if(rv > 0) {
314     eventer_ssl_ctx_t *sslctx;
315     e->callback = nctx->consumer_callback;
316     /* We must make a copy of the acceptor_closure_t for each new
317      * connection.
318      */
319     if((sslctx = eventer_get_eventer_ssl_ctx(e)) != NULL) {
320       char *cn, *end;
321       cn = eventer_ssl_get_peer_subject(sslctx);
322       if(cn && (cn = strstr(cn, "CN=")) != NULL) {
323         cn += 3;
324         end = cn;
325         while(*end && *end != '/') end++;
326         nctx->remote_cn = malloc(end - cn + 1);
327         memcpy(nctx->remote_cn, cn, end - cn);
328         nctx->remote_cn[end-cn] = '\0';
329       }
330     }
331     return e->callback(e, mask, e->closure, now);
332   }
333   if(errno == EAGAIN) return mask | EVENTER_EXCEPTION;
334
335   noitL(noit_error, "jlog streamer SSL upgrade failed.\n");
336   eventer_remove_fd(e->fd);
337   e->opset->close(e->fd, &mask, e);
338   noit_connection_schedule_reattempt(nctx, now);
339   return 0;
340 }
341 int
342 noit_connection_complete_connect(eventer_t e, int mask, void *closure,
343                                  struct timeval *now) {
344   noit_connection_ctx_t *nctx = closure;
345   const char *cert, *key, *ca, *ciphers;
346   char remote_str[128], tmp_str[128];
347   eventer_ssl_ctx_t *sslctx;
348   int aerrno, len;
349   socklen_t aerrno_len = sizeof(aerrno);
350
351   if(getsockopt(e->fd,SOL_SOCKET,SO_ERROR, &aerrno, &aerrno_len) == 0)
352     if(aerrno != 0) goto connect_error;
353   aerrno = 0;
354
355   if(mask & EVENTER_EXCEPTION) {
356     if(aerrno == 0 && (write(e->fd, e, 0) == -1))
357       aerrno = errno;
358  connect_error:
359     switch(nctx->r.remote.sa_family) {
360       case AF_INET:
361         len = sizeof(struct sockaddr_in);
362         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in.sin_addr,
363                   tmp_str, len);
364         snprintf(remote_str, sizeof(remote_str), "%s:%d",
365                  tmp_str, ntohs(nctx->r.remote_in.sin_port));
366         break;
367       case AF_INET6:
368         len = sizeof(struct sockaddr_in6);
369         inet_ntop(nctx->r.remote.sa_family, &nctx->r.remote_in6.sin6_addr,
370                   tmp_str, len);
371         snprintf(remote_str, sizeof(remote_str), "%s:%d",
372                  tmp_str, ntohs(nctx->r.remote_in6.sin6_port));
373        break;
374       case AF_UNIX:
375         len = SUN_LEN(&(nctx->r.remote_un));
376         snprintf(remote_str, sizeof(remote_str), "%s", nctx->r.remote_un.sun_path);
377         break;
378       default:
379         snprintf(remote_str, sizeof(remote_str), "(unknown)");
380     }
381     noitL(noit_error, "Error connecting to %s: %s\n",
382           remote_str, strerror(aerrno));
383     eventer_remove_fd(e->fd);
384     e->opset->close(e->fd, &mask, e);
385     noit_connection_schedule_reattempt(nctx, now);
386     return 0;
387   }
388
389 #define SSLCONFGET(var,name) do { \
390   if(!noit_hash_retr_str(nctx->sslconfig, name, strlen(name), \
391                          &var)) var = NULL; } while(0)
392   SSLCONFGET(cert, "certificate_file");
393   SSLCONFGET(key, "key_file");
394   SSLCONFGET(ca, "ca_chain");
395   SSLCONFGET(ciphers, "ciphers");
396   sslctx = eventer_ssl_ctx_new(SSL_CLIENT, cert, key, ca, ciphers);
397   if(!sslctx) goto connect_error;
398
399   eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert,
400                              nctx->sslconfig);
401   EVENTER_ATTACH_SSL(e, sslctx);
402   e->callback = noit_connection_ssl_upgrade;
403   return e->callback(e, mask, closure, now);
404 }
405 static void
406 noit_connection_initiate_connection(noit_connection_ctx_t *nctx) {
407   struct timeval __now;
408   eventer_t e;
409   int rv, fd = -1;
410   long on;
411
412   /* Open a socket */
413   fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0);
414   if(fd < 0) goto reschedule;
415
416   /* Make it non-blocking */
417   on = 1;
418   if(ioctl(fd, FIONBIO, &on)) goto reschedule;
419
420   /* Initiate a connection */
421   rv = connect(fd, &nctx->r.remote, nctx->remote_len);
422   if(rv == -1 && errno != EINPROGRESS) goto reschedule;
423
424   /* Register a handler for connection completion */
425   e = eventer_alloc();
426   e->fd = fd;
427   e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
428   e->callback = noit_connection_complete_connect;
429   e->closure = nctx;
430   eventer_add(e);
431   return;
432
433  reschedule:
434   if(fd >= 0) close(fd);
435   gettimeofday(&__now, NULL);
436   noit_connection_schedule_reattempt(nctx, &__now);
437   return;
438 }
439
440 int
441 initiate_noit_connection(const char *host, unsigned short port,
442                          noit_hash_table *sslconfig, noit_hash_table *config,
443                          eventer_func_t handler, void *closure,
444                          void (*freefunc)(void *)) {
445   noit_connection_ctx_t *ctx;
446
447   int8_t family;
448   int rv;
449   union {
450     struct in_addr addr4;
451     struct in6_addr addr6;
452   } a;
453
454   if(host[0] == '/') {
455     family = AF_UNIX;
456   }
457   else {
458     family = AF_INET;
459     rv = inet_pton(family, host, &a);
460     if(rv != 1) {
461       family = AF_INET6;
462       rv = inet_pton(family, host, &a);
463       if(rv != 1) {
464         noitL(noit_stderr, "Cannot translate '%s' to IP\n", host);
465         return -1;
466       }
467     }
468   }
469
470   ctx = noit_connection_ctx_alloc();
471   ctx->remote_str = calloc(1, strlen(host) + 7);
472   snprintf(ctx->remote_str, strlen(host) + 7,
473            "%s:%d", host, port);
474  
475   memset(&ctx->r, 0, sizeof(ctx->r));
476   if(family == AF_UNIX) {
477     struct sockaddr_un *s = &ctx->r.remote_un;
478     s->sun_family = AF_UNIX;
479     strncpy(s->sun_path, host, sizeof(s->sun_path)-1);
480     ctx->remote_len = sizeof(*s);
481   }
482   else if(family == AF_INET) {
483     struct sockaddr_in *s = &ctx->r.remote_in;
484     s->sin_family = family;
485     s->sin_port = htons(port);
486     memcpy(&s->sin_addr, &a, sizeof(struct in_addr));
487     ctx->remote_len = sizeof(*s);
488   }
489   else {
490     struct sockaddr_in6 *s = &ctx->r.remote_in6;
491     s->sin6_family = family;
492     s->sin6_port = htons(port);
493     memcpy(&s->sin6_addr, &a, sizeof(a));
494     ctx->remote_len = sizeof(*s);
495   }
496
497   if(ctx->sslconfig)
498     noit_hash_delete_all(ctx->sslconfig, free, free);
499   else
500     ctx->sslconfig = calloc(1, sizeof(noit_hash_table));
501   noit_hash_merge_as_dict(ctx->sslconfig, sslconfig);
502   if(ctx->config)
503     noit_hash_delete_all(ctx->config, free, free);
504   else
505     ctx->config = calloc(1, sizeof(noit_hash_table));
506   noit_hash_merge_as_dict(ctx->config, config);
507
508   ctx->consumer_callback = handler;
509   ctx->consumer_free = freefunc;
510   ctx->consumer_ctx = closure;
511   noit_connection_initiate_connection(ctx);
512   return 0;
513 }
514
515 void
516 stratcon_streamer_connection(const char *toplevel, const char *destination,
517                              eventer_func_t handler,
518                              void *(*handler_alloc)(void), void *handler_ctx,
519                              void (*handler_free)(void *)) {
520   int i, cnt = 0;
521   noit_conf_section_t *noit_configs;
522   char path[256];
523
524   snprintf(path, sizeof(path), "/%s/noits//noit", toplevel ? toplevel : "*");
525   noit_configs = noit_conf_get_sections(NULL, path, &cnt);
526   noitL(noit_error, "Found %d %s stanzas\n", cnt, path);
527   for(i=0; i<cnt; i++) {
528     char address[256];
529     unsigned short port;
530     int portint;
531     noit_hash_table *sslconfig, *config;
532
533     if(!noit_conf_get_stringbuf(noit_configs[i],
534                                 "ancestor-or-self::node()/@address",
535                                 address, sizeof(address))) {
536       noitL(noit_error, "address attribute missing in noit %d\n", i+1);
537       continue;
538     }
539     /* if destination is specified, exact match it */
540     if(destination && strcmp(address, destination)) continue;
541
542     if(!noit_conf_get_int(noit_configs[i],
543                           "ancestor-or-self::node()/@port", &portint))
544       portint = 0;
545     port = (unsigned short) portint;
546     if(address[0] != '/' && (portint == 0 || (port != portint))) {
547       /* UNIX sockets don't require a port (they'll ignore it if specified */
548       noitL(noit_stderr,
549             "Invalid port [%d] specified in stanza %d\n", port, i+1);
550       continue;
551     }
552     sslconfig = noit_conf_get_hash(noit_configs[i], "sslconfig");
553     config = noit_conf_get_hash(noit_configs[i], "config");
554
555     noitL(noit_error, "initiating to %s\n", address);
556     initiate_noit_connection(address, port, sslconfig, config,
557                              handler,
558                              handler_alloc ? handler_alloc() : handler_ctx,
559                              handler_free);
560     noit_hash_destroy(sslconfig,free,free);
561     free(sslconfig);
562     noit_hash_destroy(config,free,free);
563     free(config);
564   }
565   free(noit_configs);
566 }
567 void
568 stratcon_jlog_streamer_reload(const char *toplevel) {
569   stratcon_streamer_connection(toplevel, NULL,
570                                stratcon_jlog_recv_handler,
571                                (void *(*)())stratcon_jlog_streamer_datastore_ctx_alloc,
572                                NULL,
573                                jlog_streamer_ctx_free);
574 }
575
576 void
577 stratcon_jlog_streamer_init(const char *toplevel) {
578   eventer_name_callback("noit_connection_reinitiate",
579                         noit_connection_reinitiate);
580   eventer_name_callback("stratcon_jlog_recv_handler",
581                         stratcon_jlog_recv_handler);
582   eventer_name_callback("noit_connection_ssl_upgrade",
583                         noit_connection_ssl_upgrade);
584   eventer_name_callback("noit_connection_complete_connect",
585                         noit_connection_complete_connect);
586   stratcon_jlog_streamer_reload(toplevel);
587 }
Note: See TracBrowser for help on using the browser.