root/src/noit_jlog_listener.c

Revision 2148d594359c81ffc07674617e97eef16818690f, 13.1 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

Support c99 compile

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2011, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_listener.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "jlog/jlog.h"
39 #include "jlog/jlog_private.h"
40 #include "noit_jlog_listener.h"
41
42 #include <unistd.h>
43 #include <poll.h>
/* Largest number of jlog rows pushed to a client in a single batch. */
#define MAX_ROWS_AT_ONCE 1000
/* Seconds slept between batches for feeds that are not temporary feeds. */
#define DEFAULT_SECONDS_BETWEEN_BATCHES 10

/* Counter used to build the "~%07d" subscriber names of temporary feeds. */
static noit_atomic32_t tmpfeedcounter = 0;
48
49 void
50 noit_jlog_listener_init() {
51   eventer_name_callback("log_transit/1.0", noit_jlog_handler);
52   noit_control_dispatch_delegate(noit_control_dispatch,
53                                  NOIT_JLOG_DATA_FEED,
54                                  noit_jlog_handler);
55   noit_control_dispatch_delegate(noit_control_dispatch,
56                                  NOIT_JLOG_DATA_TEMP_FEED,
57                                  noit_jlog_handler);
58 }
59
60 typedef struct {
61   jlog_ctx *jlog;
62   char *subscriber;
63   jlog_id chkpt;
64   jlog_id start;
65   jlog_id finish;
66   jlog_feed_stats_t *feed_stats;
67   int count;
68   int wants_shutdown;
69 } noit_jlog_closure_t;
70
71 noit_jlog_closure_t *
72 noit_jlog_closure_alloc(void) {
73   noit_jlog_closure_t *jcl;
74   jcl = calloc(1, sizeof(*jcl));
75   return jcl;
76 }
77
78 void
79 noit_jlog_closure_free(noit_jlog_closure_t *jcl) {
80   if(jcl->jlog) {
81     if(jcl->subscriber) {
82       if(jcl->subscriber[0] == '~')
83         jlog_ctx_remove_subscriber(jcl->jlog, jcl->subscriber);
84       free(jcl->subscriber);
85     }
86     jlog_ctx_close(jcl->jlog);
87   }
88   free(jcl);
89 }
90
91 static noit_hash_table feed_stats = NOIT_HASH_EMPTY;
92
93 jlog_feed_stats_t *
94 noit_jlog_feed_stats(const char *sub) {
95   void *vs = NULL;
96   jlog_feed_stats_t *s = NULL;
97   if(noit_hash_retrieve(&feed_stats, sub, strlen(sub), &vs))
98     return (jlog_feed_stats_t *)vs;
99   s = calloc(1, sizeof(*s));
100   s->feed_name = strdup(sub);
101   noit_hash_store(&feed_stats, s->feed_name, strlen(s->feed_name), s);
102   return s;
103 }
104 int
105 noit_jlog_foreach_feed_stats(int (*f)(jlog_feed_stats_t *, void *),
106                              void *c) {
107   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
108   const char *key;
109   int klen, cnt = 0;
110   void *vs;
111   while(noit_hash_next(&feed_stats, &iter, &key, &klen, &vs)) {
112     cnt += f((jlog_feed_stats_t *)vs, c);
113   }
114   return cnt;
115 }
116 static int
117 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
118   int w, sofar = 0;
119   while(l > sofar) {
120     w = e->opset->write(e->fd, (char *)b + sofar, l - sofar, mask, e);
121     if(w <= 0) return w;
122     sofar += w;
123   }
124   return sofar;
125 }
126 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
127
128 static int
129 noit_jlog_push(eventer_t e, noit_jlog_closure_t *jcl) {
130   jlog_message msg;
131   int mask;
132   u_int32_t n_count;
133   n_count = htonl(jcl->count);
134   if(Ewrite(&n_count, sizeof(n_count)) != sizeof(n_count))
135     return -1;
136   while(jcl->count > 0) {
137     int rv;
138     struct { jlog_id chkpt; u_int32_t n_sec, n_usec, n_len; } payload;
139     if(jlog_ctx_read_message(jcl->jlog, &jcl->start, &msg) == -1)
140       return -1;
141
142     /* Here we actually push the message */
143     payload.chkpt.log = htonl(jcl->start.log);
144     payload.chkpt.marker = htonl(jcl->start.marker);
145     payload.n_sec  = htonl(msg.header->tv_sec);
146     payload.n_usec = htonl(msg.header->tv_usec);
147     payload.n_len  = htonl(msg.mess_len);
148     if((rv = Ewrite(&payload, sizeof(payload))) != sizeof(payload)) {
149       noitL(noit_error, "Error writing jlog header over SSL %d != %d\n",
150             rv, (int)sizeof(payload));
151       return -1;
152     }
153     if((rv = Ewrite(msg.mess, msg.mess_len)) != msg.mess_len) {
154       noitL(noit_error, "Error writing jlog message over SSL %d != %d\n",
155             rv, msg.mess_len);
156       return -1;
157     }
158     /* Note what the client must checkpoint */
159     jcl->chkpt = jcl->start;
160
161     JLOG_ID_ADVANCE(&jcl->start);
162     jcl->count--;
163   }
164   return 0;
165 }
166
167 void *
168 noit_jlog_thread_main(void *e_vptr) {
169   int mask, bytes_read;
170   eventer_t e = e_vptr;
171   acceptor_closure_t *ac = e->closure;
172   noit_jlog_closure_t *jcl = ac->service_ctx;
173   char inbuff[sizeof(jlog_id)];
174
175   eventer_set_fd_blocking(e->fd);
176
177   while(1) {
178     jlog_id client_chkpt;
179     int sleeptime = (ac->cmd == NOIT_JLOG_DATA_TEMP_FEED) ?
180                       1 : DEFAULT_SECONDS_BETWEEN_BATCHES;
181     jlog_get_checkpoint(jcl->jlog, ac->remote_cn, &jcl->chkpt);
182     jcl->count = jlog_ctx_read_interval(jcl->jlog, &jcl->start, &jcl->finish);
183     if(jcl->count < 0) {
184       char idxfile[PATH_MAX];
185       noitL(noit_error, "jlog_ctx_read_interval: %s\n",
186             jlog_ctx_err_string(jcl->jlog));
187       switch (jlog_ctx_err(jcl->jlog)) {
188         case JLOG_ERR_FILE_CORRUPT:
189         case JLOG_ERR_IDX_CORRUPT:
190           jlog_repair_datafile(jcl->jlog, jcl->start.log);
191           jlog_repair_datafile(jcl->jlog, jcl->start.log + 1);
192           noitL(noit_error,
193                 "jlog reconstructed, deleting corresponding index.\n");
194           STRSETDATAFILE(jcl->jlog, idxfile, jcl->start.log);
195           strlcat(idxfile, INDEX_EXT, sizeof(idxfile));
196           unlink(idxfile);
197           STRSETDATAFILE(jcl->jlog, idxfile, jcl->start.log + 1);
198           strlcat(idxfile, INDEX_EXT, sizeof(idxfile));
199           unlink(idxfile);
200           goto alldone;
201           break;
202         default:
203           goto alldone;
204       }
205     }
206     if(jcl->count > MAX_ROWS_AT_ONCE) {
207       /* Artificially set down the range to make the batches a bit easier
208        * to handle on the stratcond/postgres end.
209        * However, we must have more data, so drop the sleeptime to 0
210        */
211       jcl->count = MAX_ROWS_AT_ONCE;
212       jcl->finish.marker = jcl->start.marker + jcl->count;
213       sleeptime = 0;
214     }
215     if(jcl->count > 0) {
216       if(noit_jlog_push(e, jcl)) {
217         goto alldone;
218       }
219       /* Read our jlog_id accounting for possibly short reads */
220       bytes_read = 0;
221       while(bytes_read < sizeof(jlog_id)) {
222         int len;
223         if((len = e->opset->read(e->fd, inbuff + bytes_read,
224                                  sizeof(jlog_id) - bytes_read,
225                                  &mask, e)) <= 0)
226           goto alldone;
227         bytes_read += len;
228       }
229       memcpy(&client_chkpt, inbuff, sizeof(jlog_id));
230       /* Fix the endian */
231       client_chkpt.log = ntohl(client_chkpt.log);
232       client_chkpt.marker = ntohl(client_chkpt.marker);
233  
234       if(memcmp(&jcl->chkpt, &client_chkpt, sizeof(jlog_id))) {
235         noitL(noit_error,
236               "client %s submitted invalid checkpoint %u:%u expected %u:%u\n",
237               ac->remote_cn, client_chkpt.log, client_chkpt.marker,
238               jcl->chkpt.log, jcl->chkpt.marker);
239         goto alldone;
240       }
241       gettimeofday(&jcl->feed_stats->last_checkpoint, NULL);
242       jlog_ctx_read_checkpoint(jcl->jlog, &jcl->chkpt);
243     }
244     else {
245       /* we have nothing to write -- maybe we have no checks configured...
246        * If this is the case "forever", the remote might disconnect and
247        * we would never know. Do the painful work of detecting a
248        * disconnected client.
249        */
250       struct pollfd pfd;
251       pfd.fd = e->fd;
252       pfd.events = POLLIN | POLLHUP | POLLRDNORM;
253       pfd.revents = 0;
254       if(poll(&pfd, 1, 0) != 0) {
255         /* normally, we'd recv PEEK|DONTWAIT.  However, the client should
256          * not be writing to us.  So, we know we can't have any legitimate
257          * data on this socket (true even though this is SSL). So, if we're
258          * here then "shit went wrong"
259          */
260         noitL(noit_error, "jlog client %s disconnected while idle\n",
261               ac->remote_cn);
262         goto alldone;
263       }
264     }
265     if(sleeptime) sleep(sleeptime);
266   }
267
268  alldone:
269   e->opset->close(e->fd, &mask, e);
270   noit_atomic_dec32(&jcl->feed_stats->connections);
271   if(jcl) noit_jlog_closure_free(jcl);
272   if(ac) acceptor_closure_free(ac);
273   return NULL;
274 }
275
276 int
277 noit_jlog_handler(eventer_t e, int mask, void *closure,
278                      struct timeval *now) {
279   eventer_t newe;
280   pthread_t tid;
281   pthread_attr_t tattr;
282   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
283   acceptor_closure_t *ac = closure;
284   noit_jlog_closure_t *jcl = ac->service_ctx;
285   char errbuff[256];
286   const char *errstr = "unknown error";
287
288   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
289     int len, nlen;
290 socket_error:
291     /* Exceptions cause us to simply snip the connection */
292     len = strlen(errstr);
293     nlen = htonl(0 - len);
294     e->opset->write(e->fd, &nlen, sizeof(nlen), &newmask, e);
295     e->opset->write(e->fd, errstr, strlen(errstr), &newmask, e);
296     eventer_remove_fd(e->fd);
297     e->opset->close(e->fd, &newmask, e);
298     if(jcl) noit_jlog_closure_free(jcl);
299     if(ac) acceptor_closure_free(ac);
300     return 0;
301   }
302
303   if(!ac->service_ctx) {
304     noit_log_stream_t ls;
305     const char *logname, *type;
306     char path[PATH_MAX], subscriber[256], *sub;
307     jcl = ac->service_ctx = noit_jlog_closure_alloc();
308     if(!noit_hash_retr_str(ac->config,
309                            "log_transit_feed_name",
310                            strlen("log_transit_feed_name"),
311                            &logname)) {
312       errstr = "No 'log_transit_feed_name' specified in log_transit.";
313       noitL(noit_error, "%s\n", errstr);
314       goto socket_error;
315     }
316     ls = noit_log_stream_find(logname);
317     if(!ls) {
318       snprintf(errbuff, sizeof(errbuff),
319                "Could not find log '%s' for log_transit.", logname);
320       errstr = errbuff;
321       noitL(noit_error, "%s\n", errstr);
322       goto socket_error;
323     }
324     type = noit_log_stream_get_type(ls);
325     if(!type || strcmp(type, "jlog")) {
326       snprintf(errbuff, sizeof(errbuff),
327                "Log '%s' for log_transit is not a jlog.", logname);
328       errstr = errbuff;
329       noitL(noit_error, "%s\n", errstr);
330       goto socket_error;
331     }
332     if(ac->cmd == NOIT_JLOG_DATA_FEED) {
333       if(!ac->remote_cn) {
334         errstr = "jlog transit started to unidentified party.";
335         noitL(noit_error, "%s\n", errstr);
336         goto socket_error;
337       }
338       strlcpy(subscriber, ac->remote_cn, sizeof(subscriber));
339       jcl->feed_stats = noit_jlog_feed_stats(subscriber);
340     }
341     else {
342       jcl->feed_stats = noit_jlog_feed_stats("~");
343       snprintf(subscriber, sizeof(subscriber),
344                "~%07d", noit_atomic_inc32(&tmpfeedcounter));
345     }
346     jcl->subscriber = strdup(subscriber);
347
348     strlcpy(path, noit_log_stream_get_path(ls), sizeof(path));
349     sub = strchr(path, '(');
350     if(sub) {
351       char *esub = strchr(sub, ')');
352       if(esub) {
353         *esub = '\0';
354         *sub = '\0';
355       }
356     }
357
358     jcl->jlog = jlog_new(path);
359     if(ac->cmd == NOIT_JLOG_DATA_TEMP_FEED)
360       if(jlog_ctx_add_subscriber(jcl->jlog, jcl->subscriber, JLOG_END) == -1) {
361         snprintf(errbuff, sizeof(errbuff),
362                  "jlog reader[%s] error: %s", jcl->subscriber,
363                  jlog_ctx_err_string(jcl->jlog));
364         errstr = errbuff;
365         noitL(noit_error, "%s\n", errstr);
366       }
367     if(jlog_ctx_open_reader(jcl->jlog, jcl->subscriber) == -1) {
368       snprintf(errbuff, sizeof(errbuff),
369                "jlog reader[%s] error: %s", jcl->subscriber,
370                jlog_ctx_err_string(jcl->jlog));
371       errstr = errbuff;
372       noitL(noit_error, "%s\n", errstr);
373       goto socket_error;
374     }
375   }
376
377   /* The jlog stuff is disk I/O and can block us.
378    * We'll create a new thread to just handle this connection.
379    */
380   eventer_remove_fd(e->fd);
381   newe = eventer_alloc();
382   memcpy(newe, e, sizeof(*e));
383   pthread_attr_init(&tattr);
384   pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
385   gettimeofday(&jcl->feed_stats->last_connection, NULL);
386   noit_atomic_inc32(&jcl->feed_stats->connections);
387   if(pthread_create(&tid, &tattr, noit_jlog_thread_main, newe) == 0) {
388     return 0;
389   }
390
391   /* Undo our dup */
392   eventer_free(newe);
393   /* Creating the thread failed, close it down and deschedule. */
394   e->opset->close(e->fd, &newmask, e);
395   return 0;
396 }
Note: See TracBrowser for help on using the browser.