root/src/noit_jlog_listener.c

Revision e944e6d871dcff933a8da92b6efd7834bce03e59, 11.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 years ago)

attempt jlog repairs inline

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_listener.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "jlog/jlog.h"
39 #include "jlog/jlog_private.h"
40 #include "noit_jlog_listener.h"
41
42 #include <unistd.h>
/* Largest number of messages pushed to a client in a single batch. */
enum { MAX_ROWS_AT_ONCE = 1000 };
/* Seconds to sleep between batches on a permanent (non-temporary) feed. */
enum { DEFAULT_SECONDS_BETWEEN_BATCHES = 10 };
45
46 static noit_atomic32_t tmpfeedcounter = 0;
47
48 void
49 noit_jlog_listener_init() {
50   eventer_name_callback("log_transit/1.0", noit_jlog_handler);
51   noit_control_dispatch_delegate(noit_control_dispatch,
52                                  NOIT_JLOG_DATA_FEED,
53                                  noit_jlog_handler);
54   noit_control_dispatch_delegate(noit_control_dispatch,
55                                  NOIT_JLOG_DATA_TEMP_FEED,
56                                  noit_jlog_handler);
57 }
58
59 typedef struct {
60   jlog_ctx *jlog;
61   char *subscriber;
62   jlog_id chkpt;
63   jlog_id start;
64   jlog_id finish;
65   int count;
66   int wants_shutdown;
67 } noit_jlog_closure_t;
68
69 noit_jlog_closure_t *
70 noit_jlog_closure_alloc(void) {
71   noit_jlog_closure_t *jcl;
72   jcl = calloc(1, sizeof(*jcl));
73   return jcl;
74 }
75
76 void
77 noit_jlog_closure_free(noit_jlog_closure_t *jcl) {
78   if(jcl->jlog) {
79     if(jcl->subscriber) {
80       if(jcl->subscriber[0] == '~')
81         jlog_ctx_remove_subscriber(jcl->jlog, jcl->subscriber);
82       free(jcl->subscriber);
83     }
84     jlog_ctx_close(jcl->jlog);
85   }
86   free(jcl);
87 }
88
89 static int
90 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
91   int w, sofar = 0;
92   while(l > sofar) {
93     w = e->opset->write(e->fd, (char *)b + sofar, l - sofar, mask, e);
94     if(w <= 0) return w;
95     sofar += w;
96   }
97   return sofar;
98 }
99 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
100
101 static int
102 noit_jlog_push(eventer_t e, noit_jlog_closure_t *jcl) {
103   jlog_message msg;
104   int mask;
105   u_int32_t n_count;
106   n_count = htonl(jcl->count);
107   if(Ewrite(&n_count, sizeof(n_count)) != sizeof(n_count))
108     return -1;
109   while(jcl->count > 0) {
110     int rv;
111     struct { jlog_id chkpt; u_int32_t n_sec, n_usec, n_len; } payload;
112     if(jlog_ctx_read_message(jcl->jlog, &jcl->start, &msg) == -1)
113       return -1;
114
115     /* Here we actually push the message */
116     payload.chkpt.log = htonl(jcl->start.log);
117     payload.chkpt.marker = htonl(jcl->start.marker);
118     payload.n_sec  = htonl(msg.header->tv_sec);
119     payload.n_usec = htonl(msg.header->tv_usec);
120     payload.n_len  = htonl(msg.mess_len);
121     if((rv = Ewrite(&payload, sizeof(payload))) != sizeof(payload)) {
122       noitL(noit_error, "Error writing jlog header over SSL %d != %d\n",
123             rv, (int)sizeof(payload));
124       return -1;
125     }
126     if((rv = Ewrite(msg.mess, msg.mess_len)) != msg.mess_len) {
127       noitL(noit_error, "Error writing jlog message over SSL %d != %d\n",
128             rv, msg.mess_len);
129       return -1;
130     }
131     /* Note what the client must checkpoint */
132     jcl->chkpt = jcl->start;
133
134     JLOG_ID_ADVANCE(&jcl->start);
135     jcl->count--;
136   }
137   return 0;
138 }
139
140 void *
141 noit_jlog_thread_main(void *e_vptr) {
142   int mask, bytes_read;
143   eventer_t e = e_vptr;
144   acceptor_closure_t *ac = e->closure;
145   noit_jlog_closure_t *jcl = ac->service_ctx;
146   char inbuff[sizeof(jlog_id)];
147
148   eventer_set_fd_blocking(e->fd);
149
150   while(1) {
151     jlog_id client_chkpt;
152     int sleeptime = (ac->cmd == NOIT_JLOG_DATA_TEMP_FEED) ?
153                       1 : DEFAULT_SECONDS_BETWEEN_BATCHES;
154     jlog_get_checkpoint(jcl->jlog, ac->remote_cn, &jcl->chkpt);
155     jcl->count = jlog_ctx_read_interval(jcl->jlog, &jcl->start, &jcl->finish);
156     if(jcl->count < 0) {
157       char idxfile[PATH_MAX];
158       noitL(noit_error, "jlog_ctx_read_interval: %s\n",
159             jlog_ctx_err_string(jcl->jlog));
160       switch (jlog_ctx_err(jcl->jlog)) {
161         case JLOG_ERR_FILE_CORRUPT:
162         case JLOG_ERR_IDX_CORRUPT:
163           jlog_repair_datafile(jcl->jlog, jcl->start.log);
164           jlog_repair_datafile(jcl->jlog, jcl->start.log + 1);
165           noitL(noit_error,
166                 "jlog reconstructed, deleting corresponding index.\n");
167           STRSETDATAFILE(jcl->jlog, idxfile, jcl->start.log);
168           strlcat(idxfile, INDEX_EXT, sizeof(idxfile));
169           unlink(idxfile);
170           STRSETDATAFILE(jcl->jlog, idxfile, jcl->start.log + 1);
171           strlcat(idxfile, INDEX_EXT, sizeof(idxfile));
172           unlink(idxfile);
173           goto alldone;
174           break;
175         default:
176           goto alldone;
177       }
178     }
179     if(jcl->count > MAX_ROWS_AT_ONCE) {
180       /* Artificially set down the range to make the batches a bit easier
181        * to handle on the stratcond/postgres end.
182        * However, we must have more data, so drop the sleeptime to 0
183        */
184       jcl->count = MAX_ROWS_AT_ONCE;
185       jcl->finish.marker = jcl->start.marker + jcl->count;
186       sleeptime = 0;
187     }
188     if(jcl->count > 0) {
189       if(noit_jlog_push(e, jcl)) {
190         goto alldone;
191       }
192       /* Read our jlog_id accounting for possibly short reads */
193       bytes_read = 0;
194       while(bytes_read < sizeof(jlog_id)) {
195         int len;
196         if((len = e->opset->read(e->fd, inbuff + bytes_read,
197                                  sizeof(jlog_id) - bytes_read,
198                                  &mask, e)) <= 0)
199           goto alldone;
200         bytes_read += len;
201       }
202       memcpy(&client_chkpt, inbuff, sizeof(jlog_id));
203       /* Fix the endian */
204       client_chkpt.log = ntohl(client_chkpt.log);
205       client_chkpt.marker = ntohl(client_chkpt.marker);
206  
207       if(memcmp(&jcl->chkpt, &client_chkpt, sizeof(jlog_id))) {
208         noitL(noit_error,
209               "client %s submitted invalid checkpoint %u:%u expected %u:%u\n",
210               ac->remote_cn, client_chkpt.log, client_chkpt.marker,
211               jcl->chkpt.log, jcl->chkpt.marker);
212         goto alldone;
213       }
214       jlog_ctx_read_checkpoint(jcl->jlog, &jcl->chkpt);
215     }
216     if(sleeptime) sleep(sleeptime);
217   }
218
219  alldone:
220   e->opset->close(e->fd, &mask, e);
221   if(jcl) noit_jlog_closure_free(jcl);
222   if(ac) acceptor_closure_free(ac);
223   return NULL;
224 }
225
226 int
227 noit_jlog_handler(eventer_t e, int mask, void *closure,
228                      struct timeval *now) {
229   eventer_t newe;
230   pthread_t tid;
231   pthread_attr_t tattr;
232   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
233   acceptor_closure_t *ac = closure;
234   noit_jlog_closure_t *jcl = ac->service_ctx;
235   char errbuff[256];
236   const char *errstr = "unknown error";
237
238   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
239     int len, nlen;
240 socket_error:
241     /* Exceptions cause us to simply snip the connection */
242     len = strlen(errstr);
243     nlen = htonl(0 - len);
244     e->opset->write(e->fd, &nlen, sizeof(nlen), &newmask, e);
245     e->opset->write(e->fd, errstr, strlen(errstr), &newmask, e);
246     eventer_remove_fd(e->fd);
247     e->opset->close(e->fd, &newmask, e);
248     if(jcl) noit_jlog_closure_free(jcl);
249     if(ac) acceptor_closure_free(ac);
250     return 0;
251   }
252
253   if(!ac->service_ctx) {
254     noit_log_stream_t ls;
255     const char *logname, *type;
256     char path[PATH_MAX], subscriber[256], *sub;
257     jcl = ac->service_ctx = noit_jlog_closure_alloc();
258     if(!noit_hash_retr_str(ac->config,
259                            "log_transit_feed_name",
260                            strlen("log_transit_feed_name"),
261                            &logname)) {
262       errstr = "No 'log_transit_feed_name' specified in log_transit.";
263       noitL(noit_error, "%s\n", errstr);
264       goto socket_error;
265     }
266     ls = noit_log_stream_find(logname);
267     if(!ls) {
268       snprintf(errbuff, sizeof(errbuff),
269                "Could not find log '%s' for log_transit.", logname);
270       errstr = errbuff;
271       noitL(noit_error, "%s\n", errstr);
272       goto socket_error;
273     }
274     type = noit_log_stream_get_type(ls);
275     if(!type || strcmp(type, "jlog")) {
276       snprintf(errbuff, sizeof(errbuff),
277                "Log '%s' for log_transit is not a jlog.", logname);
278       errstr = errbuff;
279       noitL(noit_error, "%s\n", errstr);
280       goto socket_error;
281     }
282     if(ac->cmd == NOIT_JLOG_DATA_FEED) {
283       if(!ac->remote_cn) {
284         errstr = "jlog transit started to unidentified party.";
285         noitL(noit_error, "%s\n", errstr);
286         goto socket_error;
287       }
288       strlcpy(subscriber, ac->remote_cn, sizeof(subscriber));
289     }
290     else {
291       snprintf(subscriber, sizeof(subscriber),
292                "~%07d", noit_atomic_inc32(&tmpfeedcounter));
293     }
294     jcl->subscriber = strdup(subscriber);
295
296     strlcpy(path, noit_log_stream_get_path(ls), sizeof(path));
297     sub = strchr(path, '(');
298     if(sub) {
299       char *esub = strchr(sub, ')');
300       if(esub) {
301         *esub = '\0';
302         *sub = '\0';
303         sub += 1;
304       }
305     }
306
307     jcl->jlog = jlog_new(path);
308     if(ac->cmd == NOIT_JLOG_DATA_TEMP_FEED)
309       if(jlog_ctx_add_subscriber(jcl->jlog, jcl->subscriber, JLOG_END) == -1) {
310         snprintf(errbuff, sizeof(errbuff),
311                  "jlog reader[%s] error: %s", jcl->subscriber,
312                  jlog_ctx_err_string(jcl->jlog));
313         errstr = errbuff;
314         noitL(noit_error, "%s\n", errstr);
315       }
316     if(jlog_ctx_open_reader(jcl->jlog, jcl->subscriber) == -1) {
317       snprintf(errbuff, sizeof(errbuff),
318                "jlog reader[%s] error: %s", jcl->subscriber,
319                jlog_ctx_err_string(jcl->jlog));
320       errstr = errbuff;
321       noitL(noit_error, "%s\n", errstr);
322       goto socket_error;
323     }
324   }
325
326   /* The jlog stuff is disk I/O and can block us.
327    * We'll create a new thread to just handle this connection.
328    */
329   eventer_remove_fd(e->fd);
330   newe = eventer_alloc();
331   memcpy(newe, e, sizeof(*e));
332   pthread_attr_init(&tattr);
333   pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
334   if(pthread_create(&tid, &tattr, noit_jlog_thread_main, newe) == 0) {
335     return 0;
336   }
337
338   /* Undo our dup */
339   eventer_free(newe);
340   /* Creating the thread failed, close it down and deschedule. */
341   e->opset->close(e->fd, &newmask, e);
342   return 0;
343 }
Note: See TracBrowser for help on using the browser.