root/src/stratcon_iep.c

Revision 70c30ef22f6d1312375e0392164d755db5824cb5, 26.3 kB (checked in by Phil Maddox <philip.maddox@circonus.com>, 5 days ago)

Use mtevAssert and mtevFatal Instead Of assert() and abort()

Use libmtev calls to safely flush logs and abort rather than calling
the assert and abort calls directly.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * Copyright (c) 2015, Circonus, Inc. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include <mtev_defines.h>
35
36 #include <sys/types.h>
37 #ifdef HAVE_SYS_WAIT_H
38 #include <sys/wait.h>
39 #endif
40 #include <sys/stat.h>
41 #include <fcntl.h>
42 #include <unistd.h>
43 #include <sys/fcntl.h>
44 #ifdef HAVE_SYS_FILIO_H
45 #include <sys/filio.h>
46 #endif
47 #include <signal.h>
48 #include <errno.h>
49 #include <libxml/parser.h>
50 #include <libxml/tree.h>
51 #include <libxml/xpath.h>
52
53 #include <eventer/eventer.h>
54 #include <mtev_log.h>
55 #include <mtev_b64.h>
56 #include <mtev_conf.h>
57 #include <mtev_rest.h>
58
59 #include "noit_mtev_bridge.h"
60 #include "noit_jlog_listener.h"
61 #include "stratcon_jlog_streamer.h"
62 #include "stratcon_datastore.h"
63 #include "stratcon_iep.h"
64 #include "noit_check.h"
65
66 eventer_jobq_t iep_jobq;
67 static mtev_log_stream_t noit_iep = NULL;
68 static mtev_log_stream_t noit_iep_debug = NULL;
69 static mtev_spinlock_t iep_conn_cnt = 0;
70 static mtev_boolean inject_remote_cn = mtev_false;
71
72 static mtev_hash_table mq_drivers = MTEV_HASH_EMPTY;
73 struct driver_thread_data {
74   mq_driver_t *mq_driver;
75   struct iep_thread_driver *driver_data;
76 };
77 struct driver_list {
78   mq_driver_t *mq_driver;
79   pthread_key_t iep_connection;
80   mtev_conf_section_t section;
81   struct driver_list *next;
82 } *drivers;
83
/* Enabled flag exposed through the accessor pair below; starts at 1. */
static int iep_system_enabled = 1;

/* Return the current value of the enabled flag. */
int
stratcon_iep_get_enabled() {
  return iep_system_enabled;
}

/* Overwrite the enabled flag with n. */
void
stratcon_iep_set_enabled(int n) {
  iep_system_enabled = n;
}
87 static int rest_set_filters(mtev_http_rest_closure_t *restc,
88                             int npats, char **pats);
89
90
91 struct iep_job_closure {
92   char *line;       /* This is a copy and gets trashed during processing */
93   char *remote;
94   char *doc_str;
95   struct timeval start;
96 };
97
98 static void
99 start_iep_daemon();
100
/*
 * Compute the age, in seconds, of an 'S' or 'M' record relative to `now`.
 *
 * The record is expected to look like "<S|M>\t<timestamp>..." where the
 * timestamp is parsed with strtod().  Any other input (NULL, a different
 * leading character, or no tab in the second position) yields 0.
 *
 * The arithmetic is done in double: a float cannot represent current epoch
 * seconds exactly (its spacing is 128s at that magnitude), which would make
 * the result useless for the 30/60 second thresholds applied by callers.
 */
static double
stratcon_iep_age_from_line(char *data, struct timeval now) {
  double n, t;

  if(!data) return 0;
  if(*data != 'S' && *data != 'M') return 0;
  if(data[1] != '\t') return 0;

  t = strtod(data + 2, NULL);
  n = (double)now.tv_sec + (double)now.tv_usec / 1000000.0;
  return n - t;
}
112
/* One <statement> from the configuration, plus its dependency edges. */
struct statement_node {
  char *id;          /* statement identifier (heap copy) */
  char *statement;   /* EPL text (heap allocated) */
  char *provides;    /* name this statement provides, or NULL */
  int marked;        /* helps with identifying cycles */
  int nrequires;     /* number of valid entries in requires[] */
  struct statement_node **requires; /* statements this one depends on */
};

