root/src/noit_jlog_listener.c

Revision 8fc890247213c9b9d2b20d5e0de617f6c3d41136, 12.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

xpg 4.2 is needed for POLLRDNORM

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2011, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_listener.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "jlog/jlog.h"
39 #include "jlog/jlog_private.h"
40 #include "noit_jlog_listener.h"
41
42 #include <unistd.h>
43 #define __USE_XOPEN
44 #include <poll.h>
/* Upper bound on the number of jlog messages pushed to a client per batch. */
#define MAX_ROWS_AT_ONCE 1000
/* Seconds slept between batches for feeds that are not temporary feeds. */
#define DEFAULT_SECONDS_BETWEEN_BATCHES 10

/* Incremented once per temporary feed to build its "~NNNNNNN" subscriber name. */
static noit_atomic32_t tmpfeedcounter = 0;
49
/*
 * Registers noit_jlog_handler with the eventer under the callback name
 * "log_transit/1.0" and installs it as the control-dispatch delegate for
 * both the NOIT_JLOG_DATA_FEED and NOIT_JLOG_DATA_TEMP_FEED commands.
 */
void
noit_jlog_listener_init() {
  eventer_name_callback("log_transit/1.0", noit_jlog_handler);
  noit_control_dispatch_delegate(noit_control_dispatch,
                                 NOIT_JLOG_DATA_FEED,
                                 noit_jlog_handler);
  noit_control_dispatch_delegate(noit_control_dispatch,
                                 NOIT_JLOG_DATA_TEMP_FEED,
                                 noit_jlog_handler);
}
60
/* Per-connection state for one jlog feed. */
typedef struct {
  jlog_ctx *jlog;      /* jlog context opened as a reader for `subscriber` */
  char *subscriber;    /* heap-allocated subscriber name; names starting with
                        * '~' are removed from the jlog when the closure is
                        * freed */
  jlog_id chkpt;       /* id of the last message pushed; the client must echo
                        * this value back before the checkpoint is advanced */
  jlog_id start;       /* first id of the current read interval */
  jlog_id finish;      /* last id of the current read interval */
  int count;           /* messages still to be pushed in the current batch */
  int wants_shutdown;  /* non-zero makes the handler tear the connection down */
} noit_jlog_closure_t;
70
71 noit_jlog_closure_t *
72 noit_jlog_closure_alloc(void) {
73   noit_jlog_closure_t *jcl;
74   jcl = calloc(1, sizeof(*jcl));
75   return jcl;
76 }
77
78 void
79 noit_jlog_closure_free(noit_jlog_closure_t *jcl) {
80   if(jcl->jlog) {
81     if(jcl->subscriber) {
82       if(jcl->subscriber[0] == '~')
83         jlog_ctx_remove_subscriber(jcl->jlog, jcl->subscriber);
84       free(jcl->subscriber);
85     }
86     jlog_ctx_close(jcl->jlog);
87   }
88   free(jcl);
89 }
90
91 static int
92 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
93   int w, sofar = 0;
94   while(l > sofar) {
95     w = e->opset->write(e->fd, (char *)b + sofar, l - sofar, mask, e);
96     if(w <= 0) return w;
97     sofar += w;
98   }
99   return sofar;
100 }
101 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
102
103 static int
104 noit_jlog_push(eventer_t e, noit_jlog_closure_t *jcl) {
105   jlog_message msg;
106   int mask;
107   u_int32_t n_count;
108   n_count = htonl(jcl->count);
109   if(Ewrite(&n_count, sizeof(n_count)) != sizeof(n_count))
110     return -1;
111   while(jcl->count > 0) {
112     int rv;
113     struct { jlog_id chkpt; u_int32_t n_sec, n_usec, n_len; } payload;
114     if(jlog_ctx_read_message(jcl->jlog, &jcl->start, &msg) == -1)
115       return -1;
116
117     /* Here we actually push the message */
118     payload.chkpt.log = htonl(jcl->start.log);
119     payload.chkpt.marker = htonl(jcl->start.marker);
120     payload.n_sec  = htonl(msg.header->tv_sec);
121     payload.n_usec = htonl(msg.header->tv_usec);
122     payload.n_len  = htonl(msg.mess_len);
123     if((rv = Ewrite(&payload, sizeof(payload))) != sizeof(payload)) {
124       noitL(noit_error, "Error writing jlog header over SSL %d != %d\n",
125             rv, (int)sizeof(payload));
126       return -1;
127     }
128     if((rv = Ewrite(msg.mess, msg.mess_len)) != msg.mess_len) {
129       noitL(noit_error, "Error writing jlog message over SSL %d != %d\n",
130             rv, msg.mess_len);
131       return -1;
132     }
133     /* Note what the client must checkpoint */
134     jcl->chkpt = jcl->start;
135
136     JLOG_ID_ADVANCE(&jcl->start);
137     jcl->count--;
138   }
139   return 0;
140 }
141
142 void *
143 noit_jlog_thread_main(void *e_vptr) {
144   int mask, bytes_read;
145   eventer_t e = e_vptr;
146   acceptor_closure_t *ac = e->closure;
147   noit_jlog_closure_t *jcl = ac->service_ctx;
148   char inbuff[sizeof(jlog_id)];
149
150   eventer_set_fd_blocking(e->fd);
151
152   while(1) {
153     jlog_id client_chkpt;
154     int sleeptime = (ac->cmd == NOIT_JLOG_DATA_TEMP_FEED) ?
155                       1 : DEFAULT_SECONDS_BETWEEN_BATCHES;
156     jlog_get_checkpoint(jcl->jlog, ac->remote_cn, &jcl->chkpt);
157     jcl->count = jlog_ctx_read_interval(jcl->jlog, &jcl->start, &jcl->finish);
158     if(jcl->count < 0) {
159       char idxfile[PATH_MAX];
160       noitL(noit_error, "jlog_ctx_read_interval: %s\n",
161             jlog_ctx_err_string(jcl->jlog));
162       switch (jlog_ctx_err(jcl->jlog)) {
163         case JLOG_ERR_FILE_CORRUPT:
164         case JLOG_ERR_IDX_CORRUPT:
165           jlog_repair_datafile(jcl->jlog, jcl->start.log);
166           jlog_repair_datafile(jcl->jlog, jcl->start.log + 1);
167           noitL(noit_error,
168                 "jlog reconstructed, deleting corresponding index.\n");
169           STRSETDATAFILE(jcl->jlog, idxfile, jcl->start.log);
170           strlcat(idxfile, INDEX_EXT, sizeof(idxfile));
171           unlink(idxfile);
172           STRSETDATAFILE(jcl->jlog, idxfile, jcl->start.log + 1);
173           strlcat(idxfile, INDEX_EXT, sizeof(idxfile));
174           unlink(idxfile);
175           goto alldone;
176           break;
177         default:
178           goto alldone;
179       }
180     }
181     if(jcl->count > MAX_ROWS_AT_ONCE) {
182       /* Artificially set down the range to make the batches a bit easier
183        * to handle on the stratcond/postgres end.
184        * However, we must have more data, so drop the sleeptime to 0
185        */
186       jcl->count = MAX_ROWS_AT_ONCE;
187       jcl->finish.marker = jcl->start.marker + jcl->count;
188       sleeptime = 0;
189     }
190     if(jcl->count > 0) {
191       if(noit_jlog_push(e, jcl)) {
192         goto alldone;
193       }
194       /* Read our jlog_id accounting for possibly short reads */
195       bytes_read = 0;
196       while(bytes_read < sizeof(jlog_id)) {
197         int len;
198         if((len = e->opset->read(e->fd, inbuff + bytes_read,
199                                  sizeof(jlog_id) - bytes_read,
200                                  &mask, e)) <= 0)
201           goto alldone;
202         bytes_read += len;
203       }
204       memcpy(&client_chkpt, inbuff, sizeof(jlog_id));
205       /* Fix the endian */
206       client_chkpt.log = ntohl(client_chkpt.log);
207       client_chkpt.marker = ntohl(client_chkpt.marker);
208  
209       if(memcmp(&jcl->chkpt, &client_chkpt, sizeof(jlog_id))) {
210         noitL(noit_error,
211               "client %s submitted invalid checkpoint %u:%u expected %u:%u\n",
212               ac->remote_cn, client_chkpt.log, client_chkpt.marker,
213               jcl->chkpt.log, jcl->chkpt.marker);
214         goto alldone;
215       }
216       jlog_ctx_read_checkpoint(jcl->jlog, &jcl->chkpt);
217     }
218     else {
219       /* we have nothing to write -- maybe we have no checks configured...
220        * If this is the case "forever", the remote might disconnect and
221        * we would never know. Do the painful work of detecting a
222        * disconnected client.
223        */
224       struct pollfd pfd;
225       pfd.fd = e->fd;
226       pfd.events = POLLIN | POLLHUP | POLLRDNORM;
227       pfd.revents = 0;
228       if(poll(&pfd, 1, 0) != 0) {
229         /* normally, we'd recv PEEK|DONTWAIT.  However, the client should
230          * not be writing to us.  So, we know we can't have any legitimate
231          * data on this socket (true even though this is SSL). So, if we're
232          * here then "shit went wrong"
233          */
234         noitL(noit_error, "jlog client %s disconnected while idle\n",
235               ac->remote_cn);
236         goto alldone;
237       }
238     }
239     if(sleeptime) sleep(sleeptime);
240   }
241
242  alldone:
243   e->opset->close(e->fd, &mask, e);
244   if(jcl) noit_jlog_closure_free(jcl);
245   if(ac) acceptor_closure_free(ac);
246   return NULL;
247 }
248
249 int
250 noit_jlog_handler(eventer_t e, int mask, void *closure,
251                      struct timeval *now) {
252   eventer_t newe;
253   pthread_t tid;
254   pthread_attr_t tattr;
255   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
256   acceptor_closure_t *ac = closure;
257   noit_jlog_closure_t *jcl = ac->service_ctx;
258   char errbuff[256];
259   const char *errstr = "unknown error";
260
261   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
262     int len, nlen;
263 socket_error:
264     /* Exceptions cause us to simply snip the connection */
265     len = strlen(errstr);
266     nlen = htonl(0 - len);
267     e->opset->write(e->fd, &nlen, sizeof(nlen), &newmask, e);
268     e->opset->write(e->fd, errstr, strlen(errstr), &newmask, e);
269     eventer_remove_fd(e->fd);
270     e->opset->close(e->fd, &newmask, e);
271     if(jcl) noit_jlog_closure_free(jcl);
272     if(ac) acceptor_closure_free(ac);
273     return 0;
274   }
275
276   if(!ac->service_ctx) {
277     noit_log_stream_t ls;
278     const char *logname, *type;
279     char path[PATH_MAX], subscriber[256], *sub;
280     jcl = ac->service_ctx = noit_jlog_closure_alloc();
281     if(!noit_hash_retr_str(ac->config,
282                            "log_transit_feed_name",
283                            strlen("log_transit_feed_name"),
284                            &logname)) {
285       errstr = "No 'log_transit_feed_name' specified in log_transit.";
286       noitL(noit_error, "%s\n", errstr);
287       goto socket_error;
288     }
289     ls = noit_log_stream_find(logname);
290     if(!ls) {
291       snprintf(errbuff, sizeof(errbuff),
292                "Could not find log '%s' for log_transit.", logname);
293       errstr = errbuff;
294       noitL(noit_error, "%s\n", errstr);
295       goto socket_error;
296     }
297     type = noit_log_stream_get_type(ls);
298     if(!type || strcmp(type, "jlog")) {
299       snprintf(errbuff, sizeof(errbuff),
300                "Log '%s' for log_transit is not a jlog.", logname);
301       errstr = errbuff;
302       noitL(noit_error, "%s\n", errstr);
303       goto socket_error;
304     }
305     if(ac->cmd == NOIT_JLOG_DATA_FEED) {
306       if(!ac->remote_cn) {
307         errstr = "jlog transit started to unidentified party.";
308         noitL(noit_error, "%s\n", errstr);
309         goto socket_error;
310       }
311       strlcpy(subscriber, ac->remote_cn, sizeof(subscriber));
312     }
313     else {
314       snprintf(subscriber, sizeof(subscriber),
315                "~%07d", noit_atomic_inc32(&tmpfeedcounter));
316     }
317     jcl->subscriber = strdup(subscriber);
318
319     strlcpy(path, noit_log_stream_get_path(ls), sizeof(path));
320     sub = strchr(path, '(');
321     if(sub) {
322       char *esub = strchr(sub, ')');
323       if(esub) {
324         *esub = '\0';
325         *sub = '\0';
326       }
327     }
328
329     jcl->jlog = jlog_new(path);
330     if(ac->cmd == NOIT_JLOG_DATA_TEMP_FEED)
331       if(jlog_ctx_add_subscriber(jcl->jlog, jcl->subscriber, JLOG_END) == -1) {
332         snprintf(errbuff, sizeof(errbuff),
333                  "jlog reader[%s] error: %s", jcl->subscriber,
334                  jlog_ctx_err_string(jcl->jlog));
335         errstr = errbuff;
336         noitL(noit_error, "%s\n", errstr);
337       }
338     if(jlog_ctx_open_reader(jcl->jlog, jcl->subscriber) == -1) {
339       snprintf(errbuff, sizeof(errbuff),
340                "jlog reader[%s] error: %s", jcl->subscriber,
341                jlog_ctx_err_string(jcl->jlog));
342       errstr = errbuff;
343       noitL(noit_error, "%s\n", errstr);
344       goto socket_error;
345     }
346   }
347
348   /* The jlog stuff is disk I/O and can block us.
349    * We'll create a new thread to just handle this connection.
350    */
351   eventer_remove_fd(e->fd);
352   newe = eventer_alloc();
353   memcpy(newe, e, sizeof(*e));
354   pthread_attr_init(&tattr);
355   pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
356   if(pthread_create(&tid, &tattr, noit_jlog_thread_main, newe) == 0) {
357     return 0;
358   }
359
360   /* Undo our dup */
361   eventer_free(newe);
362   /* Creating the thread failed, close it down and deschedule. */
363   e->opset->close(e->fd, &newmask, e);
364   return 0;
365 }
Note: See TracBrowser for help on using the browser.