root/src/noit_jlog_listener.c

Revision d9cc0191f9140478395206d52f3012337ba58d7d, 17.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 4 days ago)

implement a 1 second linear backoff on jlog sends

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2011, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_listener.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_memory.h"
38 #include "utils/noit_log.h"
39 #include "jlog/jlog.h"
40 #include "jlog/jlog_private.h"
41 #include "noit_jlog_listener.h"
42 #include "noit_rest.h"
43
44 #include <unistd.h>
45 #include <poll.h>
46 #include <assert.h>
/* Largest number of jlog messages pushed to a client in one batch. */
#define MAX_ROWS_AT_ONCE (1000)
/* Ceiling (milliseconds) on the idle pause between polls of a persistent feed. */
#define DEFAULT_MSECONDS_BETWEEN_BATCHES (10000)
/* Ceiling (milliseconds) on the idle pause between polls of a temporary feed. */
#define DEFAULT_TRANSIENT_MSECONDS_BETWEEN_BATCHES (500)
50
51 static noit_atomic32_t tmpfeedcounter = 0;
52 static int rest_show_feed(noit_http_rest_closure_t *restc,
53                           int npats, char **pats);
54 static int rest_delete_feed(noit_http_rest_closure_t *restc,
55                             int npats, char **pats);
56
57 void
58 noit_jlog_listener_init() {
59   eventer_name_callback("log_transit/1.0", noit_jlog_handler);
60   noit_control_dispatch_delegate(noit_control_dispatch,
61                                  NOIT_JLOG_DATA_FEED,
62                                  noit_jlog_handler);
63   noit_control_dispatch_delegate(noit_control_dispatch,
64                                  NOIT_JLOG_DATA_TEMP_FEED,
65                                  noit_jlog_handler);
66   assert(noit_http_rest_register_auth(
67     "GET", "/", "^feed$",
68     rest_show_feed, noit_http_rest_client_cert_auth
69   ) == 0);
70   assert(noit_http_rest_register_auth(
71     "DELETE", "/feed/", "^(.+)$",
72     rest_delete_feed, noit_http_rest_client_cert_auth
73   ) == 0);
74 }
75
76 typedef struct {
77   jlog_ctx *jlog;
78   char *subscriber;
79   jlog_id chkpt;
80   jlog_id start;
81   jlog_id finish;
82   jlog_feed_stats_t *feed_stats;
83   int count;
84   int wants_shutdown;
85 } noit_jlog_closure_t;
86
87 noit_jlog_closure_t *
88 noit_jlog_closure_alloc(void) {
89   noit_jlog_closure_t *jcl;
90   jcl = calloc(1, sizeof(*jcl));
91   return jcl;
92 }
93
94 void
95 noit_jlog_closure_free(noit_jlog_closure_t *jcl) {
96   if(jcl->jlog) {
97     if(jcl->subscriber) {
98       if(jcl->subscriber[0] == '~')
99         jlog_ctx_remove_subscriber(jcl->jlog, jcl->subscriber);
100       free(jcl->subscriber);
101     }
102     jlog_ctx_close(jcl->jlog);
103   }
104   free(jcl);
105 }
106
107 static noit_hash_table feed_stats = NOIT_HASH_EMPTY;
108
109 jlog_feed_stats_t *
110 noit_jlog_feed_stats(const char *sub) {
111   void *vs = NULL;
112   jlog_feed_stats_t *s = NULL;
113   if(noit_hash_retrieve(&feed_stats, sub, strlen(sub), &vs))
114     return (jlog_feed_stats_t *)vs;
115   s = calloc(1, sizeof(*s));
116   s->feed_name = strdup(sub);
117   noit_hash_store(&feed_stats, s->feed_name, strlen(s->feed_name), s);
118   return s;
119 }
120 int
121 noit_jlog_foreach_feed_stats(int (*f)(jlog_feed_stats_t *, void *),
122                              void *c) {
123   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
124   const char *key;
125   int klen, cnt = 0;
126   void *vs;
127   while(noit_hash_next(&feed_stats, &iter, &key, &klen, &vs)) {
128     cnt += f((jlog_feed_stats_t *)vs, c);
129   }
130   return cnt;
131 }
132 static int
133 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
134   int w, sofar = 0;
135   while(l > sofar) {
136     w = e->opset->write(e->fd, (char *)b + sofar, l - sofar, mask, e);
137     if(w <= 0) return w;
138     sofar += w;
139   }
140   return sofar;
141 }
142 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
143
144 static int
145 noit_jlog_push(eventer_t e, noit_jlog_closure_t *jcl) {
146   jlog_message msg;
147   int mask;
148   u_int32_t n_count;
149   n_count = htonl(jcl->count);
150   if(Ewrite(&n_count, sizeof(n_count)) != sizeof(n_count))
151     return -1;
152   while(jcl->count > 0) {
153     int rv;
154     struct { jlog_id chkpt; u_int32_t n_sec, n_usec, n_len; } payload;
155     if(jlog_ctx_read_message(jcl->jlog, &jcl->start, &msg) == -1)
156       return -1;
157
158     /* Here we actually push the message */
159     payload.chkpt.log = htonl(jcl->start.log);
160     payload.chkpt.marker = htonl(jcl->start.marker);
161     payload.n_sec  = htonl(msg.header->tv_sec);
162     payload.n_usec = htonl(msg.header->tv_usec);
163     payload.n_len  = htonl(msg.mess_len);
164     if((rv = Ewrite(&payload, sizeof(payload))) != sizeof(payload)) {
165       noitL(noit_error, "Error writing jlog header over SSL %d != %d\n",
166             rv, (int)sizeof(payload));
167       return -1;
168     }
169     if((rv = Ewrite(msg.mess, msg.mess_len)) != msg.mess_len) {
170       noitL(noit_error, "Error writing jlog message over SSL %d != %d\n",
171             rv, msg.mess_len);
172       return -1;
173     }
174     /* Note what the client must checkpoint */
175     jcl->chkpt = jcl->start;
176
177     JLOG_ID_ADVANCE(&jcl->start);
178     jcl->count--;
179   }
180   return 0;
181 }
182
183 void *
184 noit_jlog_thread_main(void *e_vptr) {
185   int mask, bytes_read, sleeptime, max_sleeptime;
186   eventer_t e = e_vptr;
187   acceptor_closure_t *ac = e->closure;
188   noit_jlog_closure_t *jcl = ac->service_ctx;
189   char inbuff[sizeof(jlog_id)];
190
191   noit_memory_init_thread();
192   eventer_set_fd_blocking(e->fd);
193
194   max_sleeptime = DEFAULT_MSECONDS_BETWEEN_BATCHES;
195   if(ac->cmd == NOIT_JLOG_DATA_TEMP_FEED)
196     max_sleeptime = DEFAULT_TRANSIENT_MSECONDS_BETWEEN_BATCHES;
197
198   sleeptime = max_sleeptime;
199   while(1) {
200     jlog_id client_chkpt;
201     sleeptime = MIN(sleeptime, max_sleeptime);
202     jlog_get_checkpoint(jcl->jlog, ac->remote_cn, &jcl->chkpt);
203     jcl->count = jlog_ctx_read_interval(jcl->jlog, &jcl->start, &jcl->finish);
204     if(jcl->count < 0) {
205       char idxfile[PATH_MAX];
206       noitL(noit_error, "jlog_ctx_read_interval: %s\n",
207             jlog_ctx_err_string(jcl->jlog));
208       switch (jlog_ctx_err(jcl->jlog)) {
209         case JLOG_ERR_FILE_CORRUPT:
210         case JLOG_ERR_IDX_CORRUPT:
211           jlog_repair_datafile(jcl->jlog, jcl->start.log);
212           jlog_repair_datafile(jcl->jlog, jcl->start.log + 1);
213           noitL(noit_error,
214                 "jlog reconstructed, deleting corresponding index.\n");
215           STRSETDATAFILE(jcl->jlog, idxfile, jcl->start.log);
216           strlcat(idxfile, INDEX_EXT, sizeof(idxfile));
217           unlink(idxfile);
218           STRSETDATAFILE(jcl->jlog, idxfile, jcl->start.log + 1);
219           strlcat(idxfile, INDEX_EXT, sizeof(idxfile));
220           unlink(idxfile);
221           goto alldone;
222           break;
223         default:
224           goto alldone;
225       }
226     }
227     if(jcl->count > MAX_ROWS_AT_ONCE) {
228       /* Artificially set down the range to make the batches a bit easier
229        * to handle on the stratcond/postgres end.
230        * However, we must have more data, so drop the sleeptime to 0
231        */
232       jcl->count = MAX_ROWS_AT_ONCE;
233       jcl->finish.marker = jcl->start.marker + jcl->count;
234     }
235     if(jcl->count > 0) {
236       sleeptime = 0;
237       if(noit_jlog_push(e, jcl)) {
238         goto alldone;
239       }
240       /* Read our jlog_id accounting for possibly short reads */
241       bytes_read = 0;
242       while(bytes_read < sizeof(jlog_id)) {
243         int len;
244         if((len = e->opset->read(e->fd, inbuff + bytes_read,
245                                  sizeof(jlog_id) - bytes_read,
246                                  &mask, e)) <= 0)
247           goto alldone;
248         bytes_read += len;
249       }
250       memcpy(&client_chkpt, inbuff, sizeof(jlog_id));
251       /* Fix the endian */
252       client_chkpt.log = ntohl(client_chkpt.log);
253       client_chkpt.marker = ntohl(client_chkpt.marker);
254  
255       if(memcmp(&jcl->chkpt, &client_chkpt, sizeof(jlog_id))) {
256         noitL(noit_error,
257               "client %s submitted invalid checkpoint %u:%u expected %u:%u\n",
258               ac->remote_cn, client_chkpt.log, client_chkpt.marker,
259               jcl->chkpt.log, jcl->chkpt.marker);
260         goto alldone;
261       }
262       gettimeofday(&jcl->feed_stats->last_checkpoint, NULL);
263       jlog_ctx_read_checkpoint(jcl->jlog, &jcl->chkpt);
264     }
265     else {
266       /* we have nothing to write -- maybe we have no checks configured...
267        * If this is the case "forever", the remote might disconnect and
268        * we would never know. Do the painful work of detecting a
269        * disconnected client.
270        */
271       struct pollfd pfd;
272       pfd.fd = e->fd;
273       pfd.events = POLLIN | POLLHUP | POLLRDNORM;
274       pfd.revents = 0;
275       if(poll(&pfd, 1, 0) != 0) {
276         /* normally, we'd recv PEEK|DONTWAIT.  However, the client should
277          * not be writing to us.  So, we know we can't have any legitimate
278          * data on this socket (true even though this is SSL). So, if we're
279          * here then "shit went wrong"
280          */
281         noitL(noit_error, "jlog client %s disconnected while idle\n",
282               ac->remote_cn);
283         goto alldone;
284       }
285     }
286     if(sleeptime) {
287       usleep(sleeptime * 1000); /* us -> ms */
288       sleeptime += 1000; /* 1 s */
289     }
290   }
291
292  alldone:
293   e->opset->close(e->fd, &mask, e);
294   noit_atomic_dec32(&jcl->feed_stats->connections);
295   noit_jlog_closure_free(jcl);
296   acceptor_closure_free(ac);
297   noit_memory_maintenance();
298   return NULL;
299 }
300
301 int
302 noit_jlog_handler(eventer_t e, int mask, void *closure,
303                      struct timeval *now) {
304   eventer_t newe;
305   pthread_t tid;
306   pthread_attr_t tattr;
307   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
308   acceptor_closure_t *ac = closure;
309   noit_jlog_closure_t *jcl = ac->service_ctx;
310   char errbuff[256];
311   const char *errstr = "unknown error";
312
313   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
314     int len, nlen;
315 socket_error:
316     /* Exceptions cause us to simply snip the connection */
317     len = strlen(errstr);
318     nlen = htonl(0 - len);
319     e->opset->write(e->fd, &nlen, sizeof(nlen), &newmask, e);
320     e->opset->write(e->fd, errstr, strlen(errstr), &newmask, e);
321     eventer_remove_fd(e->fd);
322     e->opset->close(e->fd, &newmask, e);
323     if(jcl) noit_jlog_closure_free(jcl);
324     acceptor_closure_free(ac);
325     return 0;
326   }
327
328   if(!ac->service_ctx) {
329     noit_log_stream_t ls;
330     const char *logname, *type;
331     int first_attempt = 1;
332     char path[PATH_MAX], subscriber[256], *sub;
333     jcl = ac->service_ctx = noit_jlog_closure_alloc();
334     if(!noit_hash_retr_str(ac->config,
335                            "log_transit_feed_name",
336                            strlen("log_transit_feed_name"),
337                            &logname)) {
338       errstr = "No 'log_transit_feed_name' specified in log_transit.";
339       noitL(noit_error, "%s\n", errstr);
340       goto socket_error;
341     }
342     ls = noit_log_stream_find(logname);
343     if(!ls) {
344       snprintf(errbuff, sizeof(errbuff),
345                "Could not find log '%s' for log_transit.", logname);
346       errstr = errbuff;
347       noitL(noit_error, "%s\n", errstr);
348       goto socket_error;
349     }
350     type = noit_log_stream_get_type(ls);
351     if(!type || strcmp(type, "jlog")) {
352       snprintf(errbuff, sizeof(errbuff),
353                "Log '%s' for log_transit is not a jlog.", logname);
354       errstr = errbuff;
355       noitL(noit_error, "%s\n", errstr);
356       goto socket_error;
357     }
358     if(ac->cmd == NOIT_JLOG_DATA_FEED) {
359       if(!ac->remote_cn) {
360         errstr = "jlog transit started to unidentified party.";
361         noitL(noit_error, "%s\n", errstr);
362         goto socket_error;
363       }
364       strlcpy(subscriber, ac->remote_cn, sizeof(subscriber));
365       jcl->feed_stats = noit_jlog_feed_stats(subscriber);
366     }
367     else {
368       jcl->feed_stats = noit_jlog_feed_stats("~");
369       snprintf(subscriber, sizeof(subscriber),
370                "~%07d", noit_atomic_inc32(&tmpfeedcounter));
371     }
372     jcl->subscriber = strdup(subscriber);
373
374     strlcpy(path, noit_log_stream_get_path(ls), sizeof(path));
375     sub = strchr(path, '(');
376     if(sub) {
377       char *esub = strchr(sub, ')');
378       if(esub) {
379         *esub = '\0';
380         *sub++ = '\0';
381       }
382     }
383
384     jcl->jlog = jlog_new(path);
385     if(ac->cmd == NOIT_JLOG_DATA_TEMP_FEED) {
386  add_sub:
387       if(jlog_ctx_add_subscriber(jcl->jlog, jcl->subscriber, JLOG_END) == -1) {
388         snprintf(errbuff, sizeof(errbuff),
389                  "jlog reader[%s] error: %s", jcl->subscriber,
390                  jlog_ctx_err_string(jcl->jlog));
391         errstr = errbuff;
392         noitL(noit_error, "%s\n", errstr);
393       }
394     }
395     if(jlog_ctx_open_reader(jcl->jlog, jcl->subscriber) == -1) {
396       if(sub && !strcmp(sub, "*")) {
397         if(first_attempt) {
398           jlog_ctx_close(jcl->jlog);
399           jcl->jlog = jlog_new(path);
400           first_attempt = 0;
401           goto add_sub;
402         }
403       }
404       snprintf(errbuff, sizeof(errbuff),
405                "jlog reader[%s] error: %s", jcl->subscriber,
406                jlog_ctx_err_string(jcl->jlog));
407       errstr = errbuff;
408       noitL(noit_error, "%s\n", errstr);
409       goto socket_error;
410     }
411   }
412
413   /* The jlog stuff is disk I/O and can block us.
414    * We'll create a new thread to just handle this connection.
415    */
416   eventer_remove_fd(e->fd);
417   newe = eventer_alloc();
418   memcpy(newe, e, sizeof(*e));
419   pthread_attr_init(&tattr);
420   pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
421   gettimeofday(&jcl->feed_stats->last_connection, NULL);
422   noit_atomic_inc32(&jcl->feed_stats->connections);
423   if(pthread_create(&tid, &tattr, noit_jlog_thread_main, newe) == 0) {
424     return 0;
425   }
426
427   /* Undo our dup */
428   eventer_free(newe);
429   /* Creating the thread failed, close it down and deschedule. */
430   e->opset->close(e->fd, &newmask, e);
431   return 0;
432 }
433
434 static int rest_show_feed(noit_http_rest_closure_t *restc,
435                           int npats, char **pats) {
436   noit_http_session_ctx *ctx = restc->http_ctx;
437   const char *err = "unknown error";
438   const char *jpath_with_sub;
439   char jlogpath[PATH_MAX], *cp, **subs = NULL;
440   int nsubs, i;
441   noit_log_stream_t feed;
442   jlog_ctx *jctx = NULL;
443   xmlDocPtr doc = NULL;
444   xmlNodePtr root = NULL, subnodes;
445
446   feed = noit_log_stream_find("feed");
447   if(!feed) { err = "cannot find feed"; goto error; }
448
449   jpath_with_sub = noit_log_stream_get_path(feed);
450   strlcpy(jlogpath, jpath_with_sub, sizeof(jlogpath));
451   cp = strchr(jlogpath, '(');
452   if(cp) *cp = '\0';
453
454   jctx = jlog_new(jlogpath);
455   if((nsubs = jlog_ctx_list_subscribers(jctx, &subs)) == -1) {
456     err = jlog_ctx_err_string(jctx);
457     goto error;
458   }
459
460   doc = xmlNewDoc((xmlChar *)"1.0");
461   root = xmlNewDocNode(doc, NULL, (xmlChar *)"feed", NULL);
462   xmlDocSetRootElement(doc, root);
463
464   subnodes = xmlNewNode(NULL, (xmlChar *)"subscribers");
465   for(i=0; i<nsubs; i++) {
466     xmlNewChild(subnodes, NULL, (xmlChar *)"subscriber", (xmlChar *)subs[i]);
467   }
468   xmlAddChild(root, subnodes);
469
470   noit_http_response_ok(restc->http_ctx, "text/xml");
471   noit_http_response_xml(restc->http_ctx, doc);
472   noit_http_response_end(restc->http_ctx);
473   if(subs) jlog_ctx_list_subscribers_dispose(jctx, subs);
474   xmlFreeDoc(doc);
475   jlog_ctx_close(jctx);
476   return 0;
477
478  error:
479   if(doc) xmlFreeDoc(doc);
480   if(subs) jlog_ctx_list_subscribers_dispose(jctx, subs);
481   noit_http_response_server_error(ctx, "text/plain");
482   noit_http_response_append(ctx, err, strlen(err));
483   noit_http_response_end(ctx);
484   if(jctx) jlog_ctx_close(jctx);
485   return 0;
486 }
487
488 static int rest_delete_feed(noit_http_rest_closure_t *restc,
489                             int npats, char **pats) {
490   noit_http_session_ctx *ctx = restc->http_ctx;
491   const char *err = "unknown error";
492   const char *jpath_with_sub;
493   char jlogpath[PATH_MAX], *cp;
494   int rv;
495   noit_log_stream_t feed;
496   jlog_ctx *jctx;
497
498   feed = noit_log_stream_find("feed");
499   if(!feed) { err = "cannot find feed"; goto error; }
500
501   jpath_with_sub = noit_log_stream_get_path(feed);
502   strlcpy(jlogpath, jpath_with_sub, sizeof(jlogpath));
503   cp = strchr(jlogpath, '(');
504   if(cp) *cp = '\0';
505
506   jctx = jlog_new(jlogpath);
507   rv = jlog_ctx_remove_subscriber(jctx, pats[0]);
508   jlog_ctx_close(jctx);
509   if(rv < 0) {
510     err = jlog_ctx_err_string(jctx);
511     goto error;
512   }
513
514   /* removed or note, we should do a sweeping cleanup */
515   jlog_clean(jlogpath);
516
517   if(rv == 0) {
518     noit_http_response_not_found(ctx, "text/plain");
519     noit_http_response_end(ctx);
520     return 0;
521   }
522
523   noit_http_response_standard(ctx, 204, "OK", "text/plain");
524   noit_http_response_end(ctx);
525   return 0;
526
527  error:
528   noit_http_response_server_error(ctx, "text/plain");
529   noit_http_response_append(ctx, err, strlen(err));
530   noit_http_response_end(ctx);
531   return 0;
532 }
533
Note: See TracBrowser for help on using the browser.