/*
 * Release a statement node and everything it owns.
 *
 * Used as the value destructor when the by-id hash is torn down.  The
 * requires[] array is freed, but the nodes it points at are not: each node
 * is owned by its own hash entry.  The node structure itself is released as
 * well; it is heap allocated and nothing else frees it.  NULL is a no-op.
 */
static void
statement_node_free(void *vstmt) {
  struct statement_node *stmt = vstmt;
  if(!stmt) return;
  free(stmt->id);
  free(stmt->statement);
  free(stmt->provides);
  free(stmt->requires);
  free(stmt);
}
129 static void
130 mq_command_free(void *command) {
131   mq_command_t *cmd = (mq_command_t*)command;
132   int i;
133   if (cmd->check.uuid) {
134     xmlFree(cmd->check.uuid);
135   }
136   if (cmd->check.metric_count > 0) {
137     for (i=0; i < cmd->check.metric_count; i++) {
138       if (!cmd->check.metrics[i]) break;
139       xmlFree(cmd->check.metrics[i]);
140     }
141     free(cmd->check.metrics);
142   }
143 }
144 static int
145 stmt_mark_dag(struct statement_node *stmt, int mgen) {
146   int i;
147   mtevAssert(stmt->marked <= mgen);
148   if(stmt->marked == mgen) return -1;
149   if(stmt->marked > 0) return 0; /* validated in a previous sweep */
150   stmt->marked = mgen;
151   for(i=0; i<stmt->nrequires; i++)
152     if(stmt_mark_dag(stmt->requires[i], mgen) < 0) return -1;
153   return 0;
154 }
155 static void
156 submit_statement_node(struct statement_node *stmt) {
157   int line_len, i;
158   char *line, *cp;
159
160   if(stmt->marked) return;
161   for(i=0; i<stmt->nrequires; i++)
162     submit_statement_node(stmt->requires[i]);
163
164   line_len = 3 /* 2 tabs + \0 */ +
165              1 /* 'D' */ + 1 /* '\n' */ +
166              strlen(stmt->id) + strlen(stmt->statement);
167   line = malloc(line_len);
168   snprintf(line, line_len, "D\t%s\t%s\n", stmt->id, stmt->statement);
169   cp = line;
170   while(cp[0] && cp[1]) {
171     if(*cp == '\n') *cp = ' ';
172     cp++;
173   }
174   mtevL(noit_iep, "submitting statement: %s\n", line);
175   stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, line, NULL);
176   stmt->marked = 1;
177 }
178 void stratcon_iep_submit_statements() {
179   int i, cnt = 0;
180   mtev_conf_section_t *statement_configs;
181   char path[256];
182   struct statement_node *stmt;
183   void *vstmt;
184   mtev_hash_table stmt_by_id = MTEV_HASH_EMPTY;
185   mtev_hash_table stmt_by_provider = MTEV_HASH_EMPTY;
186   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
187   const char *key;
188   int klen, mgen = 0;
189
190   snprintf(path, sizeof(path), "/stratcon/iep/queries[@master=\"stratcond\"]//statement");
191   statement_configs = mtev_conf_get_sections(NULL, path, &cnt);
192   mtevL(noit_debug, "Found %d %s stanzas\n", cnt, path);
193
194   /* Phase 1: sweep in all the statements */
195   for(i=0; i<cnt; i++) {
196     char id[UUID_STR_LEN+1];
197     char provides[256];
198     char *statement;
199
200     if(!mtev_conf_get_stringbuf(statement_configs[i],
201                                 "self::node()/@id",
202                                 id, sizeof(id))) {
203       mtevL(noit_iep, "No uuid specified in query\n");
204       continue;
205     }
206     if(!mtev_conf_get_stringbuf(statement_configs[i],
207                                 "ancestor-or-self::node()/@provides",
208                                 provides, sizeof(provides))) {
209       provides[0] = '\0';
210     }
211     if(!mtev_conf_get_string(statement_configs[i], "self::node()/epl",
212                              &statement)) {
213       mtevL(noit_iep, "No contents specified in statement\n");
214       continue;
215     }
216     stmt = calloc(1, sizeof(*stmt));
217     stmt->id = strdup(id);
218     stmt->statement = statement;
219     stmt->provides = provides[0] ? strdup(provides) : NULL;
220     if(!mtev_hash_store(&stmt_by_id, stmt->id, strlen(stmt->id), stmt)) {
221       mtevL(noit_iep, "Duplicate statement id: %s\n", stmt->id);
222       exit(-1);
223     }
224     if(stmt->provides) {
225       if(!mtev_hash_store(&stmt_by_provider, stmt->provides,
226                           strlen(stmt->provides), stmt)) {
227         mtevL(noit_iep, "Two statements provide: '%s'\n", stmt->provides);
228         exit(-1);
229       }
230     }
231   }
232
233   /* Phase 2: load the requires graph */
234   for(i=0; i<cnt; i++) {
235     char id[UUID_STR_LEN+1];
236     int rcnt, j;
237     char *requires;
238     mtev_conf_section_t *reqs;
239
240     if(!mtev_conf_get_stringbuf(statement_configs[i],
241                                 "self::node()/@id",
242                                 id, sizeof(id))) {
243       mtevL(noit_iep, "No uuid specified in query\n");
244       continue;
245     }
246     if(!mtev_hash_retrieve(&stmt_by_id, id, strlen(id), &vstmt)) {
247       mtevL(noit_iep, "Cannot find statement: %s\n", id);
248       exit(-1);
249     }
250     stmt = vstmt;
251     reqs = mtev_conf_get_sections(statement_configs[i],
252                                   "self::node()/requires", &rcnt);
253     if(rcnt > 0) {
254       stmt->requires = malloc(rcnt * sizeof(*(stmt->requires)));
255       for(j=0; j<rcnt; j++) {
256         void *vrstmt;
257         if(!mtev_conf_get_string(reqs[j], "self::node()",
258                                  &requires) || requires[0] == '\0') {
259           continue;
260         }
261         if(!mtev_hash_retrieve(&stmt_by_provider, requires, strlen(requires),
262                                &vrstmt)) {
263           mtevL(noit_iep,
264                 "Statement %s requires %s which no one provides.\n",
265                 stmt->id, requires);
266           exit(-1);
267         }
268         stmt->requires[stmt->nrequires++] = vrstmt;
269       }
270     }
271   }
272
273   /* Phase 3: Recursive sweep and mark to detect cycles.
274      We're walking the graph backwards here from dependent to provider,
275      but a cycle is a cycle, so this validates the graph. */
276   while(mtev_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) {
277     stmt = vstmt;
278     if(stmt_mark_dag(stmt, ++mgen) < 0) {
279       mtevL(noit_iep, "Statement %s has a cyclic requirement\n", stmt->id);
280       exit(-1);
281     }
282   }
283
284   /* Phase 4: clean the markings */
285   memset(&iter, 0, sizeof(iter));
286   while(mtev_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) {
287     stmt = vstmt;
288     stmt->marked = 0;
289   }
290
291   /* Phase 5: do the load */
292   memset(&iter, 0, sizeof(iter));
293   while(mtev_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) {
294     stmt = vstmt;
295     submit_statement_node(stmt);
296   }
297
298   mtev_hash_destroy(&stmt_by_provider, NULL, NULL);
299   mtev_hash_destroy(&stmt_by_id, NULL, statement_node_free);
300   free(statement_configs);
301 }
302
303 void stratcon_iep_submit_queries() {
304   int i, cnt = 0;
305   mtev_conf_section_t *query_configs;
306   char path[256];
307
308   snprintf(path, sizeof(path), "/stratcon/iep/queries[@master=\"stratcond\"]//query");
309   query_configs = mtev_conf_get_sections(NULL, path, &cnt);
310   mtevL(noit_debug, "Found %d %s stanzas\n", cnt, path);
311   for(i=0; i<cnt; i++) {
312     char id[UUID_STR_LEN+1];
313     char topic[256];
314     char *query;
315     char *line;
316     int line_len;
317
318     if(!mtev_conf_get_stringbuf(query_configs[i],
319                                 "self::node()/@id",
320                                 id, sizeof(id))) {
321       mtevL(noit_iep, "No uuid specified in query\n");
322       continue;
323     }
324     if(!mtev_conf_get_stringbuf(query_configs[i],
325                                 "ancestor-or-self::node()/@topic",
326                                 topic, sizeof(topic))) {
327       mtevL(noit_iep, "No topic specified in query\n");
328       continue;
329     }
330     if(!mtev_conf_get_string(query_configs[i], "self::node()/epl",
331                              &query)) {
332       mtevL(noit_iep, "No contents specified in query\n");
333       continue;
334     }
335     line_len = 4 /* 3 tabs + \0 */ +
336                1 /* 'Q' */ + 1 /* '\n' */ +
337                strlen(id) + strlen(topic) + strlen(query);
338     line = malloc(line_len);
339     snprintf(line, line_len, "Q\t%s\t%s\t%s\n", id, topic, query);
340     free(query);
341     query = line;
342     while(query[0] && query[1]) {
343       if(*query == '\n') *query = ' ';
344       query++;
345     }
346     stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, line, NULL);
347   }
348   free(query_configs);
349 }
350
351 static struct driver_thread_data *
352 connect_iep_driver(struct driver_list *d) {
353   int rc;
354   struct driver_thread_data *data;
355   data = pthread_getspecific(d->iep_connection);
356   if(!data) {
357     data = calloc(1, sizeof(*data));
358     data->mq_driver = d->mq_driver;
359     pthread_setspecific(d->iep_connection, data);
360   }
361   if(!data->driver_data)
362     data->driver_data = data->mq_driver->allocate(d->section);
363   rc = data->mq_driver->connect(data->driver_data);
364   if(rc < 0) return NULL;
365   if(rc == 0) {
366     /* Initial connect */
367     /* TODO: this should be requested by Esper, not blindly pushed */
368     stratcon_iep_submit_statements();
369     stratcon_datastore_iep_check_preload();
370     stratcon_iep_submit_queries();
371   }
372
373   return data;
374 }
375
376 static int
377 setup_iep_connection_callback(eventer_t e, int mask, void *closure,
378                               struct timeval *now) {
379   mtev_spinlock_unlock(&iep_conn_cnt);
380   stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, NULL, NULL);
381   return 0;
382 }
383
384 static void
385 setup_iep_connection_later(int seconds) {
386   eventer_t newe;
387   if(!mtev_spinlock_trylock(&iep_conn_cnt)) return;
388   newe = eventer_alloc();
389   gettimeofday(&newe->whence, NULL);
390   newe->whence.tv_sec += seconds;
391   newe->mask = EVENTER_TIMER;
392   newe->callback = setup_iep_connection_callback;
393   newe->closure = NULL;
394   eventer_add(newe);
395 }
396
397 static int
398 stratcon_iep_submitter(eventer_t e, int mask, void *closure,
399                        struct timeval *now) {
400   double age;
401   struct iep_job_closure *job = closure;
402   char *line;
403   struct timeval diff;
404   /* We only play when it is an asynch event */
405   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
406
407   if(mask & EVENTER_ASYNCH_CLEANUP) {
408     /* free all the memory associated with the batch */
409     if(job) {
410       if(job->line) free(job->line);
411       if(job->remote) free(job->remote);
412       if(job->doc_str) free(job->doc_str);
413       free(job);
414     }
415     return 0;
416   }
417
418   /* If we're greater than 30 seconds old,
419      just quit. */
420   sub_timeval(*now, job->start, &diff);
421   if (diff.tv_sec >= 30) {
422     mtevL(noit_debug, "Skipping event from %s - waiting in eventer for more than 30 seconds\n",
423                         job->remote ? job->remote : "(null)");
424     return 0;
425   }
426
427   if(!job->line || job->line[0] == '\0') return 0;
428
429   if((age = stratcon_iep_age_from_line(job->line, *now)) > 60) {
430     mtevL(noit_debug, "Skipping old event from %s, %f seconds old.\n",
431           job->remote ? job->remote : "(null)", age);
432     return 0;
433   }
434   /* Submit */
435   int line_len = strlen(job->line);
436   int remote_len = strlen(job->remote);
437   const char *toff = strchr(job->line, '\t');
438   int token_off = 2;
439   if(toff) token_off = toff - job->line + 1;
440
441   line = (char*)calloc(line_len + 1 /* \t */ + remote_len + 2, 1);
442   strncpy(line, job->line, token_off);
443   strncat(line, job->remote, remote_len);
444   strncat(line, "\t", 1);
445   strncat(line, job->line + token_off, line_len - token_off);
446   job->doc_str = line;
447
448   for(struct driver_list *d = drivers; d; d = d->next) {
449     struct driver_thread_data *tls = connect_iep_driver(d);
450     if(tls && tls->driver_data) {
451       if(tls->mq_driver->submit(tls->driver_data, job->doc_str,
452                                 line_len + remote_len + 1) != 0) {
453         mtevL(noit_debug, "failed to MQ submit.\n");
454       }
455     }
456   }
457   return 0;
458 }
459
460 void
461 stratcon_iep_line_processor(stratcon_datastore_op_t op,
462                             struct sockaddr *remote, const char *remote_cn,
463                             void *operand, eventer_t completion) {
464   int len;
465   char remote_str[256];
466   struct iep_job_closure *jc;
467   eventer_t newe;
468   /* We only care about inserts */
469
470   if(op == DS_OP_CHKPT) {
471     if(completion) eventer_add(completion);
472     return;
473   }
474   if(op != DS_OP_INSERT) return;
475
476   if(inject_remote_cn) {
477     if(remote_cn == NULL) remote_cn = "default";
478     strlcpy(remote_str, remote_cn, sizeof(remote_str));
479   }
480   else {
481     snprintf(remote_str, sizeof(remote_str), "%s", "0.0.0.0");
482     if(remote) {
483       switch(remote->sa_family) {
484         case AF_INET:
485           len = sizeof(struct sockaddr_in);
486           inet_ntop(remote->sa_family, &((struct sockaddr_in *)remote)->sin_addr,
487                     remote_str, len);
488           break;
489         case AF_INET6:
490          len = sizeof(struct sockaddr_in6);
491           inet_ntop(remote->sa_family, &((struct sockaddr_in6 *)remote)->sin6_addr,
492                     remote_str, len);
493          break;
494         case AF_UNIX:
495           snprintf(remote_str, sizeof(remote_str), "%s", ((struct sockaddr_un *)remote)->sun_path);
496           break;
497       }
498     }
499   }
500
501   /* process operand and push onto queue */
502   newe = eventer_alloc();
503   newe->thr_owner = eventer_choose_owner(0);
504   newe->mask = EVENTER_ASYNCH;
505   newe->callback = stratcon_iep_submitter;
506   jc = calloc(1, sizeof(*jc));
507   jc->line = operand;
508   jc->remote = strdup(remote_str);
509   gettimeofday(&jc->start, NULL);
510   newe->closure = jc;
511
512   eventer_add_asynch(&iep_jobq, newe);
513 }
514
515 static void connection_destroy(void *vd) {
516   struct driver_thread_data *data = vd;
517   data->mq_driver->disconnect(data->driver_data);
518   data->mq_driver->deallocate(data->driver_data);
519   free(data);
520 }
521
522 jlog_streamer_ctx_t *
523 stratcon_jlog_streamer_iep_ctx_alloc(void) {
524   jlog_streamer_ctx_t *ctx;
525   ctx = stratcon_jlog_streamer_ctx_alloc();
526   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_TEMP_FEED);
527   ctx->push = stratcon_iep_line_processor;
528   return ctx;
529 }
530
/* Bookkeeping for one spawned IEP daemon process. */
struct iep_daemon_info {
  pid_t child;         /* pid returned by fork() */
  int stdin_pipe[2];   /* pipe wired to the child's stdin; -1 when closed */
  int stderr_pipe[2];  /* pipe wired to the child's stderr; -1 when closed */
  char *directory;     /* working directory for the child */
  char *command;       /* path of the executable to run */
};

