root/src/noit_jlog_listener.c

Revision 2ff4db5a6730270eb30827e23883ed354c42ddf6, 20.2 kB (checked in by Phil Maddox <philip.maddox@circonus.com>, 2 months ago)

Explicitly Initialize Mtev Hash Tables

Rather than using MTEV_HASH_EMPTY or not calling any initialization at
all, explicitly initialize hash tables.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2011, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * Copyright (c) 2015, Circonus, Inc. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include <libxml/parser.h>
35 #include <libxml/tree.h>
36 #include <libxml/xpath.h>
37
38 #include <mtev_defines.h>
39 #include <eventer/eventer.h>
40 #include <mtev_listener.h>
41 #include <mtev_hash.h>
42 #include <mtev_memory.h>
43 #include <mtev_rest.h>
44 #include <mtev_conf.h>
45
46 #include <jlog.h>
47 #include <jlog_private.h>
48 #include "noit_mtev_bridge.h"
49 #include "noit_jlog_listener.h"
50
51 #include <unistd.h>
52 #include <poll.h>
53
54 static int MAX_ROWS_AT_ONCE = 10000;
55 static int DEFAULT_MSECONDS_BETWEEN_BATCHES = 10000;
56 static int DEFAULT_TRANSIENT_MSECONDS_BETWEEN_BATCHES = 500;
57
58 static mtev_hash_table feed_stats;
59
60 static mtev_atomic32_t tmpfeedcounter = 0;
61
62 static int rest_show_feed(mtev_http_rest_closure_t *restc,
63                           int npats, char **pats);
64 static int rest_delete_feed(mtev_http_rest_closure_t *restc,
65                             int npats, char **pats);
66 static int rest_add_feed(mtev_http_rest_closure_t *restc,
67                          int npats, char **pats);
68
69 void
70 noit_jlog_listener_init() {
71   xmlNodePtr node;
72   eventer_name_callback("log_transit/1.0", noit_jlog_handler);
73   mtev_control_dispatch_delegate(mtev_control_dispatch,
74                                  NOIT_JLOG_DATA_FEED,
75                                  noit_jlog_handler);
76   mtev_control_dispatch_delegate(mtev_control_dispatch,
77                                  NOIT_JLOG_DATA_TEMP_FEED,
78                                  noit_jlog_handler);
79   node = mtev_conf_get_section(NULL, "//logs");
80   if (node) {
81     mtev_conf_get_int(node, "//jlog/max_msg_batch_lines", &MAX_ROWS_AT_ONCE);
82     mtev_conf_get_int(node, "//jlog/default_mseconds_between_batches", &DEFAULT_MSECONDS_BETWEEN_BATCHES);
83     mtev_conf_get_int(node, "//jlog/default_transient_mseconds_between_batches", &DEFAULT_TRANSIENT_MSECONDS_BETWEEN_BATCHES);
84   }
85   mtevAssert(mtev_http_rest_register_auth(
86     "GET", "/", "^feed$",
87     rest_show_feed, mtev_http_rest_client_cert_auth
88   ) == 0);
89   mtevAssert(mtev_http_rest_register_auth(
90     "DELETE", "/feed/", "^(.+)$",
91     rest_delete_feed, mtev_http_rest_client_cert_auth
92   ) == 0);
93   mtevAssert(mtev_http_rest_register_auth(
94     "PUT", "/", "^feed$",
95     rest_add_feed, mtev_http_rest_client_cert_auth
96   ) == 0);
97 }
98
99 typedef struct {
100   jlog_ctx *jlog;
101   char *subscriber;
102   jlog_id chkpt;
103   jlog_id start;
104   jlog_id finish;
105   jlog_feed_stats_t *feed_stats;
106   int count;
107   int wants_shutdown;
108 } noit_jlog_closure_t;
109
110 noit_jlog_closure_t *
111 noit_jlog_closure_alloc(void) {
112   noit_jlog_closure_t *jcl;
113   jcl = calloc(1, sizeof(*jcl));
114   return jcl;
115 }
116
117 void
118 noit_jlog_closure_free(noit_jlog_closure_t *jcl) {
119   if(jcl->jlog) {
120     if(jcl->subscriber) {
121       if(jcl->subscriber[0] == '~')
122         jlog_ctx_remove_subscriber(jcl->jlog, jcl->subscriber);
123       free(jcl->subscriber);
124     }
125     jlog_ctx_close(jcl->jlog);
126   }
127   free(jcl);
128 }
129
130 jlog_feed_stats_t *
131 noit_jlog_feed_stats(const char *sub) {
132   void *vs = NULL;
133   jlog_feed_stats_t *s = NULL;
134   if(mtev_hash_retrieve(&feed_stats, sub, strlen(sub), &vs))
135     return (jlog_feed_stats_t *)vs;
136   s = calloc(1, sizeof(*s));
137   s->feed_name = strdup(sub);
138   mtev_hash_store(&feed_stats, s->feed_name, strlen(s->feed_name), s);
139   return s;
140 }
141 int
142 noit_jlog_foreach_feed_stats(int (*f)(jlog_feed_stats_t *, void *),
143                              void *c) {
144   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
145   const char *key;
146   int klen, cnt = 0;
147   void *vs;
148   while(mtev_hash_next(&feed_stats, &iter, &key, &klen, &vs)) {
149     cnt += f((jlog_feed_stats_t *)vs, c);
150   }
151   return cnt;
152 }
153 static int
154 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
155   int w, sofar = 0;
156   while(l > sofar) {
157     w = e->opset->write(e->fd, (char *)b + sofar, l - sofar, mask, e);
158     if(w <= 0) return w;
159     sofar += w;
160   }
161   return sofar;
162 }
163 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
164
165 static int
166 noit_jlog_push(eventer_t e, noit_jlog_closure_t *jcl) {
167   jlog_message msg;
168   int mask;
169   u_int32_t n_count;
170   n_count = htonl(jcl->count);
171   if(Ewrite(&n_count, sizeof(n_count)) != sizeof(n_count))
172     return -1;
173   while(jcl->count > 0) {
174     int rv;
175     struct { jlog_id chkpt; u_int32_t n_sec, n_usec, n_len; } payload;
176     if(jlog_ctx_read_message(jcl->jlog, &jcl->start, &msg) == -1)
177       return -1;
178
179     /* Here we actually push the message */
180     payload.chkpt.log = htonl(jcl->start.log);
181     payload.chkpt.marker = htonl(jcl->start.marker);
182     payload.n_sec  = htonl(msg.header->tv_sec);
183     payload.n_usec = htonl(msg.header->tv_usec);
184     payload.n_len  = htonl(msg.mess_len);
185     if((rv = Ewrite(&payload, sizeof(payload))) != sizeof(payload)) {
186       mtevL(noit_error, "Error writing jlog header over SSL %d != %d\n",
187             rv, (int)sizeof(payload));
188       return -1;
189     }
190     if((rv = Ewrite(msg.mess, msg.mess_len)) != msg.mess_len) {
191       mtevL(noit_error, "Error writing jlog message over SSL %d != %d\n",
192             rv, msg.mess_len);
193       return -1;
194     }
195     /* Note what the client must checkpoint */
196     jcl->chkpt = jcl->start;
197
198     JLOG_ID_ADVANCE(&jcl->start);
199     jcl->count--;
200   }
201   return 0;
202 }
203
204 void *
205 noit_jlog_thread_main(void *e_vptr) {
206   int mask, bytes_read, sleeptime, max_sleeptime;
207   eventer_t e = e_vptr;
208   acceptor_closure_t *ac = e->closure;
209   noit_jlog_closure_t *jcl = ac->service_ctx;
210   char inbuff[sizeof(jlog_id)];
211
212   mtev_memory_init_thread();
213   eventer_set_fd_blocking(e->fd);
214
215   max_sleeptime = DEFAULT_MSECONDS_BETWEEN_BATCHES;
216   if(ac->cmd == NOIT_JLOG_DATA_TEMP_FEED)
217     max_sleeptime = DEFAULT_TRANSIENT_MSECONDS_BETWEEN_BATCHES;
218
219   sleeptime = max_sleeptime;
220   while(1) {
221     jlog_id client_chkpt;
222     sleeptime = MIN(sleeptime, max_sleeptime);
223     jlog_get_checkpoint(jcl->jlog, ac->remote_cn, &jcl->chkpt);
224     jcl->count = jlog_ctx_read_interval(jcl->jlog, &jcl->start, &jcl->finish);
225     if(jcl->count < 0) {
226       char idxfile[PATH_MAX];
227       mtevL(noit_error, "jlog_ctx_read_interval: %s\n",
228             jlog_ctx_err_string(jcl->jlog));
229       switch (jlog_ctx_err(jcl->jlog)) {
230         case JLOG_ERR_FILE_CORRUPT:
231         case JLOG_ERR_IDX_CORRUPT:
232           jlog_repair_datafile(jcl->jlog, jcl->start.log);
233           jlog_repair_datafile(jcl->jlog, jcl->start.log + 1);
234           mtevL(noit_error,
235                 "jlog reconstructed, deleting corresponding index.\n");
236           STRSETDATAFILE(jcl->jlog, idxfile, jcl->start.log);
237           strlcat(idxfile, INDEX_EXT, sizeof(idxfile));
238           unlink(idxfile);
239           STRSETDATAFILE(jcl->jlog, idxfile, jcl->start.log + 1);
240           strlcat(idxfile, INDEX_EXT, sizeof(idxfile));
241           unlink(idxfile);
242           goto alldone;
243           break;
244         default:
245           goto alldone;
246       }
247     }
248     if(jcl->count > MAX_ROWS_AT_ONCE) {
249       /* Artificially set down the range to make the batches a bit easier
250        * to handle on the stratcond/postgres end.
251        * However, we must have more data, so drop the sleeptime to 0
252        */
253       jcl->count = MAX_ROWS_AT_ONCE;
254       jcl->finish.marker = jcl->start.marker + jcl->count;
255     }
256     if(jcl->count > 0) {
257       sleeptime = 0;
258       if(noit_jlog_push(e, jcl)) {
259         goto alldone;
260       }
261       /* Read our jlog_id accounting for possibly short reads */
262       bytes_read = 0;
263       while(bytes_read < sizeof(jlog_id)) {
264         int len;
265         if((len = e->opset->read(e->fd, inbuff + bytes_read,
266                                  sizeof(jlog_id) - bytes_read,
267                                  &mask, e)) <= 0)
268           goto alldone;
269         bytes_read += len;
270       }
271       memcpy(&client_chkpt, inbuff, sizeof(jlog_id));
272       /* Fix the endian */
273       client_chkpt.log = ntohl(client_chkpt.log);
274       client_chkpt.marker = ntohl(client_chkpt.marker);
275  
276       if(memcmp(&jcl->chkpt, &client_chkpt, sizeof(jlog_id))) {
277         mtevL(noit_error,
278               "client %s submitted invalid checkpoint %u:%u expected %u:%u\n",
279               ac->remote_cn, client_chkpt.log, client_chkpt.marker,
280               jcl->chkpt.log, jcl->chkpt.marker);
281         goto alldone;
282       }
283       gettimeofday(&jcl->feed_stats->last_checkpoint, NULL);
284       jlog_ctx_read_checkpoint(jcl->jlog, &jcl->chkpt);
285     }
286     else {
287       /* we have nothing to write -- maybe we have no checks configured...
288        * If this is the case "forever", the remote might disconnect and
289        * we would never know. Do the painful work of detecting a
290        * disconnected client.
291        */
292       struct pollfd pfd;
293       pfd.fd = e->fd;
294       pfd.events = POLLIN | POLLHUP | POLLRDNORM;
295       pfd.revents = 0;
296       if(poll(&pfd, 1, 0) != 0) {
297         /* normally, we'd recv PEEK|DONTWAIT.  However, the client should
298          * not be writing to us.  So, we know we can't have any legitimate
299          * data on this socket (true even though this is SSL). So, if we're
300          * here then "shit went wrong"
301          */
302         mtevL(noit_error, "jlog client %s disconnected while idle\n",
303               ac->remote_cn);
304         goto alldone;
305       }
306     }
307     if(sleeptime) {
308       usleep(sleeptime * 1000); /* us -> ms */
309     }
310     sleeptime += 1000; /* 1 s */
311   }
312
313  alldone:
314   e->opset->close(e->fd, &mask, e);
315   mtev_atomic_dec32(&jcl->feed_stats->connections);
316   noit_jlog_closure_free(jcl);
317   acceptor_closure_free(ac);
318   mtev_memory_maintenance();
319   return NULL;
320 }
321
322 int
323 noit_jlog_handler(eventer_t e, int mask, void *closure,
324                      struct timeval *now) {
325   eventer_t newe;
326   pthread_t tid;
327   pthread_attr_t tattr;
328   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
329   acceptor_closure_t *ac = closure;
330   noit_jlog_closure_t *jcl = ac->service_ctx;
331   char errbuff[256];
332   const char *errstr = "unknown error";
333
334   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
335     int len, nlen;
336 socket_error:
337     /* Exceptions cause us to simply snip the connection */
338     len = strlen(errstr);
339     nlen = htonl(0 - len);
340     e->opset->write(e->fd, &nlen, sizeof(nlen), &newmask, e);
341     e->opset->write(e->fd, errstr, strlen(errstr), &newmask, e);
342     eventer_remove_fd(e->fd);
343     e->opset->close(e->fd, &newmask, e);
344     if(jcl) noit_jlog_closure_free(jcl);
345     acceptor_closure_free(ac);
346     return 0;
347   }
348
349   if(!ac->service_ctx) {
350     mtev_log_stream_t ls;
351     const char *logname, *type;
352     int first_attempt = 1;
353     char path[PATH_MAX], subscriber[256], *sub;
354     jcl = ac->service_ctx = noit_jlog_closure_alloc();
355     if(!mtev_hash_retr_str(ac->config,
356                            "log_transit_feed_name",
357                            strlen("log_transit_feed_name"),
358                            &logname)) {
359       errstr = "No 'log_transit_feed_name' specified in log_transit.";
360       mtevL(noit_error, "%s\n", errstr);
361       goto socket_error;
362     }
363     ls = mtev_log_stream_find(logname);
364     if(!ls) {
365       snprintf(errbuff, sizeof(errbuff),
366                "Could not find log '%s' for log_transit.", logname);
367       errstr = errbuff;
368       mtevL(noit_error, "%s\n", errstr);
369       goto socket_error;
370     }
371     type = mtev_log_stream_get_type(ls);
372     if(!type || strcmp(type, "jlog")) {
373       snprintf(errbuff, sizeof(errbuff),
374                "Log '%s' for log_transit is not a jlog.", logname);
375       errstr = errbuff;
376       mtevL(noit_error, "%s\n", errstr);
377       goto socket_error;
378     }
379     if(ac->cmd == NOIT_JLOG_DATA_FEED) {
380       if(!ac->remote_cn) {
381         errstr = "jlog transit started to unidentified party.";
382         mtevL(noit_error, "%s\n", errstr);
383         goto socket_error;
384       }
385       strlcpy(subscriber, ac->remote_cn, sizeof(subscriber));
386       jcl->feed_stats = noit_jlog_feed_stats(subscriber);
387     }
388     else {
389       jcl->feed_stats = noit_jlog_feed_stats("~");
390       snprintf(subscriber, sizeof(subscriber),
391                "~%07d", mtev_atomic_inc32(&tmpfeedcounter));
392     }
393     jcl->subscriber = strdup(subscriber);
394
395     strlcpy(path, mtev_log_stream_get_path(ls), sizeof(path));
396     sub = strchr(path, '(');
397     if(sub) {
398       char *esub = strchr(sub, ')');
399       if(esub) {
400         *esub = '\0';
401         *sub++ = '\0';
402       }
403     }
404
405     jcl->jlog = jlog_new(path);
406     if(ac->cmd == NOIT_JLOG_DATA_TEMP_FEED) {
407  add_sub:
408       if(jlog_ctx_add_subscriber(jcl->jlog, jcl->subscriber, JLOG_END) == -1) {
409         snprintf(errbuff, sizeof(errbuff),
410                  "jlog reader[%s] error: %s", jcl->subscriber,
411                  jlog_ctx_err_string(jcl->jlog));
412         errstr = errbuff;
413         mtevL(noit_error, "%s\n", errstr);
414       }
415     }
416     if(jlog_ctx_open_reader(jcl->jlog, jcl->subscriber) == -1) {
417       if(sub && !strcmp(sub, "*")) {
418         if(first_attempt) {
419           jlog_ctx_close(jcl->jlog);
420           jcl->jlog = jlog_new(path);
421           first_attempt = 0;
422           goto add_sub;
423         }
424       }
425       snprintf(errbuff, sizeof(errbuff),
426                "jlog reader[%s] error: %s", jcl->subscriber,
427                jlog_ctx_err_string(jcl->jlog));
428       errstr = errbuff;
429       mtevL(noit_error, "%s\n", errstr);
430       goto socket_error;
431     }
432   }
433
434   /* The jlog stuff is disk I/O and can block us.
435    * We'll create a new thread to just handle this connection.
436    */
437   eventer_remove_fd(e->fd);
438   newe = eventer_alloc();
439   memcpy(newe, e, sizeof(*e));
440   pthread_attr_init(&tattr);
441   pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
442   gettimeofday(&jcl->feed_stats->last_connection, NULL);
443   mtev_atomic_inc32(&jcl->feed_stats->connections);
444   if(pthread_create(&tid, &tattr, noit_jlog_thread_main, newe) == 0) {
445     return 0;
446   }
447
448   /* Undo our dup */
449   eventer_free(newe);
450   /* Creating the thread failed, close it down and deschedule. */
451   e->opset->close(e->fd, &newmask, e);
452   return 0;
453 }
454
455 static int rest_show_feed(mtev_http_rest_closure_t *restc,
456                           int npats, char **pats) {
457   mtev_http_session_ctx *ctx = restc->http_ctx;
458   const char *err = "unknown error";
459   const char *jpath_with_sub;
460   char jlogpath[PATH_MAX], *cp, **subs = NULL;
461   int nsubs, i;
462   mtev_log_stream_t feed;
463   jlog_ctx *jctx = NULL;
464   xmlDocPtr doc = NULL;
465   xmlNodePtr root = NULL, subnodes;
466
467   feed = mtev_log_stream_find("feed");
468   if(!feed) { err = "cannot find feed"; goto error; }
469
470   jpath_with_sub = mtev_log_stream_get_path(feed);
471   strlcpy(jlogpath, jpath_with_sub, sizeof(jlogpath));
472   cp = strchr(jlogpath, '(');
473   if(cp) *cp = '\0';
474
475   jctx = jlog_new(jlogpath);
476   if((nsubs = jlog_ctx_list_subscribers(jctx, &subs)) == -1) {
477     err = jlog_ctx_err_string(jctx);
478     goto error;
479   }
480
481   doc = xmlNewDoc((xmlChar *)"1.0");
482   root = xmlNewDocNode(doc, NULL, (xmlChar *)"feed", NULL);
483   xmlDocSetRootElement(doc, root);
484
485   subnodes = xmlNewNode(NULL, (xmlChar *)"subscribers");
486   for(i=0; i<nsubs; i++) {
487     xmlNewChild(subnodes, NULL, (xmlChar *)"subscriber", (xmlChar *)subs[i]);
488   }
489   xmlAddChild(root, subnodes);
490
491   mtev_http_response_ok(restc->http_ctx, "text/xml");
492   mtev_http_response_xml(restc->http_ctx, doc);
493   mtev_http_response_end(restc->http_ctx);
494   if(subs) jlog_ctx_list_subscribers_dispose(jctx, subs);
495   xmlFreeDoc(doc);
496   jlog_ctx_close(jctx);
497   return 0;
498
499  error:
500   if(doc) xmlFreeDoc(doc);
501   if(subs) jlog_ctx_list_subscribers_dispose(jctx, subs);
502   mtev_http_response_server_error(ctx, "text/plain");
503   mtev_http_response_append(ctx, err, strlen(err));
504   mtev_http_response_end(ctx);
505   if(jctx) jlog_ctx_close(jctx);
506   return 0;
507 }
508
509 static int rest_delete_feed(mtev_http_rest_closure_t *restc,
510                             int npats, char **pats) {
511   mtev_http_session_ctx *ctx = restc->http_ctx;
512   const char *err = "unknown error";
513   const char *jpath_with_sub;
514   char jlogpath[PATH_MAX], *cp;
515   int rv;
516   mtev_log_stream_t feed;
517   jlog_ctx *jctx;
518
519   feed = mtev_log_stream_find("feed");
520   if(!feed) { err = "cannot find feed"; goto error; }
521
522   jpath_with_sub = mtev_log_stream_get_path(feed);
523   strlcpy(jlogpath, jpath_with_sub, sizeof(jlogpath));
524   cp = strchr(jlogpath, '(');
525   if(cp) *cp = '\0';
526
527   jctx = jlog_new(jlogpath);
528   rv = jlog_ctx_remove_subscriber(jctx, pats[0]);
529   jlog_ctx_close(jctx);
530   if(rv < 0) {
531     err = jlog_ctx_err_string(jctx);
532     goto error;
533   }
534
535   /* removed or note, we should do a sweeping cleanup */
536   jlog_clean(jlogpath);
537
538   if(rv == 0) {
539     mtev_http_response_not_found(ctx, "text/plain");
540     mtev_http_response_end(ctx);
541     return 0;
542   }
543
544   mtev_http_response_standard(ctx, 204, "OK", "text/plain");
545   mtev_http_response_end(ctx);
546   return 0;
547
548  error:
549   mtev_http_response_server_error(ctx, "text/plain");
550   mtev_http_response_append(ctx, err, strlen(err));
551   mtev_http_response_end(ctx);
552   return 0;
553 }
554
555 static int rest_add_feed(mtev_http_rest_closure_t *restc,
556                          int npats, char **pats) {
557   mtev_http_session_ctx *ctx = restc->http_ctx;
558   xmlXPathObjectPtr pobj = NULL;
559   xmlDocPtr doc = NULL, indoc = NULL;
560   xmlNodePtr node, root;
561   acceptor_closure_t *ac = restc->ac;
562   int error_code = 500, complete = 0, mask = 0, rv;
563   const char *error = "internal error", *logname;
564   char *name, *copy_from;
565   mtev_log_stream_t feed;
566   const char *jpath_with_sub;
567   char jlogpath[PATH_MAX], *cp;
568   jlog_ctx *jctx = NULL;
569   jlog_id chkpt;
570
571   if(npats != 0) goto error;
572
573   indoc = rest_get_xml_upload(restc, &mask, &complete);
574   if(!complete) return mask;
575   if(indoc == NULL) {
576     error = "xml parse error";
577     goto error;
578   }
579   if(!mtev_hash_retr_str(ac->config,
580                          "log_transit_feed_name",
581                          strlen("log_transit_feed_name"),
582                          &logname)) {
583     goto error;
584
585   }
586   feed = mtev_log_stream_find("feed");
587   if(!feed) {
588     error = "couldn't find feed";
589     goto error;
590   }
591
592   jpath_with_sub = mtev_log_stream_get_path(feed);
593   strlcpy(jlogpath, jpath_with_sub, sizeof(jlogpath));
594   cp = strchr(jlogpath, '(');
595   if(cp) *cp = '\0';
596
597   node = xmlDocGetRootElement(indoc);
598   name = (char*)xmlGetProp(node, (xmlChar*)"name");
599   copy_from = (char*)xmlGetProp(node, (xmlChar*)"checkpoint_copy");
600
601   jctx = jlog_new(jlogpath);
602   if (!jctx) {
603     error = "couldn't open logpath";
604     goto error;
605   }
606
607   if (!jlog_get_checkpoint(jctx, name, &chkpt)) {
608     error = "subscriber already exists, can't add";
609     goto error;
610   }
611
612   if (copy_from) {
613     rv = jlog_ctx_add_subscriber_copy_checkpoint(jctx, name, copy_from);
614   }
615   else {
616     rv = jlog_ctx_add_subscriber(jctx, name, JLOG_END);
617   }
618   if (rv == -1) {
619     error = "couldn't add subscriber";
620     goto error;
621   }
622
623   mtev_http_response_ok(restc->http_ctx, "text/xml");
624   mtev_http_response_end(restc->http_ctx);
625   goto cleanup;
626
627  error:
628   mtev_http_response_standard(ctx, error_code, "ERROR", "text/xml");
629   doc = xmlNewDoc((xmlChar *)"1.0");
630   root = xmlNewDocNode(doc, NULL, (xmlChar *)"error", NULL);
631   xmlDocSetRootElement(doc, root);
632   xmlNodeAddContent(root, (xmlChar *)error);
633   mtev_http_response_xml(ctx, doc);
634   mtev_http_response_end(ctx);
635
636  cleanup:
637   if (jctx) {
638     jlog_ctx_close(jctx);
639   }
640   if(pobj) xmlXPathFreeObject(pobj);
641   if(doc) xmlFreeDoc(doc);
642   return 0;
643 }
644
645 void
646 noit_jlog_listener_init_globals(void) {
647   mtev_hash_init(&feed_stats);
648 }
649
Note: See TracBrowser for help on using the browser.