root/src/noit_jlog_listener.c

Revision 811c412ce800f3a4eeaa9c2bc4d5c35fe9982384, 13.1 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 3 years ago)

selfcheck introspection on when data is pulled by stratcon

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2011, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_listener.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "jlog/jlog.h"
39 #include "jlog/jlog_private.h"
40 #include "noit_jlog_listener.h"
41
42 #include <unistd.h>
43 #define __USE_XOPEN
44 #include <poll.h>
45 #define MAX_ROWS_AT_ONCE 1000
46 #define DEFAULT_SECONDS_BETWEEN_BATCHES 10
47
48 static noit_atomic32_t tmpfeedcounter = 0;
49
50 void
51 noit_jlog_listener_init() {
52   eventer_name_callback("log_transit/1.0", noit_jlog_handler);
53   noit_control_dispatch_delegate(noit_control_dispatch,
54                                  NOIT_JLOG_DATA_FEED,
55                                  noit_jlog_handler);
56   noit_control_dispatch_delegate(noit_control_dispatch,
57                                  NOIT_JLOG_DATA_TEMP_FEED,
58                                  noit_jlog_handler);
59 }
60
61 typedef struct {
62   jlog_ctx *jlog;
63   char *subscriber;
64   jlog_id chkpt;
65   jlog_id start;
66   jlog_id finish;
67   jlog_feed_stats_t *feed_stats;
68   int count;
69   int wants_shutdown;
70 } noit_jlog_closure_t;
71
72 noit_jlog_closure_t *
73 noit_jlog_closure_alloc(void) {
74   noit_jlog_closure_t *jcl;
75   jcl = calloc(1, sizeof(*jcl));
76   return jcl;
77 }
78
79 void
80 noit_jlog_closure_free(noit_jlog_closure_t *jcl) {
81   if(jcl->jlog) {
82     if(jcl->subscriber) {
83       if(jcl->subscriber[0] == '~')
84         jlog_ctx_remove_subscriber(jcl->jlog, jcl->subscriber);
85       free(jcl->subscriber);
86     }
87     jlog_ctx_close(jcl->jlog);
88   }
89   free(jcl);
90 }
91
92 static noit_hash_table feed_stats = NOIT_HASH_EMPTY;
93
94 jlog_feed_stats_t *
95 noit_jlog_feed_stats(const char *sub) {
96   void *vs = NULL;
97   jlog_feed_stats_t *s = NULL;
98   if(noit_hash_retrieve(&feed_stats, sub, strlen(sub), &vs))
99     return (jlog_feed_stats_t *)vs;
100   s = calloc(1, sizeof(*s));
101   s->feed_name = strdup(sub);
102   noit_hash_store(&feed_stats, s->feed_name, strlen(s->feed_name), s);
103   return s;
104 }
105 int
106 noit_jlog_foreach_feed_stats(int (*f)(jlog_feed_stats_t *, void *),
107                              void *c) {
108   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
109   const char *key;
110   int klen, cnt = 0;
111   void *vs;
112   while(noit_hash_next(&feed_stats, &iter, &key, &klen, &vs)) {
113     cnt += f((jlog_feed_stats_t *)vs, c);
114   }
115   return cnt;
116 }
117 static int
118 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
119   int w, sofar = 0;
120   while(l > sofar) {
121     w = e->opset->write(e->fd, (char *)b + sofar, l - sofar, mask, e);
122     if(w <= 0) return w;
123     sofar += w;
124   }
125   return sofar;
126 }
127 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
128
129 static int
130 noit_jlog_push(eventer_t e, noit_jlog_closure_t *jcl) {
131   jlog_message msg;
132   int mask;
133   u_int32_t n_count;
134   n_count = htonl(jcl->count);
135   if(Ewrite(&n_count, sizeof(n_count)) != sizeof(n_count))
136     return -1;
137   while(jcl->count > 0) {
138     int rv;
139     struct { jlog_id chkpt; u_int32_t n_sec, n_usec, n_len; } payload;
140     if(jlog_ctx_read_message(jcl->jlog, &jcl->start, &msg) == -1)
141       return -1;
142
143     /* Here we actually push the message */
144     payload.chkpt.log = htonl(jcl->start.log);
145     payload.chkpt.marker = htonl(jcl->start.marker);
146     payload.n_sec  = htonl(msg.header->tv_sec);
147     payload.n_usec = htonl(msg.header->tv_usec);
148     payload.n_len  = htonl(msg.mess_len);
149     if((rv = Ewrite(&payload, sizeof(payload))) != sizeof(payload)) {
150       noitL(noit_error, "Error writing jlog header over SSL %d != %d\n",
151             rv, (int)sizeof(payload));
152       return -1;
153     }
154     if((rv = Ewrite(msg.mess, msg.mess_len)) != msg.mess_len) {
155       noitL(noit_error, "Error writing jlog message over SSL %d != %d\n",
156             rv, msg.mess_len);
157       return -1;
158     }
159     /* Note what the client must checkpoint */
160     jcl->chkpt = jcl->start;
161
162     JLOG_ID_ADVANCE(&jcl->start);
163     jcl->count--;
164   }
165   return 0;
166 }
167
168 void *
169 noit_jlog_thread_main(void *e_vptr) {
170   int mask, bytes_read;
171   eventer_t e = e_vptr;
172   acceptor_closure_t *ac = e->closure;
173   noit_jlog_closure_t *jcl = ac->service_ctx;
174   char inbuff[sizeof(jlog_id)];
175
176   eventer_set_fd_blocking(e->fd);
177
178   while(1) {
179     jlog_id client_chkpt;
180     int sleeptime = (ac->cmd == NOIT_JLOG_DATA_TEMP_FEED) ?
181                       1 : DEFAULT_SECONDS_BETWEEN_BATCHES;
182     jlog_get_checkpoint(jcl->jlog, ac->remote_cn, &jcl->chkpt);
183     jcl->count = jlog_ctx_read_interval(jcl->jlog, &jcl->start, &jcl->finish);
184     if(jcl->count < 0) {
185       char idxfile[PATH_MAX];
186       noitL(noit_error, "jlog_ctx_read_interval: %s\n",
187             jlog_ctx_err_string(jcl->jlog));
188       switch (jlog_ctx_err(jcl->jlog)) {
189         case JLOG_ERR_FILE_CORRUPT:
190         case JLOG_ERR_IDX_CORRUPT:
191           jlog_repair_datafile(jcl->jlog, jcl->start.log);
192           jlog_repair_datafile(jcl->jlog, jcl->start.log + 1);
193           noitL(noit_error,
194                 "jlog reconstructed, deleting corresponding index.\n");
195           STRSETDATAFILE(jcl->jlog, idxfile, jcl->start.log);
196           strlcat(idxfile, INDEX_EXT, sizeof(idxfile));
197           unlink(idxfile);
198           STRSETDATAFILE(jcl->jlog, idxfile, jcl->start.log + 1);
199           strlcat(idxfile, INDEX_EXT, sizeof(idxfile));
200           unlink(idxfile);
201           goto alldone;
202           break;
203         default:
204           goto alldone;
205       }
206     }
207     if(jcl->count > MAX_ROWS_AT_ONCE) {
208       /* Artificially set down the range to make the batches a bit easier
209        * to handle on the stratcond/postgres end.
210        * However, we must have more data, so drop the sleeptime to 0
211        */
212       jcl->count = MAX_ROWS_AT_ONCE;
213       jcl->finish.marker = jcl->start.marker + jcl->count;
214       sleeptime = 0;
215     }
216     if(jcl->count > 0) {
217       if(noit_jlog_push(e, jcl)) {
218         goto alldone;
219       }
220       /* Read our jlog_id accounting for possibly short reads */
221       bytes_read = 0;
222       while(bytes_read < sizeof(jlog_id)) {
223         int len;
224         if((len = e->opset->read(e->fd, inbuff + bytes_read,
225                                  sizeof(jlog_id) - bytes_read,
226                                  &mask, e)) <= 0)
227           goto alldone;
228         bytes_read += len;
229       }
230       memcpy(&client_chkpt, inbuff, sizeof(jlog_id));
231       /* Fix the endian */
232       client_chkpt.log = ntohl(client_chkpt.log);
233       client_chkpt.marker = ntohl(client_chkpt.marker);
234  
235       if(memcmp(&jcl->chkpt, &client_chkpt, sizeof(jlog_id))) {
236         noitL(noit_error,
237               "client %s submitted invalid checkpoint %u:%u expected %u:%u\n",
238               ac->remote_cn, client_chkpt.log, client_chkpt.marker,
239               jcl->chkpt.log, jcl->chkpt.marker);
240         goto alldone;
241       }
242       gettimeofday(&jcl->feed_stats->last_checkpoint, NULL);
243       jlog_ctx_read_checkpoint(jcl->jlog, &jcl->chkpt);
244     }
245     else {
246       /* we have nothing to write -- maybe we have no checks configured...
247        * If this is the case "forever", the remote might disconnect and
248        * we would never know. Do the painful work of detecting a
249        * disconnected client.
250        */
251       struct pollfd pfd;
252       pfd.fd = e->fd;
253       pfd.events = POLLIN | POLLHUP | POLLRDNORM;
254       pfd.revents = 0;
255       if(poll(&pfd, 1, 0) != 0) {
256         /* normally, we'd recv PEEK|DONTWAIT.  However, the client should
257          * not be writing to us.  So, we know we can't have any legitimate
258          * data on this socket (true even though this is SSL). So, if we're
259          * here then "shit went wrong"
260          */
261         noitL(noit_error, "jlog client %s disconnected while idle\n",
262               ac->remote_cn);
263         goto alldone;
264       }
265     }
266     if(sleeptime) sleep(sleeptime);
267   }
268
269  alldone:
270   e->opset->close(e->fd, &mask, e);
271   noit_atomic_dec32(&jcl->feed_stats->connections);
272   if(jcl) noit_jlog_closure_free(jcl);
273   if(ac) acceptor_closure_free(ac);
274   return NULL;
275 }
276
277 int
278 noit_jlog_handler(eventer_t e, int mask, void *closure,
279                      struct timeval *now) {
280   eventer_t newe;
281   pthread_t tid;
282   pthread_attr_t tattr;
283   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
284   acceptor_closure_t *ac = closure;
285   noit_jlog_closure_t *jcl = ac->service_ctx;
286   char errbuff[256];
287   const char *errstr = "unknown error";
288
289   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
290     int len, nlen;
291 socket_error:
292     /* Exceptions cause us to simply snip the connection */
293     len = strlen(errstr);
294     nlen = htonl(0 - len);
295     e->opset->write(e->fd, &nlen, sizeof(nlen), &newmask, e);
296     e->opset->write(e->fd, errstr, strlen(errstr), &newmask, e);
297     eventer_remove_fd(e->fd);
298     e->opset->close(e->fd, &newmask, e);
299     if(jcl) noit_jlog_closure_free(jcl);
300     if(ac) acceptor_closure_free(ac);
301     return 0;
302   }
303
304   if(!ac->service_ctx) {
305     noit_log_stream_t ls;
306     const char *logname, *type;
307     char path[PATH_MAX], subscriber[256], *sub;
308     jcl = ac->service_ctx = noit_jlog_closure_alloc();
309     if(!noit_hash_retr_str(ac->config,
310                            "log_transit_feed_name",
311                            strlen("log_transit_feed_name"),
312                            &logname)) {
313       errstr = "No 'log_transit_feed_name' specified in log_transit.";
314       noitL(noit_error, "%s\n", errstr);
315       goto socket_error;
316     }
317     ls = noit_log_stream_find(logname);
318     if(!ls) {
319       snprintf(errbuff, sizeof(errbuff),
320                "Could not find log '%s' for log_transit.", logname);
321       errstr = errbuff;
322       noitL(noit_error, "%s\n", errstr);
323       goto socket_error;
324     }
325     type = noit_log_stream_get_type(ls);
326     if(!type || strcmp(type, "jlog")) {
327       snprintf(errbuff, sizeof(errbuff),
328                "Log '%s' for log_transit is not a jlog.", logname);
329       errstr = errbuff;
330       noitL(noit_error, "%s\n", errstr);
331       goto socket_error;
332     }
333     if(ac->cmd == NOIT_JLOG_DATA_FEED) {
334       if(!ac->remote_cn) {
335         errstr = "jlog transit started to unidentified party.";
336         noitL(noit_error, "%s\n", errstr);
337         goto socket_error;
338       }
339       strlcpy(subscriber, ac->remote_cn, sizeof(subscriber));
340       jcl->feed_stats = noit_jlog_feed_stats(subscriber);
341     }
342     else {
343       jcl->feed_stats = noit_jlog_feed_stats("~");
344       snprintf(subscriber, sizeof(subscriber),
345                "~%07d", noit_atomic_inc32(&tmpfeedcounter));
346     }
347     jcl->subscriber = strdup(subscriber);
348
349     strlcpy(path, noit_log_stream_get_path(ls), sizeof(path));
350     sub = strchr(path, '(');
351     if(sub) {
352       char *esub = strchr(sub, ')');
353       if(esub) {
354         *esub = '\0';
355         *sub = '\0';
356       }
357     }
358
359     jcl->jlog = jlog_new(path);
360     if(ac->cmd == NOIT_JLOG_DATA_TEMP_FEED)
361       if(jlog_ctx_add_subscriber(jcl->jlog, jcl->subscriber, JLOG_END) == -1) {
362         snprintf(errbuff, sizeof(errbuff),
363                  "jlog reader[%s] error: %s", jcl->subscriber,
364                  jlog_ctx_err_string(jcl->jlog));
365         errstr = errbuff;
366         noitL(noit_error, "%s\n", errstr);
367       }
368     if(jlog_ctx_open_reader(jcl->jlog, jcl->subscriber) == -1) {
369       snprintf(errbuff, sizeof(errbuff),
370                "jlog reader[%s] error: %s", jcl->subscriber,
371                jlog_ctx_err_string(jcl->jlog));
372       errstr = errbuff;
373       noitL(noit_error, "%s\n", errstr);
374       goto socket_error;
375     }
376   }
377
378   /* The jlog stuff is disk I/O and can block us.
379    * We'll create a new thread to just handle this connection.
380    */
381   eventer_remove_fd(e->fd);
382   newe = eventer_alloc();
383   memcpy(newe, e, sizeof(*e));
384   pthread_attr_init(&tattr);
385   pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
386   gettimeofday(&jcl->feed_stats->last_connection, NULL);
387   noit_atomic_inc32(&jcl->feed_stats->connections);
388   if(pthread_create(&tid, &tattr, noit_jlog_thread_main, newe) == 0) {
389     return 0;
390   }
391
392   /* Undo our dup */
393   eventer_free(newe);
394   /* Creating the thread failed, close it down and deschedule. */
395   e->opset->close(e->fd, &newmask, e);
396   return 0;
397 }
Note: See TracBrowser for help on using the browser.