root/src/noit_jlog_listener.c

Revision b7ab38bf2df9fb0313f170c5ff925e8408c96f6b, 9.4 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

up the subscriber limit to 256

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_listener.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "jlog/jlog.h"
39 #include "noit_jlog_listener.h"
40
41 #include <unistd.h>
42 #define MAX_ROWS_AT_ONCE 1000
43 #define DEFAULT_SECONDS_BETWEEN_BATCHES 10
44
45 static noit_atomic32_t tmpfeedcounter = 0;
46
47 void
48 noit_jlog_listener_init() {
49   eventer_name_callback("log_transit/1.0", noit_jlog_handler);
50   noit_control_dispatch_delegate(noit_control_dispatch,
51                                  NOIT_JLOG_DATA_FEED,
52                                  noit_jlog_handler);
53   noit_control_dispatch_delegate(noit_control_dispatch,
54                                  NOIT_JLOG_DATA_TEMP_FEED,
55                                  noit_jlog_handler);
56 }
57
58 typedef struct {
59   jlog_ctx *jlog;
60   char *subscriber;
61   jlog_id chkpt;
62   jlog_id start;
63   jlog_id finish;
64   int count;
65   int wants_shutdown;
66 } noit_jlog_closure_t;
67
68 noit_jlog_closure_t *
69 noit_jlog_closure_alloc(void) {
70   noit_jlog_closure_t *jcl;
71   jcl = calloc(1, sizeof(*jcl));
72   return jcl;
73 }
74
75 void
76 noit_jlog_closure_free(noit_jlog_closure_t *jcl) {
77   if(jcl->jlog) {
78     if(jcl->subscriber) {
79       if(jcl->subscriber[0] == '~')
80         jlog_ctx_remove_subscriber(jcl->jlog, jcl->subscriber);
81       free(jcl->subscriber);
82     }
83     jlog_ctx_close(jcl->jlog);
84   }
85   free(jcl);
86 }
87
88 static int
89 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
90   int w, sofar = 0;
91   while(l > sofar) {
92     w = e->opset->write(e->fd, (char *)b + sofar, l - sofar, mask, e);
93     if(w <= 0) return w;
94     sofar += w;
95   }
96   return sofar;
97 }
98 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
99
100 static int
101 noit_jlog_push(eventer_t e, noit_jlog_closure_t *jcl) {
102   jlog_message msg;
103   int mask;
104   u_int32_t n_count;
105   n_count = htonl(jcl->count);
106   if(Ewrite(&n_count, sizeof(n_count)) != sizeof(n_count))
107     return -1;
108   while(jcl->count > 0) {
109     int rv;
110     struct { jlog_id chkpt; u_int32_t n_sec, n_usec, n_len; } payload;
111     if(jlog_ctx_read_message(jcl->jlog, &jcl->start, &msg) == -1)
112       return -1;
113
114     /* Here we actually push the message */
115     payload.chkpt.log = htonl(jcl->start.log);
116     payload.chkpt.marker = htonl(jcl->start.marker);
117     payload.n_sec  = htonl(msg.header->tv_sec);
118     payload.n_usec = htonl(msg.header->tv_usec);
119     payload.n_len  = htonl(msg.mess_len);
120     if((rv = Ewrite(&payload, sizeof(payload))) != sizeof(payload)) {
121       noitL(noit_error, "Error writing jlog header over SSL %d != %d\n",
122             rv, (int)sizeof(payload));
123       return -1;
124     }
125     if((rv = Ewrite(msg.mess, msg.mess_len)) != msg.mess_len) {
126       noitL(noit_error, "Error writing jlog message over SSL %d != %d\n",
127             rv, msg.mess_len);
128       return -1;
129     }
130     /* Note what the client must checkpoint */
131     jcl->chkpt = jcl->start;
132
133     JLOG_ID_ADVANCE(&jcl->start);
134     jcl->count--;
135   }
136   return 0;
137 }
138
139 void *
140 noit_jlog_thread_main(void *e_vptr) {
141   int mask, bytes_read;
142   eventer_t e = e_vptr;
143   acceptor_closure_t *ac = e->closure;
144   noit_jlog_closure_t *jcl = ac->service_ctx;
145   char inbuff[sizeof(jlog_id)];
146
147   eventer_set_fd_blocking(e->fd);
148
149   while(1) {
150     jlog_id client_chkpt;
151     int sleeptime = (ac->cmd == NOIT_JLOG_DATA_TEMP_FEED) ?
152                       1 : DEFAULT_SECONDS_BETWEEN_BATCHES;
153     jlog_get_checkpoint(jcl->jlog, ac->remote_cn, &jcl->chkpt);
154     jcl->count = jlog_ctx_read_interval(jcl->jlog, &jcl->start, &jcl->finish);
155     if(jcl->count > MAX_ROWS_AT_ONCE) {
156       /* Artificially set down the range to make the batches a bit easier
157        * to handle on the stratcond/postgres end.
158        * However, we must have more data, so drop the sleeptime to 0
159        */
160       jcl->count = MAX_ROWS_AT_ONCE;
161       jcl->finish.marker = jcl->start.marker + jcl->count;
162       sleeptime = 0;
163     }
164     if(jcl->count > 0) {
165       if(noit_jlog_push(e, jcl)) {
166         goto alldone;
167       }
168       /* Read our jlog_id accounting for possibly short reads */
169       bytes_read = 0;
170       while(bytes_read < sizeof(jlog_id)) {
171         int len;
172         if((len = e->opset->read(e->fd, inbuff + bytes_read,
173                                  sizeof(jlog_id) - bytes_read,
174                                  &mask, e)) <= 0)
175           goto alldone;
176         bytes_read += len;
177       }
178       memcpy(&client_chkpt, inbuff, sizeof(jlog_id));
179       /* Fix the endian */
180       client_chkpt.log = ntohl(client_chkpt.log);
181       client_chkpt.marker = ntohl(client_chkpt.marker);
182  
183       if(memcmp(&jcl->chkpt, &client_chkpt, sizeof(jlog_id))) {
184         noitL(noit_error,
185               "client %s submitted invalid checkpoint %u:%u expected %u:%u\n",
186               ac->remote_cn, client_chkpt.log, client_chkpt.marker,
187               jcl->chkpt.log, jcl->chkpt.marker);
188         goto alldone;
189       }
190       jlog_ctx_read_checkpoint(jcl->jlog, &jcl->chkpt);
191     }
192     if(sleeptime) sleep(sleeptime);
193   }
194
195  alldone:
196   e->opset->close(e->fd, &mask, e);
197   if(jcl) noit_jlog_closure_free(jcl);
198   if(ac) acceptor_closure_free(ac);
199   return NULL;
200 }
201
202 int
203 noit_jlog_handler(eventer_t e, int mask, void *closure,
204                      struct timeval *now) {
205   eventer_t newe;
206   pthread_t tid;
207   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
208   acceptor_closure_t *ac = closure;
209   noit_jlog_closure_t *jcl = ac->service_ctx;
210
211   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
212 socket_error:
213     /* Exceptions cause us to simply snip the connection */
214     eventer_remove_fd(e->fd);
215     e->opset->close(e->fd, &newmask, e);
216     if(jcl) noit_jlog_closure_free(jcl);
217     if(ac) acceptor_closure_free(ac);
218     return 0;
219   }
220
221   if(!ac->service_ctx) {
222     noit_log_stream_t ls;
223     const char *logname;
224     char path[PATH_MAX], subscriber[256], *sub;
225     jcl = ac->service_ctx = noit_jlog_closure_alloc();
226     if(!noit_hash_retr_str(ac->config,
227                            "log_transit_feed_name",
228                            strlen("log_transit_feed_name"),
229                            &logname)) {
230       noitL(noit_error, "No 'log_transit_feed_name' specified in log_transit.\n");
231       goto socket_error;
232     }
233     ls = noit_log_stream_find(logname);
234     if(!ls) {
235       noitL(noit_error, "Could not find log '%s' for log_transit.\n",
236             logname);
237       goto socket_error;
238     }
239     if(!ls->type || strcmp(ls->type, "jlog")) {
240       noitL(noit_error, "Log '%s' for log_transit is not a jlog.\n",
241             logname);
242       goto socket_error;
243     }
244     if(ac->cmd == NOIT_JLOG_DATA_FEED) {
245       if(!ac->remote_cn) {
246         noitL(noit_error, "jlog transit started to unidentified party.\n");
247         goto socket_error;
248       }
249       strlcpy(subscriber, ac->remote_cn, sizeof(subscriber));
250     }
251     else {
252       snprintf(subscriber, sizeof(subscriber),
253                "~%07d", noit_atomic_inc32(&tmpfeedcounter));
254     }
255     jcl->subscriber = strdup(subscriber);
256
257     strlcpy(path, ls->path, sizeof(path));
258     sub = strchr(path, '(');
259     if(sub) {
260       char *esub = strchr(sub, ')');
261       if(esub) {
262         *esub = '\0';
263         *sub = '\0';
264         sub += 1;
265       }
266     }
267
268     jcl->jlog = jlog_new(path);
269     if(ac->cmd == NOIT_JLOG_DATA_TEMP_FEED)
270       if(jlog_ctx_add_subscriber(jcl->jlog, jcl->subscriber, JLOG_END) == -1)
271         noitL(noit_error, "jlog reader[%s] error: %s\n", jcl->subscriber,
272               jlog_ctx_err_string(jcl->jlog));
273     if(jlog_ctx_open_reader(jcl->jlog, jcl->subscriber) == -1) {
274       noitL(noit_error, "jlog reader[%s] error: %s\n", jcl->subscriber,
275             jlog_ctx_err_string(jcl->jlog));
276       goto socket_error;
277     }
278   }
279
280   /* The jlog stuff is disk I/O and can block us.
281    * We'll create a new thread to just handle this connection.
282    */
283   eventer_remove_fd(e->fd);
284   newe = eventer_alloc();
285   memcpy(newe, e, sizeof(*e));
286   if(pthread_create(&tid, NULL, noit_jlog_thread_main, newe) == 0) {
287     return 0;
288   }
289
290   /* Undo our dup */
291   eventer_free(newe);
292   /* Creating the thread failed, close it down and deschedule. */
293   e->opset->close(e->fd, &newmask, e);
294   return 0;
295 }
Note: See TracBrowser for help on using the browser.