root/src/stratcon_iep.c

Revision d25e46078a2b0f26a7561a53c04d2a00e21150e2, 25.6 kB (checked in by Phil Maddox <philip.maddox@circonus.com>, 4 months ago)

Add Filtered Exchange To Stratcon

Added the ability to have a "filtered" exchange in Stratcon. This will
only pass certain checks and metrics through. The checks and/or
metrics required can be specified in XML via a PUT command, which will
then pass the information on to the RabbitMQ and FQ drivers.

Right now, the RabbitMQ driver is not implemented; only the FQ driver
is.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * Copyright (c) 2015, Circonus, Inc. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include <mtev_defines.h>
35
36 #include <sys/types.h>
37 #ifdef HAVE_SYS_WAIT_H
38 #include <sys/wait.h>
39 #endif
40 #include <sys/stat.h>
41 #include <fcntl.h>
42 #include <unistd.h>
43 #include <sys/fcntl.h>
44 #ifdef HAVE_SYS_FILIO_H
45 #include <sys/filio.h>
46 #endif
47 #include <signal.h>
48 #include <errno.h>
49 #include <assert.h>
50 #include <libxml/parser.h>
51 #include <libxml/tree.h>
52 #include <libxml/xpath.h>
53
54 #include <eventer/eventer.h>
55 #include <mtev_log.h>
56 #include <mtev_b64.h>
57 #include <mtev_conf.h>
58 #include <mtev_rest.h>
59
60 #include "noit_mtev_bridge.h"
61 #include "noit_jlog_listener.h"
62 #include "stratcon_jlog_streamer.h"
63 #include "stratcon_datastore.h"
64 #include "stratcon_iep.h"
65 #include "noit_check.h"
66
67 eventer_jobq_t iep_jobq;
68 static mtev_log_stream_t noit_iep = NULL;
69 static mtev_log_stream_t noit_iep_debug = NULL;
70 static mtev_spinlock_t iep_conn_cnt = 0;
71
72 static mtev_hash_table mq_drivers = MTEV_HASH_EMPTY;
73 struct driver_thread_data {
74   mq_driver_t *mq_driver;
75   struct iep_thread_driver *driver_data;
76 };
77 struct driver_list {
78   mq_driver_t *mq_driver;
79   pthread_key_t iep_connection;
80   mtev_conf_section_t section;
81   struct driver_list *next;
82 } *drivers;
83
/* Global on/off flag for the IEP subsystem; starts out enabled. */
static int iep_system_enabled = 1;

/* Return the current value of the IEP enabled flag. */
int
stratcon_iep_get_enabled() {
  return iep_system_enabled;
}

/* Store n as the new value of the IEP enabled flag. */
void
stratcon_iep_set_enabled(int n) {
  iep_system_enabled = n;
}
87 static int rest_set_filters(mtev_http_rest_closure_t *restc,
88                             int npats, char **pats);
89
90
91 struct iep_job_closure {
92   char *line;       /* This is a copy and gets trashed during processing */
93   char *remote;
94   char *doc_str;
95   struct timeval start;
96 };
97
98 static void
99 start_iep_daemon();
100
/*
 * Compute the age, in seconds, of a status ('S') or metric ('M') line
 * relative to `now`.
 *
 * The line is expected to look like "<S|M>\t<epoch seconds>...".  The
 * timestamp is parsed with strtod() starting right after the first tab.
 *
 * Returns now - timestamp, or 0 when data is NULL, is not an 'S'/'M'
 * line, or its second character is not a tab.
 *
 * The arithmetic is done in double precision: a float cannot represent
 * present-day epoch seconds exactly (it rounds to a multiple of 128),
 * which would make the computed age wrong by up to about a minute.
 */
static double
stratcon_iep_age_from_line(char *data, struct timeval now) {
  double n, t;
  if(!data || (*data != 'S' && *data != 'M')) return 0;
  if(data[1] != '\t') return 0;
  t = strtod(data + 2, NULL);
  n = (double)now.tv_sec + (double)now.tv_usec / 1000000.0;
  return n - t;
}
112
/* One <statement> from the configuration plus its dependency edges. */
struct statement_node {
  char *id;
  char *statement;
  char *provides;
  int marked; /* helps with identifying cycles */
  int nrequires;
  struct statement_node **requires; /* borrowed pointers to other nodes */
};

