root/src/noit_jlog_listener.c

Revision d53e2c10a322e925b681be524e66a878d42818f1, 20.1 kB (checked in by Phil Maddox <philip.maddox@circonus.com>, 4 months ago)

Add Jlog Listener Default Value Configuration

Add the ability to configure the Jlog listener's default values. Changed
the MAX_LINES_AT_ONCE default value from 1000 to 10000.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007-2011, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * Copyright (c) 2015, Circonus, Inc. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include <libxml/parser.h>
35 #include <libxml/tree.h>
36 #include <libxml/xpath.h>
37
38 #include <mtev_defines.h>
39 #include <eventer/eventer.h>
40 #include <mtev_listener.h>
41 #include <mtev_hash.h>
42 #include <mtev_memory.h>
43 #include <mtev_rest.h>
44 #include <mtev_conf.h>
45
46 #include <jlog.h>
47 #include <jlog_private.h>
48 #include "noit_mtev_bridge.h"
49 #include "noit_jlog_listener.h"
50
51 #include <unistd.h>
52 #include <poll.h>
53 #include <assert.h>
54
55 static int MAX_ROWS_AT_ONCE = 10000;
56 static int DEFAULT_MSECONDS_BETWEEN_BATCHES = 10000;
57 static int DEFAULT_TRANSIENT_MSECONDS_BETWEEN_BATCHES = 500;
58
59 static mtev_atomic32_t tmpfeedcounter = 0;
60
61 static int rest_show_feed(mtev_http_rest_closure_t *restc,
62                           int npats, char **pats);
63 static int rest_delete_feed(mtev_http_rest_closure_t *restc,
64                             int npats, char **pats);
65 static int rest_add_feed(mtev_http_rest_closure_t *restc,
66                          int npats, char **pats);
67
68 void
69 noit_jlog_listener_init() {
70   xmlNodePtr node;
71   eventer_name_callback("log_transit/1.0", noit_jlog_handler);
72   mtev_control_dispatch_delegate(mtev_control_dispatch,
73                                  NOIT_JLOG_DATA_FEED,
74                                  noit_jlog_handler);
75   mtev_control_dispatch_delegate(mtev_control_dispatch,
76                                  NOIT_JLOG_DATA_TEMP_FEED,
77                                  noit_jlog_handler);
78   node = mtev_conf_get_section(NULL, "//logs");
79   if (node) {
80     mtev_conf_get_int(node, "//jlog/max_msg_batch_lines", &MAX_ROWS_AT_ONCE);
81     mtev_conf_get_int(node, "//jlog/default_mseconds_between_batches", &DEFAULT_MSECONDS_BETWEEN_BATCHES);
82     mtev_conf_get_int(node, "//jlog/default_transient_mseconds_between_batches", &DEFAULT_TRANSIENT_MSECONDS_BETWEEN_BATCHES);
83   }
84   assert(mtev_http_rest_register_auth(
85     "GET", "/", "^feed$",
86     rest_show_feed, mtev_http_rest_client_cert_auth
87   ) == 0);
88   assert(mtev_http_rest_register_auth(
89     "DELETE", "/feed/", "^(.+)$",
90     rest_delete_feed, mtev_http_rest_client_cert_auth
91   ) == 0);
92   assert(mtev_http_rest_register_auth(
93     "PUT", "/", "^feed$",
94     rest_add_feed, mtev_http_rest_client_cert_auth
95   ) == 0);
96 }
97
98 typedef struct {
99   jlog_ctx *jlog;
100   char *subscriber;
101   jlog_id chkpt;
102   jlog_id start;
103   jlog_id finish;
104   jlog_feed_stats_t *feed_stats;
105   int count;
106   int wants_shutdown;
107 } noit_jlog_closure_t;
108
109 noit_jlog_closure_t *
110 noit_jlog_closure_alloc(void) {
111   noit_jlog_closure_t *jcl;
112   jcl = calloc(1, sizeof(*jcl));
113   return jcl;
114 }
115
116 void
117 noit_jlog_closure_free(noit_jlog_closure_t *jcl) {
118   if(jcl->jlog) {
119     if(jcl->subscriber) {
120       if(jcl->subscriber[0] == '~')
121         jlog_ctx_remove_subscriber(jcl->jlog, jcl->subscriber);
122       free(jcl->subscriber);
123     }
124     jlog_ctx_close(jcl->jlog);
125   }
126   free(jcl);
127 }
128
129 static mtev_hash_table feed_stats = MTEV_HASH_EMPTY;
130
131 jlog_feed_stats_t *
132 noit_jlog_feed_stats(const char *sub) {
133   void *vs = NULL;
134   jlog_feed_stats_t *s = NULL;
135   if(mtev_hash_retrieve(&feed_stats, sub, strlen(sub), &vs))
136     return (jlog_feed_stats_t *)vs;
137   s = calloc(1, sizeof(*s));
138   s->feed_name = strdup(sub);
139   mtev_hash_store(&feed_stats, s->feed_name, strlen(s->feed_name), s);
140   return s;
141 }
142 int
143 noit_jlog_foreach_feed_stats(int (*f)(jlog_feed_stats_t *, void *),
144                              void *c) {
145   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
146   const char *key;
147   int klen, cnt = 0;
148   void *vs;
149   while(mtev_hash_next(&feed_stats, &iter, &key, &klen, &vs)) {
150     cnt += f((jlog_feed_stats_t *)vs, c);
151   }
152   return cnt;
153 }
154 static int
155 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
156   int w, sofar = 0;
157   while(l > sofar) {
158     w = e->opset->write(e->fd, (char *)b + sofar, l - sofar, mask, e);
159     if(w <= 0) return w;
160     sofar += w;
161   }
162   return sofar;
163 }
164 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
165
166 static int
167 noit_jlog_push(eventer_t e, noit_jlog_closure_t *jcl) {
168   jlog_message msg;
169   int mask;
170   u_int32_t n_count;
171   n_count = htonl(jcl->count);
172   if(Ewrite(&n_count, sizeof(n_count)) != sizeof(n_count))
173     return -1;
174   while(jcl->count > 0) {
175     int rv;
176     struct { jlog_id chkpt; u_int32_t n_sec, n_usec, n_len; } payload;
177     if(jlog_ctx_read_message(jcl->jlog, &jcl->start, &msg) == -1)
178       return -1;
179
180     /* Here we actually push the message */
181     payload.chkpt.log = htonl(jcl->start.log);
182     payload.chkpt.marker = htonl(jcl->start.marker);
183     payload.n_sec  = htonl(msg.header->tv_sec);
184     payload.n_usec = htonl(msg.header->tv_usec);
185     payload.n_len  = htonl(msg.mess_len);
186     if((rv = Ewrite(&payload, sizeof(payload))) != sizeof(payload)) {
187       mtevL(noit_error, "Error writing jlog header over SSL %d != %d\n",
188             rv, (int)sizeof(payload));
189       return -1;
190     }
191     if((rv = Ewrite(msg.mess, msg.mess_len)) != msg.mess_len) {
192       mtevL(noit_error, "Error writing jlog message over SSL %d != %d\n",
193             rv, msg.mess_len);
194       return -1;
195     }
196     /* Note what the client must checkpoint */
197     jcl->chkpt = jcl->start;
198
199     JLOG_ID_ADVANCE(&jcl->start);
200     jcl->count--;
201   }
202   return 0;
203 }
204
205 void *
206 noit_jlog_thread_main(void *e_vptr) {
207   int mask, bytes_read, sleeptime, max_sleeptime;
208   eventer_t e = e_vptr;
209   acceptor_closure_t *ac = e->closure;
210   noit_jlog_closure_t *jcl = ac->service_ctx;
211   char inbuff[sizeof(jlog_id)];
212
213   mtev_memory_init_thread();
214   eventer_set_fd_blocking(e->fd);
215
216   max_sleeptime = DEFAULT_MSECONDS_BETWEEN_BATCHES;
217   if(ac->cmd == NOIT_JLOG_DATA_TEMP_FEED)
218     max_sleeptime = DEFAULT_TRANSIENT_MSECONDS_BETWEEN_BATCHES;
219
220   sleeptime = max_sleeptime;
221   while(1) {
222     jlog_id client_chkpt;
223     sleeptime = MIN(sleeptime, max_sleeptime);
224     jlog_get_checkpoint(jcl->jlog, ac->remote_cn, &jcl->chkpt);
225     jcl->count = jlog_ctx_read_interval(jcl->jlog, &jcl->start, &jcl->finish);
226     if(jcl->count < 0) {
227       char idxfile[PATH_MAX];
228       mtevL(noit_error, "jlog_ctx_read_interval: %s\n",
229             jlog_ctx_err_string(jcl->jlog));
230       switch (jlog_ctx_err(jcl->jlog)) {
231         case JLOG_ERR_FILE_CORRUPT:
232         case JLOG_ERR_IDX_CORRUPT:
233           jlog_repair_datafile(jcl->jlog, jcl->start.log);
234           jlog_repair_datafile(jcl->jlog, jcl->start.log + 1);
235           mtevL(noit_error,
236                 "jlog reconstructed, deleting corresponding index.\n");
237           STRSETDATAFILE(jcl->jlog, idxfile, jcl->start.log);
238           strlcat(idxfile, INDEX_EXT, sizeof(idxfile));
239           unlink(idxfile);
240           STRSETDATAFILE(jcl->jlog, idxfile, jcl->start.log + 1);
241           strlcat(idxfile, INDEX_EXT, sizeof(idxfile));
242           unlink(idxfile);
243           goto alldone;
244           break;
245         default:
246           goto alldone;
247       }
248     }
249     if(jcl->count > MAX_ROWS_AT_ONCE) {
250       /* Artificially set down the range to make the batches a bit easier
251        * to handle on the stratcond/postgres end.
252        * However, we must have more data, so drop the sleeptime to 0
253        */
254       jcl->count = MAX_ROWS_AT_ONCE;
255       jcl->finish.marker = jcl->start.marker + jcl->count;
256     }
257     if(jcl->count > 0) {
258       sleeptime = 0;
259       if(noit_jlog_push(e, jcl)) {
260         goto alldone;
261       }
262       /* Read our jlog_id accounting for possibly short reads */
263       bytes_read = 0;
264       while(bytes_read < sizeof(jlog_id)) {
265         int len;
266         if((len = e->opset->read(e->fd, inbuff + bytes_read,
267                                  sizeof(jlog_id) - bytes_read,
268                                  &mask, e)) <= 0)
269           goto alldone;
270         bytes_read += len;
271       }
272       memcpy(&client_chkpt, inbuff, sizeof(jlog_id));
273       /* Fix the endian */
274       client_chkpt.log = ntohl(client_chkpt.log);
275       client_chkpt.marker = ntohl(client_chkpt.marker);
276  
277       if(memcmp(&jcl->chkpt, &client_chkpt, sizeof(jlog_id))) {
278         mtevL(noit_error,
279               "client %s submitted invalid checkpoint %u:%u expected %u:%u\n",
280               ac->remote_cn, client_chkpt.log, client_chkpt.marker,
281               jcl->chkpt.log, jcl->chkpt.marker);
282         goto alldone;
283       }
284       gettimeofday(&jcl->feed_stats->last_checkpoint, NULL);
285       jlog_ctx_read_checkpoint(jcl->jlog, &jcl->chkpt);
286     }
287     else {
288       /* we have nothing to write -- maybe we have no checks configured...
289        * If this is the case "forever", the remote might disconnect and
290        * we would never know. Do the painful work of detecting a
291        * disconnected client.
292        */
293       struct pollfd pfd;
294       pfd.fd = e->fd;
295       pfd.events = POLLIN | POLLHUP | POLLRDNORM;
296       pfd.revents = 0;
297       if(poll(&pfd, 1, 0) != 0) {
298         /* normally, we'd recv PEEK|DONTWAIT.  However, the client should
299          * not be writing to us.  So, we know we can't have any legitimate
300          * data on this socket (true even though this is SSL). So, if we're
301          * here then "shit went wrong"
302          */
303         mtevL(noit_error, "jlog client %s disconnected while idle\n",
304               ac->remote_cn);
305         goto alldone;
306       }
307     }
308     if(sleeptime) {
309       usleep(sleeptime * 1000); /* us -> ms */
310     }
311     sleeptime += 1000; /* 1 s */
312   }
313
314  alldone:
315   e->opset->close(e->fd, &mask, e);
316   mtev_atomic_dec32(&jcl->feed_stats->connections);
317   noit_jlog_closure_free(jcl);
318   acceptor_closure_free(ac);
319   mtev_memory_maintenance();
320   return NULL;
321 }
322
323 int
324 noit_jlog_handler(eventer_t e, int mask, void *closure,
325                      struct timeval *now) {
326   eventer_t newe;
327   pthread_t tid;
328   pthread_attr_t tattr;
329   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
330   acceptor_closure_t *ac = closure;
331   noit_jlog_closure_t *jcl = ac->service_ctx;
332   char errbuff[256];
333   const char *errstr = "unknown error";
334
335   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
336     int len, nlen;
337 socket_error:
338     /* Exceptions cause us to simply snip the connection */
339     len = strlen(errstr);
340     nlen = htonl(0 - len);
341     e->opset->write(e->fd, &nlen, sizeof(nlen), &newmask, e);
342     e->opset->write(e->fd, errstr, strlen(errstr), &newmask, e);
343     eventer_remove_fd(e->fd);
344     e->opset->close(e->fd, &newmask, e);
345     if(jcl) noit_jlog_closure_free(jcl);
346     acceptor_closure_free(ac);
347     return 0;
348   }
349
350   if(!ac->service_ctx) {
351     mtev_log_stream_t ls;
352     const char *logname, *type;
353     int first_attempt = 1;
354     char path[PATH_MAX], subscriber[256], *sub;
355     jcl = ac->service_ctx = noit_jlog_closure_alloc();
356     if(!mtev_hash_retr_str(ac->config,
357                            "log_transit_feed_name",
358                            strlen("log_transit_feed_name"),
359                            &logname)) {
360       errstr = "No 'log_transit_feed_name' specified in log_transit.";
361       mtevL(noit_error, "%s\n", errstr);
362       goto socket_error;
363     }
364     ls = mtev_log_stream_find(logname);
365     if(!ls) {
366       snprintf(errbuff, sizeof(errbuff),
367                "Could not find log '%s' for log_transit.", logname);
368       errstr = errbuff;
369       mtevL(noit_error, "%s\n", errstr);
370       goto socket_error;
371     }
372     type = mtev_log_stream_get_type(ls);
373     if(!type || strcmp(type, "jlog")) {
374       snprintf(errbuff, sizeof(errbuff),
375                "Log '%s' for log_transit is not a jlog.", logname);
376       errstr = errbuff;
377       mtevL(noit_error, "%s\n", errstr);
378       goto socket_error;
379     }
380     if(ac->cmd == NOIT_JLOG_DATA_FEED) {
381       if(!ac->remote_cn) {
382         errstr = "jlog transit started to unidentified party.";
383         mtevL(noit_error, "%s\n", errstr);
384         goto socket_error;
385       }
386       strlcpy(subscriber, ac->remote_cn, sizeof(subscriber));
387       jcl->feed_stats = noit_jlog_feed_stats(subscriber);
388     }
389     else {
390       jcl->feed_stats = noit_jlog_feed_stats("~");
391       snprintf(subscriber, sizeof(subscriber),
392                "~%07d", mtev_atomic_inc32(&tmpfeedcounter));
393     }
394     jcl->subscriber = strdup(subscriber);
395
396     strlcpy(path, mtev_log_stream_get_path(ls), sizeof(path));
397     sub = strchr(path, '(');
398     if(sub) {
399       char *esub = strchr(sub, ')');
400       if(esub) {
401         *esub = '\0';
402         *sub++ = '\0';
403       }
404     }
405
406     jcl->jlog = jlog_new(path);
407     if(ac->cmd == NOIT_JLOG_DATA_TEMP_FEED) {
408  add_sub:
409       if(jlog_ctx_add_subscriber(jcl->jlog, jcl->subscriber, JLOG_END) == -1) {
410         snprintf(errbuff, sizeof(errbuff),
411                  "jlog reader[%s] error: %s", jcl->subscriber,
412                  jlog_ctx_err_string(jcl->jlog));
413         errstr = errbuff;
414         mtevL(noit_error, "%s\n", errstr);
415       }
416     }
417     if(jlog_ctx_open_reader(jcl->jlog, jcl->subscriber) == -1) {
418       if(sub && !strcmp(sub, "*")) {
419         if(first_attempt) {
420           jlog_ctx_close(jcl->jlog);
421           jcl->jlog = jlog_new(path);
422           first_attempt = 0;
423           goto add_sub;
424         }
425       }
426       snprintf(errbuff, sizeof(errbuff),
427                "jlog reader[%s] error: %s", jcl->subscriber,
428                jlog_ctx_err_string(jcl->jlog));
429       errstr = errbuff;
430       mtevL(noit_error, "%s\n", errstr);
431       goto socket_error;
432     }
433   }
434
435   /* The jlog stuff is disk I/O and can block us.
436    * We'll create a new thread to just handle this connection.
437    */
438   eventer_remove_fd(e->fd);
439   newe = eventer_alloc();
440   memcpy(newe, e, sizeof(*e));
441   pthread_attr_init(&tattr);
442   pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);
443   gettimeofday(&jcl->feed_stats->last_connection, NULL);
444   mtev_atomic_inc32(&jcl->feed_stats->connections);
445   if(pthread_create(&tid, &tattr, noit_jlog_thread_main, newe) == 0) {
446     return 0;
447   }
448
449   /* Undo our dup */
450   eventer_free(newe);
451   /* Creating the thread failed, close it down and deschedule. */
452   e->opset->close(e->fd, &newmask, e);
453   return 0;
454 }
455
456 static int rest_show_feed(mtev_http_rest_closure_t *restc,
457                           int npats, char **pats) {
458   mtev_http_session_ctx *ctx = restc->http_ctx;
459   const char *err = "unknown error";
460   const char *jpath_with_sub;
461   char jlogpath[PATH_MAX], *cp, **subs = NULL;
462   int nsubs, i;
463   mtev_log_stream_t feed;
464   jlog_ctx *jctx = NULL;
465   xmlDocPtr doc = NULL;
466   xmlNodePtr root = NULL, subnodes;
467
468   feed = mtev_log_stream_find("feed");
469   if(!feed) { err = "cannot find feed"; goto error; }
470
471   jpath_with_sub = mtev_log_stream_get_path(feed);
472   strlcpy(jlogpath, jpath_with_sub, sizeof(jlogpath));
473   cp = strchr(jlogpath, '(');
474   if(cp) *cp = '\0';
475
476   jctx = jlog_new(jlogpath);
477   if((nsubs = jlog_ctx_list_subscribers(jctx, &subs)) == -1) {
478     err = jlog_ctx_err_string(jctx);
479     goto error;
480   }
481
482   doc = xmlNewDoc((xmlChar *)"1.0");
483   root = xmlNewDocNode(doc, NULL, (xmlChar *)"feed", NULL);
484   xmlDocSetRootElement(doc, root);
485
486   subnodes = xmlNewNode(NULL, (xmlChar *)"subscribers");
487   for(i=0; i<nsubs; i++) {
488     xmlNewChild(subnodes, NULL, (xmlChar *)"subscriber", (xmlChar *)subs[i]);
489   }
490   xmlAddChild(root, subnodes);
491
492   mtev_http_response_ok(restc->http_ctx, "text/xml");
493   mtev_http_response_xml(restc->http_ctx, doc);
494   mtev_http_response_end(restc->http_ctx);
495   if(subs) jlog_ctx_list_subscribers_dispose(jctx, subs);
496   xmlFreeDoc(doc);
497   jlog_ctx_close(jctx);
498   return 0;
499
500  error:
501   if(doc) xmlFreeDoc(doc);
502   if(subs) jlog_ctx_list_subscribers_dispose(jctx, subs);
503   mtev_http_response_server_error(ctx, "text/plain");
504   mtev_http_response_append(ctx, err, strlen(err));
505   mtev_http_response_end(ctx);
506   if(jctx) jlog_ctx_close(jctx);
507   return 0;
508 }
509
510 static int rest_delete_feed(mtev_http_rest_closure_t *restc,
511                             int npats, char **pats) {
512   mtev_http_session_ctx *ctx = restc->http_ctx;
513   const char *err = "unknown error";
514   const char *jpath_with_sub;
515   char jlogpath[PATH_MAX], *cp;
516   int rv;
517   mtev_log_stream_t feed;
518   jlog_ctx *jctx;
519
520   feed = mtev_log_stream_find("feed");
521   if(!feed) { err = "cannot find feed"; goto error; }
522
523   jpath_with_sub = mtev_log_stream_get_path(feed);
524   strlcpy(jlogpath, jpath_with_sub, sizeof(jlogpath));
525   cp = strchr(jlogpath, '(');
526   if(cp) *cp = '\0';
527
528   jctx = jlog_new(jlogpath);
529   rv = jlog_ctx_remove_subscriber(jctx, pats[0]);
530   jlog_ctx_close(jctx);
531   if(rv < 0) {
532     err = jlog_ctx_err_string(jctx);
533     goto error;
534   }
535
536   /* removed or note, we should do a sweeping cleanup */
537   jlog_clean(jlogpath);
538
539   if(rv == 0) {
540     mtev_http_response_not_found(ctx, "text/plain");
541     mtev_http_response_end(ctx);
542     return 0;
543   }
544
545   mtev_http_response_standard(ctx, 204, "OK", "text/plain");
546   mtev_http_response_end(ctx);
547   return 0;
548
549  error:
550   mtev_http_response_server_error(ctx, "text/plain");
551   mtev_http_response_append(ctx, err, strlen(err));
552   mtev_http_response_end(ctx);
553   return 0;
554 }
555
556 static int rest_add_feed(mtev_http_rest_closure_t *restc,
557                          int npats, char **pats) {
558   mtev_http_session_ctx *ctx = restc->http_ctx;
559   xmlXPathObjectPtr pobj = NULL;
560   xmlDocPtr doc = NULL, indoc = NULL;
561   xmlNodePtr node, root;
562   acceptor_closure_t *ac = restc->ac;
563   int error_code = 500, complete = 0, mask = 0, rv;
564   const char *error = "internal error", *logname;
565   char *name, *copy_from;
566   mtev_log_stream_t feed;
567   const char *jpath_with_sub;
568   char jlogpath[PATH_MAX], *cp;
569   jlog_ctx *jctx = NULL;
570   jlog_id chkpt;
571
572   if(npats != 0) goto error;
573
574   indoc = rest_get_xml_upload(restc, &mask, &complete);
575   if(!complete) return mask;
576   if(indoc == NULL) {
577     error = "xml parse error";
578     goto error;
579   }
580   if(!mtev_hash_retr_str(ac->config,
581                          "log_transit_feed_name",
582                          strlen("log_transit_feed_name"),
583                          &logname)) {
584     goto error;
585
586   }
587   feed = mtev_log_stream_find("feed");
588   if(!feed) {
589     error = "couldn't find feed";
590     goto error;
591   }
592
593   jpath_with_sub = mtev_log_stream_get_path(feed);
594   strlcpy(jlogpath, jpath_with_sub, sizeof(jlogpath));
595   cp = strchr(jlogpath, '(');
596   if(cp) *cp = '\0';
597
598   node = xmlDocGetRootElement(indoc);
599   name = (char*)xmlGetProp(node, (xmlChar*)"name");
600   copy_from = (char*)xmlGetProp(node, (xmlChar*)"checkpoint_copy");
601
602   jctx = jlog_new(jlogpath);
603   if (!jctx) {
604     error = "couldn't open logpath";
605     goto error;
606   }
607
608   if (!jlog_get_checkpoint(jctx, name, &chkpt)) {
609     error = "subscriber already exists, can't add";
610     goto error;
611   }
612
613   if (copy_from) {
614     rv = jlog_ctx_add_subscriber_copy_checkpoint(jctx, name, copy_from);
615   }
616   else {
617     rv = jlog_ctx_add_subscriber(jctx, name, JLOG_END);
618   }
619   if (rv == -1) {
620     error = "couldn't add subscriber";
621     goto error;
622   }
623
624   mtev_http_response_ok(restc->http_ctx, "text/xml");
625   mtev_http_response_end(restc->http_ctx);
626   goto cleanup;
627
628  error:
629   mtev_http_response_standard(ctx, error_code, "ERROR", "text/xml");
630   doc = xmlNewDoc((xmlChar *)"1.0");
631   root = xmlNewDocNode(doc, NULL, (xmlChar *)"error", NULL);
632   xmlDocSetRootElement(doc, root);
633   xmlNodeAddContent(root, (xmlChar *)error);
634   mtev_http_response_xml(ctx, doc);
635   mtev_http_response_end(ctx);
636
637  cleanup:
638   if (jctx) {
639     jlog_ctx_close(jctx);
640   }
641   if(pobj) xmlXPathFreeObject(pobj);
642   if(doc) xmlFreeDoc(doc);
643   return 0;
644 }
645
Note: See TracBrowser for help on using the browser.