/*
 * Free an iep_daemon_info: its strings, any pipe descriptor that is still
 * open (>= 0), and the structure itself.  NULL is accepted.
 */
static void
iep_daemon_info_free(struct iep_daemon_info *info) {
  int *pipes[2];
  int which, end;

  if(info == NULL) return;

  if(info->directory) free(info->directory);
  if(info->command) free(info->command);

  pipes[0] = info->stdin_pipe;
  pipes[1] = info->stderr_pipe;
  for(which = 0; which < 2; which++) {
    for(end = 0; end < 2; end++) {
      if(pipes[which][end] >= 0) close(pipes[which][end]);
    }
  }
  free(info);
}
550
551 static int
552 stratcon_iep_err_handler(eventer_t e, int mask, void *closure,
553                          struct timeval *now) {
554   int len, newmask;
555   char buff[4096];
556   struct iep_daemon_info *info = (struct iep_daemon_info *)closure;
557
558   if(mask & EVENTER_EXCEPTION) {
559     int rv;
560    read_error:
561     kill(info->child, SIGKILL);
562     if(waitpid(info->child, &rv, 0) != info->child) {
563       mtevL(noit_iep, "Failed to reap IEP daemon\n");
564       exit(-1);
565     }
566     mtevL(noit_iep, "IEP daemon is done, starting a new one\n");
567     start_iep_daemon();
568     eventer_remove_fd(e->fd);
569     iep_daemon_info_free(info);
570     return 0;
571   }
572   while(1) {
573     len = e->opset->read(e->fd, buff, sizeof(buff)-1, &newmask, e);
574     if(len == -1 && (errno == EAGAIN || errno == EINTR))
575       return newmask | EVENTER_EXCEPTION;
576     if(len <= 0) goto read_error;
577     mtevAssert(len < sizeof(buff));
578     buff[len] = '\0';
579     mtevL(noit_iep_debug, "%s", buff);
580   }
581 }
582
583 static void
584 start_iep_daemon() {
585   eventer_t newe;
586   struct iep_daemon_info *info;
587   char *cmd = NULL;
588
589   if(!mtev_conf_get_string(NULL, "/stratcon/iep/start/@command",
590                            &cmd)) {
591     mtevL(noit_iep, "No IEP start command provided.  You're on your own.\n");
592     setup_iep_connection_later(0);
593     return;
594   }
595
596   info = calloc(1, sizeof(*info));
597   info->stdin_pipe[0] = info->stdin_pipe[1] = -1;
598   info->stderr_pipe[0] = info->stderr_pipe[1] = -1;
599   info->command = cmd;
600
601   if(!mtev_conf_get_string(NULL, "/stratcon/iep/start/@directory",
602                            &info->directory))
603     info->directory = strdup(".");
604   if(pipe(info->stdin_pipe) != 0 ||
605      pipe(info->stderr_pipe) != 0) {
606     mtevL(noit_iep, "pipe: %s\n", strerror(errno));
607     goto bail;
608   }
609   info->child = fork();
610   if(info->child == -1) {
611     mtevL(noit_iep, "fork: %s\n", strerror(errno));
612     goto bail;
613   }
614   if(info->child == 0) {
615     char *argv[3] = { "run-iep", NULL, NULL };
616     int stdout_fileno;
617
618     argv[1] = mtev_conf_config_filename();
619
620     if(chdir(info->directory) != 0) {
621       mtevL(noit_iep, "Starting IEP daemon, chdir failed: %s\n",
622             strerror(errno));
623       exit(-1);
624     }
625
626     close(info->stdin_pipe[1]);
627     close(info->stderr_pipe[0]);
628     dup2(info->stdin_pipe[0], 0);
629     dup2(info->stderr_pipe[1], 2);
630     stdout_fileno = open("/dev/null", O_WRONLY);
631     if(stdout_fileno >= 0) dup2(stdout_fileno, 1);
632
633     exit(execv(info->command, argv));
634   }
635   /* in the parent */
636   close(info->stdin_pipe[0]);
637   info->stdin_pipe[0] = -1;
638   close(info->stderr_pipe[1]);
639   info->stderr_pipe[1] = -1;
640   if(eventer_set_fd_nonblocking(info->stderr_pipe[0])) {
641     goto bail;
642   }
643
644   newe = eventer_alloc();
645   newe->fd = info->stderr_pipe[0];
646   newe->mask = EVENTER_READ | EVENTER_EXCEPTION;
647   newe->callback = stratcon_iep_err_handler;
648   newe->closure = info;
649   eventer_add(newe);
650   info = NULL;
651
652   setup_iep_connection_later(1);
653
654   return;
655
656  bail:
657   iep_daemon_info_free(info);
658   mtevL(noit_iep, "Failed to start IEP daemon\n");
659   exit(-1);
660   return;
661 }
662
663 void
664 stratcon_iep_mq_driver_register(const char *name, mq_driver_t *d) {
665   mtev_hash_replace(&mq_drivers, strdup(name), strlen(name), d, free, NULL);
666 }
667
668 static int rest_set_filters(mtev_http_rest_closure_t *restc,
669                             int npats, char **pats) {
670   mtev_http_session_ctx *ctx = restc->http_ctx;
671   xmlXPathObjectPtr pobj = NULL;
672   xmlDocPtr doc = NULL, indoc = NULL;
673   xmlNodePtr node, root;
674   int error_code = 500, complete = 0, mask = 0, cnt = 0, i, j;
675   const char *error = "internal error";
676   xmlXPathContextPtr xpath_ctxt;
677   char *action = NULL, *uuid = NULL;
678   char xpath[1024];
679   mq_command_t *commands = NULL;
680   bool valid = true;
681
682   if (npats != 0) goto error;
683
684   indoc = rest_get_xml_upload(restc, &mask, &complete);
685   if(!complete) return mask;
686   if(indoc == NULL) {
687     error = "xml parse error";
688     goto error;
689   }
690
691   xpath_ctxt = xmlXPathNewContext(indoc);
692   pobj = xmlXPathEval((xmlChar *)"/mq_metrics/check", xpath_ctxt);
693
694   if(!pobj) {
695     error = "xml format incorrect";
696     goto error;
697   }
698   if(pobj->type != XPATH_NODESET) {
699     error = "couldn't find xml nodeset";
700     goto error;
701   }
702   cnt = xmlXPathNodeSetGetLength(pobj->nodesetval);
703   if (cnt <= 0) {
704     error = "no nodes given";
705     goto error;
706   }
707
708   commands = calloc(cnt, sizeof(mq_command_t));
709   if (!commands) goto error;
710
711   for(i=0; i<cnt; i++) {
712     xmlXPathObjectPtr metric_obj = NULL;
713
714     if (!valid) break;
715     node = xmlXPathNodeSetItem(pobj->nodesetval, i);
716     action = (char *)xmlGetProp(node, (xmlChar *)"action");
717     uuid = (char *)xmlGetProp(node, (xmlChar *)"uuid");
718     if ((action == NULL) || (uuid == NULL)) {
719       mtevL(mtev_error, "error parsing %d - need both action and uuid\n", i);
720       if (action) xmlFree(action);
721       if (uuid) xmlFree(uuid);
722       valid = false;
723       continue;
724     }
725     if (!strncmp(action, "set", 3)) {
726       commands[i].action = MQ_ACTION_SET;
727     }
728     else if (!strncmp(action, "forget", 6)) {
729       commands[i].action = MQ_ACTION_FORGET;
730     }
731     else {
732       mtevL(mtev_error, "error parsing %d - bad action (%s)\n", i, action);
733       if (action) xmlFree(action);
734       if (uuid) xmlFree(uuid);
735       valid = false;
736       continue;
737     }
738     commands[i].check.uuid = uuid;
739     snprintf(xpath, sizeof(xpath), "/mq_metrics/check[%d]/metrics/metric", i+1);
740     metric_obj = xmlXPathEval((xmlChar *)xpath, xpath_ctxt);
741     if (metric_obj && metric_obj->type == XPATH_NODESET) {
742       commands[i].check.metric_count = xmlXPathNodeSetGetLength(metric_obj->nodesetval);
743       if (commands[i].check.metric_count > 0) {
744         char *metric_name = NULL;
745         commands[i].check.metrics = calloc(commands[i].check.metric_count, sizeof(char*));
746         for (j=0; j < commands[i].check.metric_count; j++) {
747           xmlNodePtr metric_node = NULL;
748           if (!valid) break;
749           metric_node = xmlXPathNodeSetItem(metric_obj->nodesetval, j);
750           metric_name = (char *)xmlGetProp(metric_node, (xmlChar *)"name");
751           if (!metric_name) {
752             valid = false;
753             continue;
754           }
755           commands[i].check.metrics[j] = metric_name;
756         }
757       }
758     }
759   }
760
761   if (!valid) {
762     goto error;
763   }
764
765   for(struct driver_list *d = drivers; d; d = d->next) {
766     if (d->mq_driver) {
767       d->mq_driver->set_filters(commands, cnt);
768     }
769   }
770
771   mtev_http_response_ok(restc->http_ctx, "text/xml");
772   mtev_http_response_end(restc->http_ctx);
773   goto cleanup;
774
775  error:
776   mtev_http_response_standard(ctx, error_code, "ERROR", "text/xml");
777   doc = xmlNewDoc((xmlChar *)"1.0");
778   root = xmlNewDocNode(doc, NULL, (xmlChar *)"error", NULL);
779   xmlDocSetRootElement(doc, root);
780   xmlNodeAddContent(root, (xmlChar *)error);
781   mtev_http_response_xml(ctx, doc);
782   mtev_http_response_end(ctx);
783
784  cleanup:
785   if (commands) {
786     for (i=0; i<cnt; i++) {
787       mq_command_free(&commands[i]);
788     }
789     free(commands);
790   }
791   return 0;
792 }
793
794 void
795 stratcon_iep_init() {
796   mtev_conf_section_t *mqs;
797   int i, cnt;
798   mtev_boolean disabled = mtev_false;
799   char mq_type[128] = "stomp";
800   /* Only 32 so we can print out a reasonable length bad value */
801   char remote[32] = "ip";
802   struct driver_list *newdriver;
803   void *vdriver;
804
805   noit_iep = mtev_log_stream_find("error/iep");
806   noit_iep_debug = mtev_log_stream_find("debug/iep");
807   if(!noit_iep) noit_iep = noit_error;
808   if(!noit_iep_debug) noit_iep_debug = noit_debug;
809
810   if(mtev_conf_get_stringbuf(NULL, "/stratcon/iep/@inject_remote", remote, sizeof(remote))) {
811     if(strcmp(remote, "ip") && strcmp(remote, "cn")) {
812       mtevL(noit_iep, "Bad @remote_inject \"%s\", setting to \"cn\"\n", remote);
813       strlcpy(remote, "ip", sizeof(remote));
814     }
815   }
816   if(!strcmp(remote, "ip")) inject_remote_cn = mtev_false;
817   else if(!strcmp(remote, "cn")) inject_remote_cn = mtev_true;
818
819   if(mtev_conf_get_boolean(NULL, "/stratcon/iep/@disabled", &disabled) &&
820      disabled == mtev_true) {
821     mtevL(noit_iep, "IEP system is disabled!\n");
822     return;
823   }
824
825   mqs = mtev_conf_get_sections(NULL, "/stratcon/iep/mq", &cnt);
826   for(i=0; i<cnt; i++) {
827     if(!mtev_conf_get_stringbuf(mqs[i], "@type",
828                                 mq_type, sizeof(mq_type))) {
829       mtevL(noit_iep, "You must specify an <mq type=\"...\"> that is valid.\n");
830       exit(-2);
831     }
832     if(!mtev_hash_retrieve(&mq_drivers, mq_type, strlen(mq_type), &vdriver) ||
833        vdriver == NULL) {
834       mtevL(noit_iep, "Cannot find MQ driver type: %s\n", mq_type);
835       mtevL(noit_iep, "Did you forget to load a module?\n");
836       exit(-2);
837     }
838     newdriver = calloc(1, sizeof(*newdriver));
839     newdriver->section = mqs[i];
840     newdriver->mq_driver = (mq_driver_t *)vdriver;
841     pthread_key_create(&newdriver->iep_connection, connection_destroy);
842     newdriver->next = drivers;
843     drivers = newdriver;
844   }
845   free(mqs);
846
847   eventer_name_callback("stratcon_iep_submitter", stratcon_iep_submitter);
848   eventer_name_callback("stratcon_iep_err_handler", stratcon_iep_err_handler);
849   eventer_name_callback("setup_iep_connection_callback", setup_iep_connection_callback);
850
851   /* start up a thread pool of one */
852   memset(&iep_jobq, 0, sizeof(iep_jobq));
853   eventer_jobq_init(&iep_jobq, "iep_submitter");
854   eventer_jobq_increase_concurrency(&iep_jobq);
855
856   mtevAssert(mtev_http_rest_register_auth(
857     "PUT", "/", "^mq_filters$",
858     rest_set_filters, mtev_http_rest_client_cert_auth
859   ) == 0);
860
861   start_iep_daemon();
862
863   /* setup our live jlog stream */
864   stratcon_streamer_connection(NULL, NULL, "noit",
865                                stratcon_jlog_recv_handler,
866                                (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
867                                NULL,
868                                jlog_streamer_ctx_free);
869 }
870
Note: See TracBrowser for help on using the browser.