/*
 * Release a statement_node and everything it owns.
 *
 * Used as the value destructor when the statement hash is destroyed.
 * The nodes referenced from `requires` are not owned and are left
 * alone; only the array holding the pointers is released.  The node
 * itself is heap allocated by its creator, so it is freed here too
 * (previously it was leaked).  A NULL argument is ignored.
 */
static void
statement_node_free(void *vstmt) {
  struct statement_node *stmt = vstmt;
  if(!stmt) return;
  free(stmt->id);
  free(stmt->statement);
  free(stmt->provides);
  free(stmt->requires);
  free(stmt);
}
129 static void
130 mq_command_free(void *command) {
131   mq_command_t *cmd = (mq_command_t*)command;
132   int i;
133   if (cmd->check.uuid) {
134     xmlFree(cmd->check.uuid);
135   }
136   if (cmd->check.metric_count > 0) {
137     for (i=0; i < cmd->check.metric_count; i++) {
138       if (!cmd->check.metrics[i]) break;
139       xmlFree(cmd->check.metrics[i]);
140     }
141     free(cmd->check.metrics);
142   }
143 }
144 static int
145 stmt_mark_dag(struct statement_node *stmt, int mgen) {
146   int i;
147   assert(stmt->marked <= mgen);
148   if(stmt->marked == mgen) return -1;
149   if(stmt->marked > 0) return 0; /* validated in a previous sweep */
150   stmt->marked = mgen;
151   for(i=0; i<stmt->nrequires; i++)
152     if(stmt_mark_dag(stmt->requires[i], mgen) < 0) return -1;
153   return 0;
154 }
155 static void
156 submit_statement_node(struct statement_node *stmt) {
157   int line_len, i;
158   char *line, *cp;
159
160   if(stmt->marked) return;
161   for(i=0; i<stmt->nrequires; i++)
162     submit_statement_node(stmt->requires[i]);
163
164   line_len = 3 /* 2 tabs + \0 */ +
165              1 /* 'D' */ + 1 /* '\n' */ +
166              strlen(stmt->id) + strlen(stmt->statement);
167   line = malloc(line_len);
168   snprintf(line, line_len, "D\t%s\t%s\n", stmt->id, stmt->statement);
169   cp = line;
170   while(cp[0] && cp[1]) {
171     if(*cp == '\n') *cp = ' ';
172     cp++;
173   }
174   mtevL(noit_iep, "submitting statement: %s\n", line);
175   stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, line, NULL);
176   stmt->marked = 1;
177 }
178 void stratcon_iep_submit_statements() {
179   int i, cnt = 0;
180   mtev_conf_section_t *statement_configs;
181   char path[256];
182   struct statement_node *stmt;
183   void *vstmt;
184   mtev_hash_table stmt_by_id = MTEV_HASH_EMPTY;
185   mtev_hash_table stmt_by_provider = MTEV_HASH_EMPTY;
186   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
187   const char *key;
188   int klen, mgen = 0;
189
190   snprintf(path, sizeof(path), "/stratcon/iep/queries[@master=\"stratcond\"]//statement");
191   statement_configs = mtev_conf_get_sections(NULL, path, &cnt);
192   mtevL(noit_debug, "Found %d %s stanzas\n", cnt, path);
193
194   /* Phase 1: sweep in all the statements */
195   for(i=0; i<cnt; i++) {
196     char id[UUID_STR_LEN+1];
197     char provides[256];
198     char *statement;
199
200     if(!mtev_conf_get_stringbuf(statement_configs[i],
201                                 "self::node()/@id",
202                                 id, sizeof(id))) {
203       mtevL(noit_iep, "No uuid specified in query\n");
204       continue;
205     }
206     if(!mtev_conf_get_stringbuf(statement_configs[i],
207                                 "ancestor-or-self::node()/@provides",
208                                 provides, sizeof(provides))) {
209       provides[0] = '\0';
210     }
211     if(!mtev_conf_get_string(statement_configs[i], "self::node()/epl",
212                              &statement)) {
213       mtevL(noit_iep, "No contents specified in statement\n");
214       continue;
215     }
216     stmt = calloc(1, sizeof(*stmt));
217     stmt->id = strdup(id);
218     stmt->statement = statement;
219     stmt->provides = provides[0] ? strdup(provides) : NULL;
220     if(!mtev_hash_store(&stmt_by_id, stmt->id, strlen(stmt->id), stmt)) {
221       mtevL(noit_iep, "Duplicate statement id: %s\n", stmt->id);
222       exit(-1);
223     }
224     if(stmt->provides) {
225       if(!mtev_hash_store(&stmt_by_provider, stmt->provides,
226                           strlen(stmt->provides), stmt)) {
227         mtevL(noit_iep, "Two statements provide: '%s'\n", stmt->provides);
228         exit(-1);
229       }
230     }
231   }
232
233   /* Phase 2: load the requires graph */
234   for(i=0; i<cnt; i++) {
235     char id[UUID_STR_LEN+1];
236     int rcnt, j;
237     char *requires;
238     mtev_conf_section_t *reqs;
239
240     if(!mtev_conf_get_stringbuf(statement_configs[i],
241                                 "self::node()/@id",
242                                 id, sizeof(id))) {
243       mtevL(noit_iep, "No uuid specified in query\n");
244       continue;
245     }
246     if(!mtev_hash_retrieve(&stmt_by_id, id, strlen(id), &vstmt)) {
247       mtevL(noit_iep, "Cannot find statement: %s\n", id);
248       exit(-1);
249     }
250     stmt = vstmt;
251     reqs = mtev_conf_get_sections(statement_configs[i],
252                                   "self::node()/requires", &rcnt);
253     if(rcnt > 0) {
254       stmt->requires = malloc(rcnt * sizeof(*(stmt->requires)));
255       for(j=0; j<rcnt; j++) {
256         void *vrstmt;
257         if(!mtev_conf_get_string(reqs[j], "self::node()",
258                                  &requires) || requires[0] == '\0') {
259           continue;
260         }
261         if(!mtev_hash_retrieve(&stmt_by_provider, requires, strlen(requires),
262                                &vrstmt)) {
263           mtevL(noit_iep,
264                 "Statement %s requires %s which no one provides.\n",
265                 stmt->id, requires);
266           exit(-1);
267         }
268         stmt->requires[stmt->nrequires++] = vrstmt;
269       }
270     }
271   }
272
273   /* Phase 3: Recursive sweep and mark to detect cycles.
274      We're walking the graph backwards here from dependent to provider,
275      but a cycle is a cycle, so this validates the graph. */
276   while(mtev_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) {
277     stmt = vstmt;
278     if(stmt_mark_dag(stmt, ++mgen) < 0) {
279       mtevL(noit_iep, "Statement %s has a cyclic requirement\n", stmt->id);
280       exit(-1);
281     }
282   }
283
284   /* Phase 4: clean the markings */
285   memset(&iter, 0, sizeof(iter));
286   while(mtev_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) {
287     stmt = vstmt;
288     stmt->marked = 0;
289   }
290
291   /* Phase 5: do the load */
292   memset(&iter, 0, sizeof(iter));
293   while(mtev_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) {
294     stmt = vstmt;
295     submit_statement_node(stmt);
296   }
297
298   mtev_hash_destroy(&stmt_by_provider, NULL, NULL);
299   mtev_hash_destroy(&stmt_by_id, NULL, statement_node_free);
300   free(statement_configs);
301 }
302
303 void stratcon_iep_submit_queries() {
304   int i, cnt = 0;
305   mtev_conf_section_t *query_configs;
306   char path[256];
307
308   snprintf(path, sizeof(path), "/stratcon/iep/queries[@master=\"stratcond\"]//query");
309   query_configs = mtev_conf_get_sections(NULL, path, &cnt);
310   mtevL(noit_debug, "Found %d %s stanzas\n", cnt, path);
311   for(i=0; i<cnt; i++) {
312     char id[UUID_STR_LEN+1];
313     char topic[256];
314     char *query;
315     char *line;
316     int line_len;
317
318     if(!mtev_conf_get_stringbuf(query_configs[i],
319                                 "self::node()/@id",
320                                 id, sizeof(id))) {
321       mtevL(noit_iep, "No uuid specified in query\n");
322       continue;
323     }
324     if(!mtev_conf_get_stringbuf(query_configs[i],
325                                 "ancestor-or-self::node()/@topic",
326                                 topic, sizeof(topic))) {
327       mtevL(noit_iep, "No topic specified in query\n");
328       continue;
329     }
330     if(!mtev_conf_get_string(query_configs[i], "self::node()/epl",
331                              &query)) {
332       mtevL(noit_iep, "No contents specified in query\n");
333       continue;
334     }
335     line_len = 4 /* 3 tabs + \0 */ +
336                1 /* 'Q' */ + 1 /* '\n' */ +
337                strlen(id) + strlen(topic) + strlen(query);
338     line = malloc(line_len);
339     snprintf(line, line_len, "Q\t%s\t%s\t%s\n", id, topic, query);
340     free(query);
341     query = line;
342     while(query[0] && query[1]) {
343       if(*query == '\n') *query = ' ';
344       query++;
345     }
346     stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, line, NULL);
347   }
348   free(query_configs);
349 }
350
351 static struct driver_thread_data *
352 connect_iep_driver(struct driver_list *d) {
353   int rc;
354   struct driver_thread_data *data;
355   data = pthread_getspecific(d->iep_connection);
356   if(!data) {
357     data = calloc(1, sizeof(*data));
358     data->mq_driver = d->mq_driver;
359     pthread_setspecific(d->iep_connection, data);
360   }
361   if(!data->driver_data)
362     data->driver_data = data->mq_driver->allocate(d->section);
363   rc = data->mq_driver->connect(data->driver_data);
364   if(rc < 0) return NULL;
365   if(rc == 0) {
366     /* Initial connect */
367     /* TODO: this should be requested by Esper, not blindly pushed */
368     stratcon_iep_submit_statements();
369     stratcon_datastore_iep_check_preload();
370     stratcon_iep_submit_queries();
371   }
372
373   return data;
374 }
375
376 static int
377 setup_iep_connection_callback(eventer_t e, int mask, void *closure,
378                               struct timeval *now) {
379   mtev_spinlock_unlock(&iep_conn_cnt);
380   stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, NULL, NULL);
381   return 0;
382 }
383
384 static void
385 setup_iep_connection_later(int seconds) {
386   eventer_t newe;
387   if(!mtev_spinlock_trylock(&iep_conn_cnt)) return;
388   newe = eventer_alloc();
389   gettimeofday(&newe->whence, NULL);
390   newe->whence.tv_sec += seconds;
391   newe->mask = EVENTER_TIMER;
392   newe->callback = setup_iep_connection_callback;
393   newe->closure = NULL;
394   eventer_add(newe);
395 }
396
397 static int
398 stratcon_iep_submitter(eventer_t e, int mask, void *closure,
399                        struct timeval *now) {
400   double age;
401   struct iep_job_closure *job = closure;
402   char *line;
403   struct timeval diff;
404   /* We only play when it is an asynch event */
405   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
406
407   if(mask & EVENTER_ASYNCH_CLEANUP) {
408     /* free all the memory associated with the batch */
409     if(job) {
410       if(job->line) free(job->line);
411       if(job->remote) free(job->remote);
412       if(job->doc_str) free(job->doc_str);
413       free(job);
414     }
415     return 0;
416   }
417
418   /* If we're greater than 30 seconds old,
419      just quit. */
420   sub_timeval(*now, job->start, &diff);
421   if (diff.tv_sec >= 30) {
422     mtevL(noit_debug, "Skipping event from %s - waiting in eventer for more than 30 seconds\n",
423                         job->remote ? job->remote : "(null)");
424     return 0;
425   }
426
427   if(!job->line || job->line[0] == '\0') return 0;
428
429   if((age = stratcon_iep_age_from_line(job->line, *now)) > 60) {
430     mtevL(noit_debug, "Skipping old event from %s, %f seconds old.\n",
431           job->remote ? job->remote : "(null)", age);
432     return 0;
433   }
434   /* Submit */
435   int line_len = strlen(job->line);
436   int remote_len = strlen(job->remote);
437   const char *toff = strchr(job->line, '\t');
438   int token_off = 2;
439   if(toff) token_off = toff - job->line + 1;
440
441   line = (char*)calloc(line_len + 1 /* \t */ + remote_len + 2, 1);
442   strncpy(line, job->line, token_off);
443   strncat(line, job->remote, remote_len);
444   strncat(line, "\t", 1);
445   strncat(line, job->line + token_off, line_len - token_off);
446   job->doc_str = line;
447
448   for(struct driver_list *d = drivers; d; d = d->next) {
449     struct driver_thread_data *tls = connect_iep_driver(d);
450     if(tls && tls->driver_data) {
451       if(tls->mq_driver->submit(tls->driver_data, job->doc_str,
452                                 line_len + remote_len + 1) != 0) {
453         mtevL(noit_debug, "failed to MQ submit.\n");
454       }
455     }
456   }
457   return 0;
458 }
459
460 void
461 stratcon_iep_line_processor(stratcon_datastore_op_t op,
462                             struct sockaddr *remote, const char *remote_cn,
463                             void *operand, eventer_t completion) {
464   int len;
465   char remote_str[128];
466   struct iep_job_closure *jc;
467   eventer_t newe;
468   /* We only care about inserts */
469
470   if(op == DS_OP_CHKPT) {
471     if(completion) eventer_add(completion);
472     return;
473   }
474   if(op != DS_OP_INSERT) return;
475
476   snprintf(remote_str, sizeof(remote_str), "%s", "0.0.0.0");
477   if(remote) {
478     switch(remote->sa_family) {
479       case AF_INET:
480         len = sizeof(struct sockaddr_in);
481         inet_ntop(remote->sa_family, &((struct sockaddr_in *)remote)->sin_addr,
482                   remote_str, len);
483         break;
484       case AF_INET6:
485        len = sizeof(struct sockaddr_in6);
486         inet_ntop(remote->sa_family, &((struct sockaddr_in6 *)remote)->sin6_addr,
487                   remote_str, len);
488        break;
489       case AF_UNIX:
490         snprintf(remote_str, sizeof(remote_str), "%s", ((struct sockaddr_un *)remote)->sun_path);
491         break;
492     }
493   }
494
495   /* process operand and push onto queue */
496   newe = eventer_alloc();
497   newe->thr_owner = eventer_choose_owner(0);
498   newe->mask = EVENTER_ASYNCH;
499   newe->callback = stratcon_iep_submitter;
500   jc = calloc(1, sizeof(*jc));
501   jc->line = operand;
502   jc->remote = strdup(remote_str);
503   gettimeofday(&jc->start, NULL);
504   newe->closure = jc;
505
506   eventer_add_asynch(&iep_jobq, newe);
507 }
508
509 static void connection_destroy(void *vd) {
510   struct driver_thread_data *data = vd;
511   data->mq_driver->disconnect(data->driver_data);
512   data->mq_driver->deallocate(data->driver_data);
513   free(data);
514 }
515
516 jlog_streamer_ctx_t *
517 stratcon_jlog_streamer_iep_ctx_alloc(void) {
518   jlog_streamer_ctx_t *ctx;
519   ctx = stratcon_jlog_streamer_ctx_alloc();
520   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_TEMP_FEED);
521   ctx->push = stratcon_iep_line_processor;
522   return ctx;
523 }
524
/* Bookkeeping for one spawned IEP daemon process. */
struct iep_daemon_info {
  pid_t child;         /* pid returned by fork() */
  int stdin_pipe[2];   /* pipe attached to the child's stdin */
  int stderr_pipe[2];  /* pipe attached to the child's stderr */
  char *directory;     /* directory the child changes into */
  char *command;       /* program the child executes */
};

