root/src/noit_jlog_listener.c

Revision 304ec80b8cf842fc0abe5f9029790908b6455957, 17.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 months ago)

Convert to libmtev.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2011, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * Copyright (c) 2015, Circonus, Inc. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include <mtev_defines.h>
35 #include <eventer/eventer.h>
36 #include <mtev_listener.h>
37 #include <mtev_hash.h>
38 #include <mtev_memory.h>
39 #include <mtev_rest.h>
40
41 #include <jlog.h>
42 #include <jlog_private.h>
43 #include "noit_mtev_bridge.h"
44 #include "noit_jlog_listener.h"
45
46 #include <unistd.h>
47 #include <poll.h>
48 #include <assert.h>
49 #define MAX_ROWS_AT_ONCE 1000
50 #define DEFAULT_MSECONDS_BETWEEN_BATCHES 10000
51 #define DEFAULT_TRANSIENT_MSECONDS_BETWEEN_BATCHES 500
52
53 static mtev_atomic32_t tmpfeedcounter = 0;
54 static int rest_show_feed(mtev_http_rest_closure_t *restc,
55                           int npats, char **pats);
56 static int rest_delete_feed(mtev_http_rest_closure_t *restc,
57                             int npats, char **pats);
58
59 void
60 noit_jlog_listener_init() {
61   eventer_name_callback("log_transit/1.0", noit_jlog_handler);
62   mtev_control_dispatch_delegate(mtev_control_dispatch,
63                                  NOIT_JLOG_DATA_FEED,
64                                  noit_jlog_handler);
65   mtev_control_dispatch_delegate(mtev_control_dispatch,
66                                  NOIT_JLOG_DATA_TEMP_FEED,
67                                  noit_jlog_handler);
68   assert(mtev_http_rest_register_auth(
69     "GET", "/", "^feed$",
70     rest_show_feed, mtev_http_rest_client_cert_auth
71   ) == 0);
72   assert(mtev_http_rest_register_auth(
73     "DELETE", "/feed/", "^(.+)$",
74     rest_delete_feed, mtev_http_rest_client_cert_auth
75   ) == 0);
76 }
77
78 typedef struct {
79   jlog_ctx *jlog;
80   char *subscriber;
81   jlog_id chkpt;
82   jlog_id start;
83   jlog_id finish;
84   jlog_feed_stats_t *feed_stats;
85   int count;
86   int wants_shutdown;
87 } noit_jlog_closure_t;
88
89 noit_jlog_closure_t *
90 noit_jlog_closure_alloc(void) {
91   noit_jlog_closure_t *jcl;
92   jcl = calloc(1, sizeof(*jcl));
93   return jcl;
94 }
95
96 void
97 noit_jlog_closure_free(noit_jlog_closure_t *jcl) {
98   if(jcl->jlog) {
99     if(jcl->subscriber) {
100       if(jcl->subscriber[0] == '~')
101         jlog_ctx_remove_subscriber(jcl->jlog, jcl->subscriber);
102       free(jcl->subscriber);
103     }
104     jlog_ctx_close(jcl->jlog);
105   }
106   free(jcl);
107 }
108
109 static mtev_hash_table feed_stats = MTEV_HASH_EMPTY;
110
111 jlog_feed_stats_t *
112 noit_jlog_feed_stats(const char *sub) {
113   void *vs = NULL;
114   jlog_feed_stats_t *s = NULL;
115   if(mtev_hash_retrieve(&feed_stats, sub, strlen(sub), &vs))
116     return (jlog_feed_stats_t *)vs;
117   s = calloc(1, sizeof(*s));
118   s->feed_name = strdup(sub);
119   mtev_hash_store(&feed_stats, s->feed_name, strlen(s->feed_name), s);
120   return s;
121 }
122 int
123 noit_jlog_foreach_feed_stats(int (*f)(jlog_feed_stats_t *, void *),
124                              void *c) {
125   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
126   const char *key;
127   int klen, cnt = 0;
128   void *vs;
129   while(mtev_hash_next(&feed_stats, &iter, &key, &klen, &vs)) {
130     cnt += f((jlog_feed_stats_t *)vs, c);
131   }
132   return cnt;
133 }
134 static int
135 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
136   int w, sofar = 0;
137   while(l > sofar) {
138     w = e->opset->write(e->fd, (char *)b + sofar, l - sofar, mask, e);
139     if(w <= 0) return w;
140     sofar += w;
141   }
142   return sofar;
143 }
144 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
145
146 static int
147 noit_jlog_push(eventer_t e, noit_jlog_closure_t *jcl) {
148   jlog_message msg;
149   int mask;
150   u_int32_t n_count;
151   n_count = htonl(jcl->count);
152   if(Ewrite(&n_count, sizeof(n_count)) != sizeof(n_count))
153     return -1;
154   while(jcl->count > 0) {
155     int rv;
156     struct { jlog_id chkpt; u_int32_t n_sec, n_usec, n_len; } payload;
157     if(jlog_ctx_read_message(jcl->jlog, &jcl->start, &msg) == -1)
158       return -1;
159
160     /* Here we actually push the message */
161     payload.chkpt.log = htonl(jcl->start.log);
162     payload.chkpt.marker = htonl(jcl->start.marker);
163     payload.n_sec  = htonl(msg.header->tv_sec);
164     payload.n_usec = htonl(msg.header->tv_usec);
165     payload.n_len  = htonl(msg.mess_len);
166     if((rv = Ewrite(&payload, sizeof(payload))) != sizeof(payload)) {
167       mtevL(noit_error, "Error writing jlog header over SSL %d != %d\n",
168             rv, (int)sizeof(payload));
169       return -1;
170     }
171     if((rv = Ewrite(msg.mess, msg.mess_len)) != msg.mess_len) {
172       mtevL(noit_error, "Error writing jlog message over SSL %d != %d\n",
173             rv, msg.mess_len);
174       return -1;
175     }
176     /* Note what the client must checkpoint */
177     jcl->chkpt = jcl->start;
178
179     JLOG_ID_ADVANCE(&jcl->start);
180     jcl->count--;
181   }
182   return 0;
183 }
184
185 void *
186 noit_jlog_thread_main(void *e_vptr) {
187   int mask, bytes_read, sleeptime, max_sleeptime;
188   eventer_t e = e_vptr;
189   acceptor_closure_t *ac = e->closure;
190   noit_jlog_closure_t *jcl = ac->service_ctx;
191   char inbuff[sizeof(jlog_id)];
192
193   mtev_memory_init_thread();
194   eventer_set_fd_blocking(e->fd);
195
196   max_sleeptime = DEFAULT_MSECONDS_BETWEEN_BATCHES;
197   if(ac->cmd == NOIT_JLOG_DATA_TEMP_FEED)
198     max_sleeptime = DEFAULT_TRANSIENT_MSECONDS_BETWEEN_BATCHES;
199
200   sleeptime = max_sleeptime;
201   while(1) {
202     jlog_id client_chkpt;
203     sleeptime = MIN(sleeptime, max_sleeptime);
204     jlog_get_checkpoint(jcl->jlog, ac->remote_cn, &jcl->chkpt);
205     jcl->count = jlog_ctx_read_interval(jcl->jlog, &jcl->start, &jcl->finish);
206     if(jcl->count < 0) {
207       char idxfile[PATH_MAX];
208       mtevL(noit_error, "jlog_ctx_read_interval: %s\n",
209             jlog_ctx_err_string(jcl->jlog));
210       switch (jlog_ctx_err(jcl->jlog)) {
211         case JLOG_ERR_FILE_CORRUPT:
212         case JLOG_ERR_IDX_CORRUPT:
213           jlog_repair_datafile(jcl->jlog, jcl->start.log);
214           jlog_repair_datafile(jcl->jlog, jcl->start.log + 1);
215           mtevL(noit_error,
216                 "jlog reconstructed, deleting corresponding index.\n");
217           STRSETDATAFILE(jcl->jlog, idxfile, jcl->start.log);
218           strlcat(idxfile, INDEX_EXT, sizeof(idxfile));
219           unlink(idxfile);
220           STRSETDATAFILE(jcl->jlog, idxfile, jcl->start.log + 1);
221           strlcat(idxfile, INDEX_EXT, sizeof(idxfile));
222           unlink(idxfile);
223           goto alldone;
224           break;
225         default:
226           goto alldone;
227       }
228     }
229     if(jcl->count > MAX_ROWS_AT_ONCE) {
230       /* Artificially set down the range to make the batches a bit easier
231        * to handle on the stratcond/postgres end.
232        * However, we must have more data, so drop the sleeptime to 0
233        */
234       jcl->count = MAX_ROWS_AT_ONCE;
235       jcl->finish.marker = jcl->start.marker + jcl->count;
236     }
237     if(jcl->count > 0) {
238       sleeptime = 0;
239       if(noit_jlog_push(e, jcl)) {
240         goto alldone;
241       }
242       /* Read our jlog_id accounting for possibly short reads */
243       bytes_read = 0;
244       while(bytes_read < sizeof(jlog_id)) {
245         int len;
246         if((len = e->opset->read(e->fd, inbuff + bytes_read,
247                                  sizeof(jlog_id) - bytes_read,
248                                  &mask, e)) <= 0)
249           goto alldone;
250         bytes_read += len;
251       }
252       memcpy(&client_chkpt, inbuff, sizeof(jlog_id));
253       /* Fix the endian */
254       client_chkpt.log = ntohl(client_chkpt.log);
255       client_chkpt.marker = ntohl(client_chkpt.marker);
256  
257       if(memcmp(&jcl->chkpt, &client_chkpt, sizeof(jlog_id))) {
258         mtevL(noit_error,
259               "client %s submitted invalid checkpoint %u:%u expected %u:%u\n",
260               ac->remote_cn, client_chkpt.log, client_chkpt.marker,
261               jcl->chkpt.log, jcl->chkpt.marker);
262         goto alldone;
263       }
264       gettimeofday(&jcl->feed_stats->last_checkpoint, NULL);
265       jlog_ctx_read_checkpoint(jcl->jlog, &jcl->chkpt);
266     }
267     else {
268       /* we have nothing to write -- maybe we have no checks configured...
269        * If this is the case "forever", the remote might disconnect and
270        * we would never know. Do the painful work of detecting a
271        * disconnected client.
272        */
273       struct pollfd pfd;
274       pfd.fd = e->fd;
275       pfd.events = POLLIN | POLLHUP | POLLRDNORM;
276       pfd.revents = 0;
277       if(poll(&pfd, 1, 0) != 0) {
278         /* normally, we'd recv PEEK|DONTWAIT.  However, the client should
279          * not be writing to us.  So, we know we can't have any legitimate
280          * data on this socket (true even though this is SSL). So, if we're
281          * here then "shit went wrong"
282          */
283         mtevL(noit_error, "jlog client %s disconnected while idle\n",
284               ac->remote_cn);
285         goto alldone;
286       }
287     }
288     if(sleeptime) {
289       usleep(sleeptime * 1000); /* us -> ms */
290     }
291     sleeptime += 1000; /* 1 s */
292   }
293
294  alldone:
295   e->opset->close(e->fd, &mask, e);
296   mtev_atomic_dec32(&jcl->feed_stats->connections);
297   noit_jlog_closure_free(jcl);
298   acceptor_closure_free(ac);
299   mtev_memory_maintenance();
300   return NULL;
301 }
302
303 int
304 noit_jlog_handler(eventer_t e, int mask, void *closure,
305                      struct timeval *now) {
306   eventer_t newe;
307   pthread_t tid;
308   pthread_attr_t tattr;
309   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
310   acceptor_closure_t *ac = closure;
311   noit_jlog_closure_t *jcl = ac->service_ctx;
312   char errbuff[256];
313   const char *errstr = "unknown error";
314
315   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
316     int len, nlen;
317 socket_error:
318     /* Exceptions cause us to simply snip the connection */
319     len = strlen(errstr);
320     nlen = htonl(0 - len);
321     e->opset->write(e->fd, &nlen, sizeof(nlen), &newmask, e);
322     e->opset->write(e->fd, errstr, strlen(errstr), &newmask, e);
323     eventer_remove_fd(e->fd);
324     e->opset->close(e->fd, &newmask, e);
325     if(jcl) noit_jlog_closure_free(jcl);
326     acceptor_closure_free(ac);
327     return 0;
328   }
329
330   if(!ac->service_ctx) {
331     mtev_log_stream_t ls;
332     const char *logname, *type;
333     int first_attempt = 1;
334     char path[PATH_MAX], subscriber[256], *sub;
335     jcl = ac->service_ctx = noit_jlog_closure_alloc();
336     if(!mtev_hash_retr_str(ac->config,
337                            "log_transit_feed_name",
338                            strlen("log_transit_feed_name"),
339                            &logname)) {
340       errstr = "No 'log_transit_feed_name' specified in log_transit.";
341       mtevL(noit_error, "%s\n", errstr);
342       goto socket_error;
343     }
344     ls = mtev_log_stream_find(logname);
345     if(!ls) {
346       snprintf(errbuff, sizeof(errbuff),
347                "Could not find log '%s' for log_transit.", logname);
348       errstr = errbuff;
349       mtevL(noit_error, "%s\n", errstr);
350       goto socket_error;
351     }
352     type = mtev_log_stream_get_type(ls);
353     if(!type || strcmp(type, "jlog")) {
354       snprintf(errbuff, sizeof(errbuff),
355                "Log '%s' for log_transit is not a jlog.", logname);
356       errstr = errbuff;
357       mtevL(noit_error, "%s\n", errstr);
358       goto socket_error;
359     }
360     if(ac->cmd == NOIT_JLOG_DATA_FEED) {
361       if(!ac->remote_cn) {
362         errstr = "jlog transit started to unidentified party.";
363         mtevL(noit_error, "%s\n", errstr);
364         goto socket_error;
365       }
366       strlcpy(subscriber, ac->remote_cn, sizeof(subscriber));
367       jcl->feed_stats = noit_jlog_feed_stats(subscriber);
368     }
369     else {
370       jcl->feed_stats = noit_jlog_feed_stats("~");
371       snprintf(subscriber, sizeof(subscriber),
372                "~%07d", mtev_atomic_inc32(&tmpfeedcounter));
373     }
374     jcl->subscriber = strdup(subscriber);
375
376     strlcpy(path, mtev_log_stream_get_path(ls), sizeof(path));
377     sub = strchr(path, '(');
378     if(sub) {
379       char *esub = strchr(sub, ')');
380       if(esub) {
381         *esub = '\0';
382         *sub++ = '\0';
383       }
384     }
385
386     jcl->jlog = jlog_new(path);
387     if(ac->cmd == NOIT_JLOG_DATA_TEMP_FEED) {
388  add_sub:
389       if(jlog_ctx_add_subscriber(jcl->jlog, jcl->subscriber, JLOG_END) == -1) {
390         snprintf(errbuff, sizeof(errbuff),
391                  "jlog reader[%s] error: %s", jcl->subscriber,
392                  jlog_ctx_err_string(jcl->jlog));
393         errstr = errbuff;
394         mtevL(noit_error, "%s\n", errstr);
395       }
396     }
397     if(jlog_ctx_open_reader(jcl->jlog, jcl->subscriber) == -1) {
398       if(sub && !strcmp(sub, "*")) {
399         if(first_attempt) {
400           jlog_ctx_close(jcl->jlog);
401           jcl->jlog = jlog_new(path);
402           first_attempt = 0;
403           goto add_sub;
404         }
405       }
406       snprintf(errbuff, sizeof(errbuff),
407                "jlog reader[%s] error: %s", jcl->subscriber,
408                jlog_ctx_err_string(jcl->jlog));
409       errstr = errbuff;
410       mtevL(noit_error, "%s\n", errstr);
411       goto socket_error;
412     }
413   }
414
415   /* The jlog stuff is disk I/O and can block us.
416    * We'll create a new thread to just handle this connection.
417    */
418   eventer_remove_fd(e->fd);
419   newe = eventer_alloc();
420   memcpy(newe, e, sizeof(*e));
421   pthread_attr_init(&tattr);
422   pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
423   gettimeofday(&jcl->feed_stats->last_connection, NULL);
424   mtev_atomic_inc32(&jcl->feed_stats->connections);
425   if(pthread_create(&tid, &tattr, noit_jlog_thread_main, newe) == 0) {
426     return 0;
427   }
428
429   /* Undo our dup */
430   eventer_free(newe);
431   /* Creating the thread failed, close it down and deschedule. */
432   e->opset->close(e->fd, &newmask, e);
433   return 0;
434 }
435
436 static int rest_show_feed(mtev_http_rest_closure_t *restc,
437                           int npats, char **pats) {
438   mtev_http_session_ctx *ctx = restc->http_ctx;
439   const char *err = "unknown error";
440   const char *jpath_with_sub;
441   char jlogpath[PATH_MAX], *cp, **subs = NULL;
442   int nsubs, i;
443   mtev_log_stream_t feed;
444   jlog_ctx *jctx = NULL;
445   xmlDocPtr doc = NULL;
446   xmlNodePtr root = NULL, subnodes;
447
448   feed = mtev_log_stream_find("feed");
449   if(!feed) { err = "cannot find feed"; goto error; }
450
451   jpath_with_sub = mtev_log_stream_get_path(feed);
452   strlcpy(jlogpath, jpath_with_sub, sizeof(jlogpath));
453   cp = strchr(jlogpath, '(');
454   if(cp) *cp = '\0';
455
456   jctx = jlog_new(jlogpath);
457   if((nsubs = jlog_ctx_list_subscribers(jctx, &subs)) == -1) {
458     err = jlog_ctx_err_string(jctx);
459     goto error;
460   }
461
462   doc = xmlNewDoc((xmlChar *)"1.0");
463   root = xmlNewDocNode(doc, NULL, (xmlChar *)"feed", NULL);
464   xmlDocSetRootElement(doc, root);
465
466   subnodes = xmlNewNode(NULL, (xmlChar *)"subscribers");
467   for(i=0; i<nsubs; i++) {
468     xmlNewChild(subnodes, NULL, (xmlChar *)"subscriber", (xmlChar *)subs[i]);
469   }
470   xmlAddChild(root, subnodes);
471
472   mtev_http_response_ok(restc->http_ctx, "text/xml");
473   mtev_http_response_xml(restc->http_ctx, doc);
474   mtev_http_response_end(restc->http_ctx);
475   if(subs) jlog_ctx_list_subscribers_dispose(jctx, subs);
476   xmlFreeDoc(doc);
477   jlog_ctx_close(jctx);
478   return 0;
479
480  error:
481   if(doc) xmlFreeDoc(doc);
482   if(subs) jlog_ctx_list_subscribers_dispose(jctx, subs);
483   mtev_http_response_server_error(ctx, "text/plain");
484   mtev_http_response_append(ctx, err, strlen(err));
485   mtev_http_response_end(ctx);
486   if(jctx) jlog_ctx_close(jctx);
487   return 0;
488 }
489
490 static int rest_delete_feed(mtev_http_rest_closure_t *restc,
491                             int npats, char **pats) {
492   mtev_http_session_ctx *ctx = restc->http_ctx;
493   const char *err = "unknown error";
494   const char *jpath_with_sub;
495   char jlogpath[PATH_MAX], *cp;
496   int rv;
497   mtev_log_stream_t feed;
498   jlog_ctx *jctx;
499
500   feed = mtev_log_stream_find("feed");
501   if(!feed) { err = "cannot find feed"; goto error; }
502
503   jpath_with_sub = mtev_log_stream_get_path(feed);
504   strlcpy(jlogpath, jpath_with_sub, sizeof(jlogpath));
505   cp = strchr(jlogpath, '(');
506   if(cp) *cp = '\0';
507
508   jctx = jlog_new(jlogpath);
509   rv = jlog_ctx_remove_subscriber(jctx, pats[0]);
510   jlog_ctx_close(jctx);
511   if(rv < 0) {
512     err = jlog_ctx_err_string(jctx);
513     goto error;
514   }
515
516   /* removed or note, we should do a sweeping cleanup */
517   jlog_clean(jlogpath);
518
519   if(rv == 0) {
520     mtev_http_response_not_found(ctx, "text/plain");
521     mtev_http_response_end(ctx);
522     return 0;
523   }
524
525   mtev_http_response_standard(ctx, 204, "OK", "text/plain");
526   mtev_http_response_end(ctx);
527   return 0;
528
529  error:
530   mtev_http_response_server_error(ctx, "text/plain");
531   mtev_http_response_append(ctx, err, strlen(err));
532   mtev_http_response_end(ctx);
533   return 0;
534 }
535
Note: See TracBrowser for help on using the browser.