root/src/noit_jlog_listener.c

Revision 3d79afbe1acea716e45ec275671496775920260d, 16.6 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 months ago)

Introduce sleeptime only in the event that we have zero actual rows.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2011, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "noit_listener.h"
36 #include "utils/noit_hash.h"
37 #include "utils/noit_log.h"
38 #include "jlog/jlog.h"
39 #include "jlog/jlog_private.h"
40 #include "noit_jlog_listener.h"
41 #include "noit_rest.h"
42
43 #include <unistd.h>
44 #include <poll.h>
45 #include <assert.h>
/* Largest number of rows pushed to a client in one batch. */
#define MAX_ROWS_AT_ONCE 1000
/* Seconds slept between polls of the log when a non-temporary feed
 * has no rows to send. */
#define DEFAULT_SECONDS_BETWEEN_BATCHES 10

/* Incremented to build the numeric part of "~NNNNNNN" subscriber names
 * for temporary feeds. */
static noit_atomic32_t tmpfeedcounter = 0;
/* REST handlers registered by noit_jlog_listener_init(). */
static int rest_show_feed(noit_http_rest_closure_t *restc,
                          int npats, char **pats);
static int rest_delete_feed(noit_http_rest_closure_t *restc,
                            int npats, char **pats);
54
55 void
56 noit_jlog_listener_init() {
57   eventer_name_callback("log_transit/1.0", noit_jlog_handler);
58   noit_control_dispatch_delegate(noit_control_dispatch,
59                                  NOIT_JLOG_DATA_FEED,
60                                  noit_jlog_handler);
61   noit_control_dispatch_delegate(noit_control_dispatch,
62                                  NOIT_JLOG_DATA_TEMP_FEED,
63                                  noit_jlog_handler);
64   assert(noit_http_rest_register_auth(
65     "GET", "/", "^feed$",
66     rest_show_feed, noit_http_rest_client_cert_auth
67   ) == 0);
68   assert(noit_http_rest_register_auth(
69     "DELETE", "/feed/", "^(.+)$",
70     rest_delete_feed, noit_http_rest_client_cert_auth
71   ) == 0);
72 }
73
74 typedef struct {
75   jlog_ctx *jlog;
76   char *subscriber;
77   jlog_id chkpt;
78   jlog_id start;
79   jlog_id finish;
80   jlog_feed_stats_t *feed_stats;
81   int count;
82   int wants_shutdown;
83 } noit_jlog_closure_t;
84
85 noit_jlog_closure_t *
86 noit_jlog_closure_alloc(void) {
87   noit_jlog_closure_t *jcl;
88   jcl = calloc(1, sizeof(*jcl));
89   return jcl;
90 }
91
92 void
93 noit_jlog_closure_free(noit_jlog_closure_t *jcl) {
94   if(jcl->jlog) {
95     if(jcl->subscriber) {
96       if(jcl->subscriber[0] == '~')
97         jlog_ctx_remove_subscriber(jcl->jlog, jcl->subscriber);
98       free(jcl->subscriber);
99     }
100     jlog_ctx_close(jcl->jlog);
101   }
102   free(jcl);
103 }
104
105 static noit_hash_table feed_stats = NOIT_HASH_EMPTY;
106
107 jlog_feed_stats_t *
108 noit_jlog_feed_stats(const char *sub) {
109   void *vs = NULL;
110   jlog_feed_stats_t *s = NULL;
111   if(noit_hash_retrieve(&feed_stats, sub, strlen(sub), &vs))
112     return (jlog_feed_stats_t *)vs;
113   s = calloc(1, sizeof(*s));
114   s->feed_name = strdup(sub);
115   noit_hash_store(&feed_stats, s->feed_name, strlen(s->feed_name), s);
116   return s;
117 }
118 int
119 noit_jlog_foreach_feed_stats(int (*f)(jlog_feed_stats_t *, void *),
120                              void *c) {
121   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
122   const char *key;
123   int klen, cnt = 0;
124   void *vs;
125   while(noit_hash_next(&feed_stats, &iter, &key, &klen, &vs)) {
126     cnt += f((jlog_feed_stats_t *)vs, c);
127   }
128   return cnt;
129 }
130 static int
131 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
132   int w, sofar = 0;
133   while(l > sofar) {
134     w = e->opset->write(e->fd, (char *)b + sofar, l - sofar, mask, e);
135     if(w <= 0) return w;
136     sofar += w;
137   }
138   return sofar;
139 }
140 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
141
142 static int
143 noit_jlog_push(eventer_t e, noit_jlog_closure_t *jcl) {
144   jlog_message msg;
145   int mask;
146   u_int32_t n_count;
147   n_count = htonl(jcl->count);
148   if(Ewrite(&n_count, sizeof(n_count)) != sizeof(n_count))
149     return -1;
150   while(jcl->count > 0) {
151     int rv;
152     struct { jlog_id chkpt; u_int32_t n_sec, n_usec, n_len; } payload;
153     if(jlog_ctx_read_message(jcl->jlog, &jcl->start, &msg) == -1)
154       return -1;
155
156     /* Here we actually push the message */
157     payload.chkpt.log = htonl(jcl->start.log);
158     payload.chkpt.marker = htonl(jcl->start.marker);
159     payload.n_sec  = htonl(msg.header->tv_sec);
160     payload.n_usec = htonl(msg.header->tv_usec);
161     payload.n_len  = htonl(msg.mess_len);
162     if((rv = Ewrite(&payload, sizeof(payload))) != sizeof(payload)) {
163       noitL(noit_error, "Error writing jlog header over SSL %d != %d\n",
164             rv, (int)sizeof(payload));
165       return -1;
166     }
167     if((rv = Ewrite(msg.mess, msg.mess_len)) != msg.mess_len) {
168       noitL(noit_error, "Error writing jlog message over SSL %d != %d\n",
169             rv, msg.mess_len);
170       return -1;
171     }
172     /* Note what the client must checkpoint */
173     jcl->chkpt = jcl->start;
174
175     JLOG_ID_ADVANCE(&jcl->start);
176     jcl->count--;
177   }
178   return 0;
179 }
180
181 void *
182 noit_jlog_thread_main(void *e_vptr) {
183   int mask, bytes_read;
184   eventer_t e = e_vptr;
185   acceptor_closure_t *ac = e->closure;
186   noit_jlog_closure_t *jcl = ac->service_ctx;
187   char inbuff[sizeof(jlog_id)];
188
189   eventer_set_fd_blocking(e->fd);
190
191   while(1) {
192     jlog_id client_chkpt;
193     int sleeptime = (ac->cmd == NOIT_JLOG_DATA_TEMP_FEED) ?
194                       1 : DEFAULT_SECONDS_BETWEEN_BATCHES;
195     jlog_get_checkpoint(jcl->jlog, ac->remote_cn, &jcl->chkpt);
196     jcl->count = jlog_ctx_read_interval(jcl->jlog, &jcl->start, &jcl->finish);
197     if(jcl->count < 0) {
198       char idxfile[PATH_MAX];
199       noitL(noit_error, "jlog_ctx_read_interval: %s\n",
200             jlog_ctx_err_string(jcl->jlog));
201       switch (jlog_ctx_err(jcl->jlog)) {
202         case JLOG_ERR_FILE_CORRUPT:
203         case JLOG_ERR_IDX_CORRUPT:
204           jlog_repair_datafile(jcl->jlog, jcl->start.log);
205           jlog_repair_datafile(jcl->jlog, jcl->start.log + 1);
206           noitL(noit_error,
207                 "jlog reconstructed, deleting corresponding index.\n");
208           STRSETDATAFILE(jcl->jlog, idxfile, jcl->start.log);
209           strlcat(idxfile, INDEX_EXT, sizeof(idxfile));
210           unlink(idxfile);
211           STRSETDATAFILE(jcl->jlog, idxfile, jcl->start.log + 1);
212           strlcat(idxfile, INDEX_EXT, sizeof(idxfile));
213           unlink(idxfile);
214           goto alldone;
215           break;
216         default:
217           goto alldone;
218       }
219     }
220     if(jcl->count > MAX_ROWS_AT_ONCE) {
221       /* Artificially set down the range to make the batches a bit easier
222        * to handle on the stratcond/postgres end.
223        * However, we must have more data, so drop the sleeptime to 0
224        */
225       jcl->count = MAX_ROWS_AT_ONCE;
226       jcl->finish.marker = jcl->start.marker + jcl->count;
227     }
228     if(jcl->count > 0) {
229       sleeptime = 0;
230       if(noit_jlog_push(e, jcl)) {
231         goto alldone;
232       }
233       /* Read our jlog_id accounting for possibly short reads */
234       bytes_read = 0;
235       while(bytes_read < sizeof(jlog_id)) {
236         int len;
237         if((len = e->opset->read(e->fd, inbuff + bytes_read,
238                                  sizeof(jlog_id) - bytes_read,
239                                  &mask, e)) <= 0)
240           goto alldone;
241         bytes_read += len;
242       }
243       memcpy(&client_chkpt, inbuff, sizeof(jlog_id));
244       /* Fix the endian */
245       client_chkpt.log = ntohl(client_chkpt.log);
246       client_chkpt.marker = ntohl(client_chkpt.marker);
247  
248       if(memcmp(&jcl->chkpt, &client_chkpt, sizeof(jlog_id))) {
249         noitL(noit_error,
250               "client %s submitted invalid checkpoint %u:%u expected %u:%u\n",
251               ac->remote_cn, client_chkpt.log, client_chkpt.marker,
252               jcl->chkpt.log, jcl->chkpt.marker);
253         goto alldone;
254       }
255       gettimeofday(&jcl->feed_stats->last_checkpoint, NULL);
256       jlog_ctx_read_checkpoint(jcl->jlog, &jcl->chkpt);
257     }
258     else {
259       /* we have nothing to write -- maybe we have no checks configured...
260        * If this is the case "forever", the remote might disconnect and
261        * we would never know. Do the painful work of detecting a
262        * disconnected client.
263        */
264       struct pollfd pfd;
265       pfd.fd = e->fd;
266       pfd.events = POLLIN | POLLHUP | POLLRDNORM;
267       pfd.revents = 0;
268       if(poll(&pfd, 1, 0) != 0) {
269         /* normally, we'd recv PEEK|DONTWAIT.  However, the client should
270          * not be writing to us.  So, we know we can't have any legitimate
271          * data on this socket (true even though this is SSL). So, if we're
272          * here then "shit went wrong"
273          */
274         noitL(noit_error, "jlog client %s disconnected while idle\n",
275               ac->remote_cn);
276         goto alldone;
277       }
278     }
279     if(sleeptime) sleep(sleeptime);
280   }
281
282  alldone:
283   e->opset->close(e->fd, &mask, e);
284   noit_atomic_dec32(&jcl->feed_stats->connections);
285   noit_jlog_closure_free(jcl);
286   acceptor_closure_free(ac);
287   return NULL;
288 }
289
290 int
291 noit_jlog_handler(eventer_t e, int mask, void *closure,
292                      struct timeval *now) {
293   eventer_t newe;
294   pthread_t tid;
295   pthread_attr_t tattr;
296   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
297   acceptor_closure_t *ac = closure;
298   noit_jlog_closure_t *jcl = ac->service_ctx;
299   char errbuff[256];
300   const char *errstr = "unknown error";
301
302   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
303     int len, nlen;
304 socket_error:
305     /* Exceptions cause us to simply snip the connection */
306     len = strlen(errstr);
307     nlen = htonl(0 - len);
308     e->opset->write(e->fd, &nlen, sizeof(nlen), &newmask, e);
309     e->opset->write(e->fd, errstr, strlen(errstr), &newmask, e);
310     eventer_remove_fd(e->fd);
311     e->opset->close(e->fd, &newmask, e);
312     if(jcl) noit_jlog_closure_free(jcl);
313     acceptor_closure_free(ac);
314     return 0;
315   }
316
317   if(!ac->service_ctx) {
318     noit_log_stream_t ls;
319     const char *logname, *type;
320     int first_attempt = 1;
321     char path[PATH_MAX], subscriber[256], *sub;
322     jcl = ac->service_ctx = noit_jlog_closure_alloc();
323     if(!noit_hash_retr_str(ac->config,
324                            "log_transit_feed_name",
325                            strlen("log_transit_feed_name"),
326                            &logname)) {
327       errstr = "No 'log_transit_feed_name' specified in log_transit.";
328       noitL(noit_error, "%s\n", errstr);
329       goto socket_error;
330     }
331     ls = noit_log_stream_find(logname);
332     if(!ls) {
333       snprintf(errbuff, sizeof(errbuff),
334                "Could not find log '%s' for log_transit.", logname);
335       errstr = errbuff;
336       noitL(noit_error, "%s\n", errstr);
337       goto socket_error;
338     }
339     type = noit_log_stream_get_type(ls);
340     if(!type || strcmp(type, "jlog")) {
341       snprintf(errbuff, sizeof(errbuff),
342                "Log '%s' for log_transit is not a jlog.", logname);
343       errstr = errbuff;
344       noitL(noit_error, "%s\n", errstr);
345       goto socket_error;
346     }
347     if(ac->cmd == NOIT_JLOG_DATA_FEED) {
348       if(!ac->remote_cn) {
349         errstr = "jlog transit started to unidentified party.";
350         noitL(noit_error, "%s\n", errstr);
351         goto socket_error;
352       }
353       strlcpy(subscriber, ac->remote_cn, sizeof(subscriber));
354       jcl->feed_stats = noit_jlog_feed_stats(subscriber);
355     }
356     else {
357       jcl->feed_stats = noit_jlog_feed_stats("~");
358       snprintf(subscriber, sizeof(subscriber),
359                "~%07d", noit_atomic_inc32(&tmpfeedcounter));
360     }
361     jcl->subscriber = strdup(subscriber);
362
363     strlcpy(path, noit_log_stream_get_path(ls), sizeof(path));
364     sub = strchr(path, '(');
365     if(sub) {
366       char *esub = strchr(sub, ')');
367       if(esub) {
368         *esub = '\0';
369         *sub++ = '\0';
370       }
371     }
372
373     jcl->jlog = jlog_new(path);
374     if(ac->cmd == NOIT_JLOG_DATA_TEMP_FEED) {
375  add_sub:
376       if(jlog_ctx_add_subscriber(jcl->jlog, jcl->subscriber, JLOG_END) == -1) {
377         snprintf(errbuff, sizeof(errbuff),
378                  "jlog reader[%s] error: %s", jcl->subscriber,
379                  jlog_ctx_err_string(jcl->jlog));
380         errstr = errbuff;
381         noitL(noit_error, "%s\n", errstr);
382       }
383     }
384     if(jlog_ctx_open_reader(jcl->jlog, jcl->subscriber) == -1) {
385       if(sub && !strcmp(sub, "*")) {
386         if(first_attempt) {
387           jlog_ctx_close(jcl->jlog);
388           jcl->jlog = jlog_new(path);
389           first_attempt = 0;
390           goto add_sub;
391         }
392       }
393       snprintf(errbuff, sizeof(errbuff),
394                "jlog reader[%s] error: %s", jcl->subscriber,
395                jlog_ctx_err_string(jcl->jlog));
396       errstr = errbuff;
397       noitL(noit_error, "%s\n", errstr);
398       goto socket_error;
399     }
400   }
401
402   /* The jlog stuff is disk I/O and can block us.
403    * We'll create a new thread to just handle this connection.
404    */
405   eventer_remove_fd(e->fd);
406   newe = eventer_alloc();
407   memcpy(newe, e, sizeof(*e));
408   pthread_attr_init(&tattr);
409   pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
410   gettimeofday(&jcl->feed_stats->last_connection, NULL);
411   noit_atomic_inc32(&jcl->feed_stats->connections);
412   if(pthread_create(&tid, &tattr, noit_jlog_thread_main, newe) == 0) {
413     return 0;
414   }
415
416   /* Undo our dup */
417   eventer_free(newe);
418   /* Creating the thread failed, close it down and deschedule. */
419   e->opset->close(e->fd, &newmask, e);
420   return 0;
421 }
422
423 static int rest_show_feed(noit_http_rest_closure_t *restc,
424                           int npats, char **pats) {
425   noit_http_session_ctx *ctx = restc->http_ctx;
426   const char *err = "unknown error";
427   const char *jpath_with_sub;
428   char jlogpath[PATH_MAX], *cp, **subs = NULL;
429   int nsubs, i;
430   noit_log_stream_t feed;
431   jlog_ctx *jctx = NULL;
432   xmlDocPtr doc = NULL;
433   xmlNodePtr root = NULL, subnodes;
434
435   feed = noit_log_stream_find("feed");
436   if(!feed) { err = "cannot find feed"; goto error; }
437
438   jpath_with_sub = noit_log_stream_get_path(feed);
439   strlcpy(jlogpath, jpath_with_sub, sizeof(jlogpath));
440   cp = strchr(jlogpath, '(');
441   if(cp) *cp = '\0';
442
443   jctx = jlog_new(jlogpath);
444   if((nsubs = jlog_ctx_list_subscribers(jctx, &subs)) == -1) {
445     err = jlog_ctx_err_string(jctx);
446     goto error;
447   }
448
449   doc = xmlNewDoc((xmlChar *)"1.0");
450   root = xmlNewDocNode(doc, NULL, (xmlChar *)"feed", NULL);
451   xmlDocSetRootElement(doc, root);
452
453   subnodes = xmlNewNode(NULL, (xmlChar *)"subscribers");
454   for(i=0; i<nsubs; i++) {
455     xmlNewChild(subnodes, NULL, (xmlChar *)"subscriber", (xmlChar *)subs[i]);
456   }
457   xmlAddChild(root, subnodes);
458
459   noit_http_response_ok(restc->http_ctx, "text/xml");
460   noit_http_response_xml(restc->http_ctx, doc);
461   noit_http_response_end(restc->http_ctx);
462   if(subs) jlog_ctx_list_subscribers_dispose(jctx, subs);
463   xmlFreeDoc(doc);
464   jlog_ctx_close(jctx);
465   return 0;
466
467  error:
468   if(doc) xmlFreeDoc(doc);
469   if(subs) jlog_ctx_list_subscribers_dispose(jctx, subs);
470   noit_http_response_server_error(ctx, "text/plain");
471   noit_http_response_append(ctx, err, strlen(err));
472   noit_http_response_end(ctx);
473   if(jctx) jlog_ctx_close(jctx);
474   return 0;
475 }
476
477 static int rest_delete_feed(noit_http_rest_closure_t *restc,
478                             int npats, char **pats) {
479   noit_http_session_ctx *ctx = restc->http_ctx;
480   const char *err = "unknown error";
481   const char *jpath_with_sub;
482   char jlogpath[PATH_MAX], *cp;
483   int rv;
484   noit_log_stream_t feed;
485   jlog_ctx *jctx;
486
487   feed = noit_log_stream_find("feed");
488   if(!feed) { err = "cannot find feed"; goto error; }
489
490   jpath_with_sub = noit_log_stream_get_path(feed);
491   strlcpy(jlogpath, jpath_with_sub, sizeof(jlogpath));
492   cp = strchr(jlogpath, '(');
493   if(cp) *cp = '\0';
494
495   jctx = jlog_new(jlogpath);
496   rv = jlog_ctx_remove_subscriber(jctx, pats[0]);
497   jlog_ctx_close(jctx);
498   if(rv < 0) {
499     err = jlog_ctx_err_string(jctx);
500     goto error;
501   }
502
503   /* removed or note, we should do a sweeping cleanup */
504   jlog_clean(jlogpath);
505
506   if(rv == 0) {
507     noit_http_response_not_found(ctx, "text/plain");
508     noit_http_response_end(ctx);
509     return 0;
510   }
511
512   noit_http_response_standard(ctx, 204, "OK", "text/plain");
513   noit_http_response_end(ctx);
514   return 0;
515
516  error:
517   noit_http_response_server_error(ctx, "text/plain");
518   noit_http_response_append(ctx, err, strlen(err));
519   noit_http_response_end(ctx);
520   return 0;
521 }
522
Note: See TracBrowser for help on using the browser.