/*
 * Release an iep_daemon_info: free its strings, close every pipe
 * descriptor that is still open (>= 0) and free the structure.
 * A NULL argument is ignored.
 */
static void
iep_daemon_info_free(struct iep_daemon_info *info) {
  int *pipes[2];
  int which, end;

  if(info == NULL) return;

  free(info->directory);
  free(info->command);

  pipes[0] = info->stdin_pipe;
  pipes[1] = info->stderr_pipe;
  for(which = 0; which < 2; which++) {
    for(end = 0; end < 2; end++) {
      if(pipes[which][end] >= 0) close(pipes[which][end]);
    }
  }
  free(info);
}
544
545 static int
546 stratcon_iep_err_handler(eventer_t e, int mask, void *closure,
547                          struct timeval *now) {
548   int len, newmask;
549   char buff[4096];
550   struct iep_daemon_info *info = (struct iep_daemon_info *)closure;
551
552   if(mask & EVENTER_EXCEPTION) {
553     int rv;
554    read_error:
555     kill(info->child, SIGKILL);
556     if(waitpid(info->child, &rv, 0) != info->child) {
557       mtevL(noit_iep, "Failed to reap IEP daemon\n");
558       exit(-1);
559     }
560     mtevL(noit_iep, "IEP daemon is done, starting a new one\n");
561     start_iep_daemon();
562     eventer_remove_fd(e->fd);
563     iep_daemon_info_free(info);
564     return 0;
565   }
566   while(1) {
567     len = e->opset->read(e->fd, buff, sizeof(buff)-1, &newmask, e);
568     if(len == -1 && (errno == EAGAIN || errno == EINTR))
569       return newmask | EVENTER_EXCEPTION;
570     if(len <= 0) goto read_error;
571     assert(len < sizeof(buff));
572     buff[len] = '\0';
573     mtevL(noit_iep_debug, "%s", buff);
574   }
575 }
576
577 static void
578 start_iep_daemon() {
579   eventer_t newe;
580   struct iep_daemon_info *info;
581   char *cmd = NULL;
582
583   if(!mtev_conf_get_string(NULL, "/stratcon/iep/start/@command",
584                            &cmd)) {
585     mtevL(noit_iep, "No IEP start command provided.  You're on your own.\n");
586     setup_iep_connection_later(0);
587     return;
588   }
589
590   info = calloc(1, sizeof(*info));
591   info->stdin_pipe[0] = info->stdin_pipe[1] = -1;
592   info->stderr_pipe[0] = info->stderr_pipe[1] = -1;
593   info->command = cmd;
594
595   if(!mtev_conf_get_string(NULL, "/stratcon/iep/start/@directory",
596                            &info->directory))
597     info->directory = strdup(".");
598   if(pipe(info->stdin_pipe) != 0 ||
599      pipe(info->stderr_pipe) != 0) {
600     mtevL(noit_iep, "pipe: %s\n", strerror(errno));
601     goto bail;
602   }
603   info->child = fork();
604   if(info->child == -1) {
605     mtevL(noit_iep, "fork: %s\n", strerror(errno));
606     goto bail;
607   }
608   if(info->child == 0) {
609     char *argv[3] = { "run-iep", NULL, NULL };
610     int stdout_fileno;
611
612     argv[1] = mtev_conf_config_filename();
613
614     if(chdir(info->directory) != 0) {
615       mtevL(noit_iep, "Starting IEP daemon, chdir failed: %s\n",
616             strerror(errno));
617       exit(-1);
618     }
619
620     close(info->stdin_pipe[1]);
621     close(info->stderr_pipe[0]);
622     dup2(info->stdin_pipe[0], 0);
623     dup2(info->stderr_pipe[1], 2);
624     stdout_fileno = open("/dev/null", O_WRONLY);
625     if(stdout_fileno >= 0) dup2(stdout_fileno, 1);
626
627     exit(execv(info->command, argv));
628   }
629   /* in the parent */
630   close(info->stdin_pipe[0]);
631   info->stdin_pipe[0] = -1;
632   close(info->stderr_pipe[1]);
633   info->stderr_pipe[1] = -1;
634   if(eventer_set_fd_nonblocking(info->stderr_pipe[0])) {
635     goto bail;
636   }
637
638   newe = eventer_alloc();
639   newe->fd = info->stderr_pipe[0];
640   newe->mask = EVENTER_READ | EVENTER_EXCEPTION;
641   newe->callback = stratcon_iep_err_handler;
642   newe->closure = info;
643   eventer_add(newe);
644   info = NULL;
645
646   setup_iep_connection_later(1);
647
648   return;
649
650  bail:
651   iep_daemon_info_free(info);
652   mtevL(noit_iep, "Failed to start IEP daemon\n");
653   exit(-1);
654   return;
655 }
656
657 void
658 stratcon_iep_mq_driver_register(const char *name, mq_driver_t *d) {
659   mtev_hash_replace(&mq_drivers, strdup(name), strlen(name), d, free, NULL);
660 }
661
662 static int rest_set_filters(mtev_http_rest_closure_t *restc,
663                             int npats, char **pats) {
664   mtev_http_session_ctx *ctx = restc->http_ctx;
665   xmlXPathObjectPtr pobj = NULL;
666   xmlDocPtr doc = NULL, indoc = NULL;
667   xmlNodePtr node, root;
668   int error_code = 500, complete = 0, mask = 0, cnt = 0, i, j;
669   const char *error = "internal error";
670   xmlXPathContextPtr xpath_ctxt;
671   char *action = NULL, *uuid = NULL;
672   char xpath[1024];
673   mq_command_t *commands = NULL;
674   bool valid = true;
675
676   if (npats != 0) goto error;
677
678   indoc = rest_get_xml_upload(restc, &mask, &complete);
679   if(!complete) return mask;
680   if(indoc == NULL) {
681     error = "xml parse error";
682     goto error;
683   }
684
685   xpath_ctxt = xmlXPathNewContext(indoc);
686   pobj = xmlXPathEval((xmlChar *)"/mq_metrics/check", xpath_ctxt);
687
688   if(!pobj) {
689     error = "xml format incorrect";
690     goto error;
691   }
692   if(pobj->type != XPATH_NODESET) {
693     error = "couldn't find xml nodeset";
694     goto error;
695   }
696   cnt = xmlXPathNodeSetGetLength(pobj->nodesetval);
697   if (cnt <= 0) {
698     error = "no nodes given";
699     goto error;
700   }
701
702   commands = calloc(cnt, sizeof(mq_command_t));
703   if (!commands) goto error;
704
705   for(i=0; i<cnt; i++) {
706     xmlXPathObjectPtr metric_obj = NULL;
707
708     if (!valid) break;
709     node = xmlXPathNodeSetItem(pobj->nodesetval, i);
710     action = (char *)xmlGetProp(node, (xmlChar *)"action");
711     uuid = (char *)xmlGetProp(node, (xmlChar *)"uuid");
712     if ((action == NULL) || (uuid == NULL)) {
713       mtevL(mtev_error, "error parsing %d - need both action and uuid\n", i);
714       if (action) xmlFree(action);
715       if (uuid) xmlFree(uuid);
716       valid = false;
717       continue;
718     }
719     if (!strncmp(action, "set", 3)) {
720       commands[i].action = MQ_ACTION_SET;
721     }
722     else if (!strncmp(action, "forget", 6)) {
723       commands[i].action = MQ_ACTION_FORGET;
724     }
725     else {
726       mtevL(mtev_error, "error parsing %d - bad action (%s)\n", i, action);
727       if (action) xmlFree(action);
728       if (uuid) xmlFree(uuid);
729       valid = false;
730       continue;
731     }
732     commands[i].check.uuid = uuid;
733     snprintf(xpath, sizeof(xpath), "/mq_metrics/check[%d]/metrics/metric", i+1);
734     metric_obj = xmlXPathEval((xmlChar *)xpath, xpath_ctxt);
735     if (metric_obj && metric_obj->type == XPATH_NODESET) {
736       commands[i].check.metric_count = xmlXPathNodeSetGetLength(metric_obj->nodesetval);
737       if (commands[i].check.metric_count > 0) {
738         char *metric_name = NULL;
739         commands[i].check.metrics = calloc(commands[i].check.metric_count, sizeof(char*));
740         for (j=0; j < commands[i].check.metric_count; j++) {
741           xmlNodePtr metric_node = NULL;
742           if (!valid) break;
743           metric_node = xmlXPathNodeSetItem(metric_obj->nodesetval, j);
744           metric_name = (char *)xmlGetProp(metric_node, (xmlChar *)"name");
745           if (!metric_name) {
746             valid = false;
747             continue;
748           }
749           commands[i].check.metrics[j] = metric_name;
750         }
751       }
752     }
753   }
754
755   if (!valid) {
756     goto error;
757   }
758
759   for(struct driver_list *d = drivers; d; d = d->next) {
760     if (d->mq_driver) {
761       d->mq_driver->set_filters(commands, cnt);
762     }
763   }
764
765   mtev_http_response_ok(restc->http_ctx, "text/xml");
766   mtev_http_response_end(restc->http_ctx);
767   goto cleanup;
768
769  error:
770   mtev_http_response_standard(ctx, error_code, "ERROR", "text/xml");
771   doc = xmlNewDoc((xmlChar *)"1.0");
772   root = xmlNewDocNode(doc, NULL, (xmlChar *)"error", NULL);
773   xmlDocSetRootElement(doc, root);
774   xmlNodeAddContent(root, (xmlChar *)error);
775   mtev_http_response_xml(ctx, doc);
776   mtev_http_response_end(ctx);
777
778  cleanup:
779   if (commands) {
780     for (i=0; i<cnt; i++) {
781       mq_command_free(&commands[i]);
782     }
783     free(commands);
784   }
785   return 0;
786 }
787
788 void
789 stratcon_iep_init() {
790   mtev_conf_section_t *mqs;
791   int i, cnt;
792   mtev_boolean disabled = mtev_false;
793   char mq_type[128] = "stomp";
794   struct driver_list *newdriver;
795   void *vdriver;
796
797   noit_iep = mtev_log_stream_find("error/iep");
798   noit_iep_debug = mtev_log_stream_find("debug/iep");
799   if(!noit_iep) noit_iep = noit_error;
800   if(!noit_iep_debug) noit_iep_debug = noit_debug;
801
802   if(mtev_conf_get_boolean(NULL, "/stratcon/iep/@disabled", &disabled) &&
803      disabled == mtev_true) {
804     mtevL(noit_iep, "IEP system is disabled!\n");
805     return;
806   }
807
808   mqs = mtev_conf_get_sections(NULL, "/stratcon/iep/mq", &cnt);
809   for(i=0; i<cnt; i++) {
810     if(!mtev_conf_get_stringbuf(mqs[i], "@type",
811                                 mq_type, sizeof(mq_type))) {
812       mtevL(noit_iep, "You must specify an <mq type=\"...\"> that is valid.\n");
813       exit(-2);
814     }
815     if(!mtev_hash_retrieve(&mq_drivers, mq_type, strlen(mq_type), &vdriver) ||
816        vdriver == NULL) {
817       mtevL(noit_iep, "Cannot find MQ driver type: %s\n", mq_type);
818       mtevL(noit_iep, "Did you forget to load a module?\n");
819       exit(-2);
820     }
821     newdriver = calloc(1, sizeof(*newdriver));
822     newdriver->section = mqs[i];
823     newdriver->mq_driver = (mq_driver_t *)vdriver;
824     pthread_key_create(&newdriver->iep_connection, connection_destroy);
825     newdriver->next = drivers;
826     drivers = newdriver;
827   }
828   free(mqs);
829
830   eventer_name_callback("stratcon_iep_submitter", stratcon_iep_submitter);
831   eventer_name_callback("stratcon_iep_err_handler", stratcon_iep_err_handler);
832   eventer_name_callback("setup_iep_connection_callback", setup_iep_connection_callback);
833
834   /* start up a thread pool of one */
835   memset(&iep_jobq, 0, sizeof(iep_jobq));
836   eventer_jobq_init(&iep_jobq, "iep_submitter");
837   eventer_jobq_increase_concurrency(&iep_jobq);
838
839   assert(mtev_http_rest_register_auth(
840     "PUT", "/", "^mq_filters$",
841     rest_set_filters, mtev_http_rest_client_cert_auth
842   ) == 0);
843
844   start_iep_daemon();
845
846   /* setup our live jlog stream */
847   stratcon_streamer_connection(NULL, NULL, "noit",
848                                stratcon_jlog_recv_handler,
849                                (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
850                                NULL,
851                                jlog_streamer_ctx_free);
852 }
853
Note: See TracBrowser for help on using the browser.