root/src/noit_jlog_listener.c

Revision 88a71780101cbf23034aa0cb840f9f0368fda2dd, 9.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

fixes #126

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_listener.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "jlog/jlog.h"
39 #include "noit_jlog_listener.h"
40
41 #include <unistd.h>
42 #include <sys/ioctl.h>
43 #define MAX_ROWS_AT_ONCE 1000
44 #define DEFAULT_SECONDS_BETWEEN_BATCHES 10
45
46 static noit_atomic32_t tmpfeedcounter = 0;
47
48 void
49 noit_jlog_listener_init() {
50   eventer_name_callback("log_transit/1.0", noit_jlog_handler);
51   noit_control_dispatch_delegate(noit_control_dispatch,
52                                  NOIT_JLOG_DATA_FEED,
53                                  noit_jlog_handler);
54   noit_control_dispatch_delegate(noit_control_dispatch,
55                                  NOIT_JLOG_DATA_TEMP_FEED,
56                                  noit_jlog_handler);
57 }
58
59 typedef struct {
60   jlog_ctx *jlog;
61   char *subscriber;
62   jlog_id chkpt;
63   jlog_id start;
64   jlog_id finish;
65   int count;
66   int wants_shutdown;
67 } noit_jlog_closure_t;
68
69 noit_jlog_closure_t *
70 noit_jlog_closure_alloc(void) {
71   noit_jlog_closure_t *jcl;
72   jcl = calloc(1, sizeof(*jcl));
73   return jcl;
74 }
75
76 void
77 noit_jlog_closure_free(noit_jlog_closure_t *jcl) {
78   if(jcl->jlog) {
79     if(jcl->subscriber) {
80       if(jcl->subscriber[0] == '~')
81         jlog_ctx_remove_subscriber(jcl->jlog, jcl->subscriber);
82       free(jcl->subscriber);
83     }
84     jlog_ctx_close(jcl->jlog);
85   }
86   free(jcl);
87 }
88
89 static int
90 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
91   int w, sofar = 0;
92   while(l > sofar) {
93     w = e->opset->write(e->fd, b + sofar, l - sofar, mask, e);
94     if(w <= 0) return w;
95     sofar += w;
96   }
97   return sofar;
98 }
99 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
100
101 static int
102 noit_jlog_push(eventer_t e, noit_jlog_closure_t *jcl) {
103   jlog_message msg;
104   int mask;
105   u_int32_t n_count;
106   n_count = htonl(jcl->count);
107   if(Ewrite(&n_count, sizeof(n_count)) != sizeof(n_count))
108     return -1;
109   while(jcl->count > 0) {
110     int rv;
111     struct { jlog_id chkpt; u_int32_t n_sec, n_usec, n_len; } payload;
112     if(jlog_ctx_read_message(jcl->jlog, &jcl->start, &msg) == -1)
113       return -1;
114
115     /* Here we actually push the message */
116     payload.chkpt.log = htonl(jcl->start.log);
117     payload.chkpt.marker = htonl(jcl->start.marker);
118     payload.n_sec  = htonl(msg.header->tv_sec);
119     payload.n_usec = htonl(msg.header->tv_usec);
120     payload.n_len  = htonl(msg.mess_len);
121     if((rv = Ewrite(&payload, sizeof(payload))) != sizeof(payload)) {
122       noitL(noit_error, "Error writing jlog header over SSL %d != %d\n",
123             rv, (int)sizeof(payload));
124       return -1;
125     }
126     if((rv = Ewrite(msg.mess, msg.mess_len)) != msg.mess_len) {
127       noitL(noit_error, "Error writing jlog message over SSL %d != %d\n",
128             rv, msg.mess_len);
129       return -1;
130     }
131     /* Note what the client must checkpoint */
132     jcl->chkpt = jcl->start;
133
134     JLOG_ID_ADVANCE(&jcl->start);
135     jcl->count--;
136   }
137   return 0;
138 }
139
140 void *
141 noit_jlog_thread_main(void *e_vptr) {
142   int mask, bytes_read;
143   eventer_t e = e_vptr;
144   acceptor_closure_t *ac = e->closure;
145   noit_jlog_closure_t *jcl = ac->service_ctx;
146   long off = 0;
147   char inbuff[sizeof(jlog_id)];
148
149   /* Go into blocking mode */
150   ioctl(e->fd, FIONBIO, &off);
151
152   while(1) {
153     jlog_id client_chkpt;
154     int sleeptime = (ac->cmd == NOIT_JLOG_DATA_TEMP_FEED) ?
155                       1 : DEFAULT_SECONDS_BETWEEN_BATCHES;
156     jlog_get_checkpoint(jcl->jlog, ac->remote_cn, &jcl->chkpt);
157     jcl->count = jlog_ctx_read_interval(jcl->jlog, &jcl->start, &jcl->finish);
158     if(jcl->count > MAX_ROWS_AT_ONCE) {
159       /* Artificially set down the range to make the batches a bit easier
160        * to handle on the stratcond/postgres end.
161        * However, we must have more data, so drop the sleeptime to 0
162        */
163       jcl->count = MAX_ROWS_AT_ONCE;
164       jcl->finish.marker = jcl->start.marker + jcl->count;
165       sleeptime = 0;
166     }
167     if(jcl->count > 0) {
168       if(noit_jlog_push(e, jcl)) {
169         goto alldone;
170       }
171       /* Read our jlog_id accounting for possibly short reads */
172       bytes_read = 0;
173       while(bytes_read < sizeof(jlog_id)) {
174         int len;
175         if((len = e->opset->read(e->fd, inbuff + bytes_read,
176                                  sizeof(jlog_id) - bytes_read,
177                                  &mask, e)) <= 0)
178           goto alldone;
179         bytes_read += len;
180       }
181       memcpy(&client_chkpt, inbuff, sizeof(jlog_id));
182       /* Fix the endian */
183       client_chkpt.log = ntohl(client_chkpt.log);
184       client_chkpt.marker = ntohl(client_chkpt.marker);
185  
186       if(memcmp(&jcl->chkpt, &client_chkpt, sizeof(jlog_id))) {
187         noitL(noit_error,
188               "client %s submitted invalid checkpoint %u:%u expected %u:%u\n",
189               ac->remote_cn, client_chkpt.log, client_chkpt.marker,
190               jcl->chkpt.log, jcl->chkpt.marker);
191         goto alldone;
192       }
193       jlog_ctx_read_checkpoint(jcl->jlog, &jcl->chkpt);
194     }
195     if(sleeptime) sleep(sleeptime);
196   }
197
198  alldone:
199   e->opset->close(e->fd, &mask, e);
200   if(jcl) noit_jlog_closure_free(jcl);
201   if(ac) acceptor_closure_free(ac);
202   return NULL;
203 }
204
205 int
206 noit_jlog_handler(eventer_t e, int mask, void *closure,
207                      struct timeval *now) {
208   eventer_t newe;
209   pthread_t tid;
210   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
211   acceptor_closure_t *ac = closure;
212   noit_jlog_closure_t *jcl = ac->service_ctx;
213
214   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
215 socket_error:
216     /* Exceptions cause us to simply snip the connection */
217     eventer_remove_fd(e->fd);
218     e->opset->close(e->fd, &newmask, e);
219     if(jcl) noit_jlog_closure_free(jcl);
220     if(ac) acceptor_closure_free(ac);
221     return 0;
222   }
223
224   if(!ac->service_ctx) {
225     noit_log_stream_t ls;
226     const char *logname;
227     char path[PATH_MAX], subscriber[32], *sub;
228     jcl = ac->service_ctx = noit_jlog_closure_alloc();
229     if(!noit_hash_retr_str(ac->config,
230                            "log_transit_feed_name",
231                            strlen("log_transit_feed_name"),
232                            &logname)) {
233       noitL(noit_error, "No 'log_transit_feed_name' specified in log_transit.\n");
234       goto socket_error;
235     }
236     ls = noit_log_stream_find(logname);
237     if(!ls) {
238       noitL(noit_error, "Could not find log '%s' for log_transit.\n",
239             logname);
240       goto socket_error;
241     }
242     if(!ls->type || strcmp(ls->type, "jlog")) {
243       noitL(noit_error, "Log '%s' for log_transit is not a jlog.\n",
244             logname);
245       goto socket_error;
246     }
247     if(ac->cmd == NOIT_JLOG_DATA_FEED) {
248       if(!ac->remote_cn) {
249         noitL(noit_error, "jlog transit started to unidentified party.\n");
250         goto socket_error;
251       }
252       strlcpy(subscriber, ac->remote_cn, sizeof(subscriber));
253     }
254     else {
255       snprintf(subscriber, sizeof(subscriber),
256                "~%07d", noit_atomic_inc32(&tmpfeedcounter));
257     }
258     jcl->subscriber = strdup(subscriber);
259
260     strlcpy(path, ls->path, sizeof(path));
261     sub = strchr(path, '(');
262     if(sub) {
263       char *esub = strchr(sub, ')');
264       if(esub) {
265         *esub = '\0';
266         *sub = '\0';
267         sub += 1;
268       }
269     }
270
271     jcl->jlog = jlog_new(path);
272     if(ac->cmd == NOIT_JLOG_DATA_TEMP_FEED)
273       if(jlog_ctx_add_subscriber(jcl->jlog, jcl->subscriber, JLOG_END) == -1)
274         noitL(noit_error, "jlog reader[%s] error: %s\n", jcl->subscriber,
275               jlog_ctx_err_string(jcl->jlog));
276     if(jlog_ctx_open_reader(jcl->jlog, jcl->subscriber) == -1) {
277       noitL(noit_error, "jlog reader[%s] error: %s\n", jcl->subscriber,
278             jlog_ctx_err_string(jcl->jlog));
279       goto socket_error;
280     }
281   }
282
283   /* The jlog stuff is disk I/O and can block us.
284    * We'll create a new thread to just handle this connection.
285    */
286   eventer_remove_fd(e->fd);
287   newe = eventer_alloc();
288   memcpy(newe, e, sizeof(*e));
289   if(pthread_create(&tid, NULL, noit_jlog_thread_main, newe) == 0) {
290     return 0;
291   }
292
293   /* Undo our dup */
294   eventer_free(newe);
295   /* Creating the thread failed, close it down and deschedule. */
296   e->opset->close(e->fd, &newmask, e);
297   return 0;
298 }
Note: See TracBrowser for help on using the browser.