root/src/stratcon_iep.c

Revision 2ff4db5a6730270eb30827e23883ed354c42ddf6, 26.4 kB (checked in by Phil Maddox <philip.maddox@circonus.com>, 2 weeks ago)

Explicitly Initialize Mtev Hash Tables

Rather than using MTEV_HASH_EMPTY or not calling any initialization at
all, explicitly initialize hash tables.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * Copyright (c) 2015, Circonus, Inc. All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include <mtev_defines.h>
35
36 #include <sys/types.h>
37 #ifdef HAVE_SYS_WAIT_H
38 #include <sys/wait.h>
39 #endif
40 #include <sys/stat.h>
41 #include <fcntl.h>
42 #include <unistd.h>
43 #include <sys/fcntl.h>
44 #ifdef HAVE_SYS_FILIO_H
45 #include <sys/filio.h>
46 #endif
47 #include <signal.h>
48 #include <errno.h>
49 #include <libxml/parser.h>
50 #include <libxml/tree.h>
51 #include <libxml/xpath.h>
52
53 #include <eventer/eventer.h>
54 #include <mtev_log.h>
55 #include <mtev_b64.h>
56 #include <mtev_conf.h>
57 #include <mtev_rest.h>
58
59 #include "noit_mtev_bridge.h"
60 #include "noit_jlog_listener.h"
61 #include "stratcon_jlog_streamer.h"
62 #include "stratcon_datastore.h"
63 #include "stratcon_iep.h"
64 #include "noit_check.h"
65
66 eventer_jobq_t iep_jobq;
67 static mtev_log_stream_t noit_iep = NULL;
68 static mtev_log_stream_t noit_iep_debug = NULL;
69 static mtev_spinlock_t iep_conn_cnt = 0;
70 static mtev_boolean inject_remote_cn = mtev_false;
71
72 static mtev_hash_table mq_drivers;
73 struct driver_thread_data {
74   mq_driver_t *mq_driver;
75   struct iep_thread_driver *driver_data;
76 };
77 struct driver_list {
78   mq_driver_t *mq_driver;
79   pthread_key_t iep_connection;
80   mtev_conf_section_t section;
81   struct driver_list *next;
82 } *drivers;
83
/* Flag reported by stratcon_iep_get_enabled; starts out as 1. */
static int iep_system_enabled = 1;

/* Return the current value of the enabled flag. */
int
stratcon_iep_get_enabled() {
  return iep_system_enabled;
}

/* Store n as the new value of the enabled flag. */
void
stratcon_iep_set_enabled(int n) {
  iep_system_enabled = n;
}
87 static int rest_set_filters(mtev_http_rest_closure_t *restc,
88                             int npats, char **pats);
89
90
/* Closure handed to stratcon_iep_submitter for one queued line.
 * All three strings are freed by the submitter's cleanup pass. */
struct iep_job_closure {
  /* This is a copy and gets trashed during processing */
  char *line;
  /* textual identity of the sender (address or CN) */
  char *remote;
  /* the line with `remote` spliced in, as handed to the MQ drivers */
  char *doc_str;
  /* time the job was queued; used to drop jobs that waited too long */
  struct timeval start;
};
97
/* Forward declaration: stratcon_iep_err_handler restarts the daemon. */
static void start_iep_daemon();
100
/*
 * Compute the age, in seconds, of a logged line relative to `now`.
 *
 * Only lines whose first character is 'S' or 'M' followed by a tab are
 * considered; the number that follows the tab is parsed with strtod() and
 * subtracted from `now`.  Any other input (including NULL) yields 0.
 *
 * The arithmetic is done in double: a float cannot represent current
 * epoch seconds exactly (it rounds to a multiple of 128), which would
 * skew the result by up to about a minute.
 */
static double
stratcon_iep_age_from_line(char *data, struct timeval now) {
  double line_time, now_time;

  if(!data || (*data != 'S' && *data != 'M')) return 0;
  if(data[1] != '\t') return 0;

  line_time = strtod(data + 2, NULL);
  now_time = (double)now.tv_sec + (double)now.tv_usec / 1000000.0;
  return now_time - line_time;
}
112
/* One configured IEP statement plus its place in the requires graph
 * built by stratcon_iep_submit_statements. */
struct statement_node {
  /* statement id, taken from the @id attribute */
  char *id;
  /* statement text, taken from the <epl> child */
  char *statement;
  /* name this statement provides, or NULL */
  char *provides;
  /* helps with identifying cycles */
  int marked;
  /* number of valid entries in `requires` */
  int nrequires;
  /* statements that must be submitted before this one */
  struct statement_node **requires;
};
121 static void
122 statement_node_free(void *vstmt) {
123   struct statement_node *stmt = vstmt;
124   if(stmt->id) free(stmt->id);
125   if(stmt->statement) free(stmt->statement);
126   if(stmt->provides) free(stmt->provides);
127   if(stmt->requires) free(stmt->requires);
128 }
129 static void
130 mq_command_free(void *command) {
131   mq_command_t *cmd = (mq_command_t*)command;
132   int i;
133   if (cmd->check.uuid) {
134     xmlFree(cmd->check.uuid);
135   }
136   if (cmd->check.metric_count > 0) {
137     for (i=0; i < cmd->check.metric_count; i++) {
138       if (!cmd->check.metrics[i]) break;
139       xmlFree(cmd->check.metrics[i]);
140     }
141     free(cmd->check.metrics);
142   }
143 }
144 static int
145 stmt_mark_dag(struct statement_node *stmt, int mgen) {
146   int i;
147   mtevAssert(stmt->marked <= mgen);
148   if(stmt->marked == mgen) return -1;
149   if(stmt->marked > 0) return 0; /* validated in a previous sweep */
150   stmt->marked = mgen;
151   for(i=0; i<stmt->nrequires; i++)
152     if(stmt_mark_dag(stmt->requires[i], mgen) < 0) return -1;
153   return 0;
154 }
155 static void
156 submit_statement_node(struct statement_node *stmt) {
157   int line_len, i;
158   char *line, *cp;
159
160   if(stmt->marked) return;
161   for(i=0; i<stmt->nrequires; i++)
162     submit_statement_node(stmt->requires[i]);
163
164   line_len = 3 /* 2 tabs + \0 */ +
165              1 /* 'D' */ + 1 /* '\n' */ +
166              strlen(stmt->id) + strlen(stmt->statement);
167   line = malloc(line_len);
168   snprintf(line, line_len, "D\t%s\t%s\n", stmt->id, stmt->statement);
169   cp = line;
170   while(cp[0] && cp[1]) {
171     if(*cp == '\n') *cp = ' ';
172     cp++;
173   }
174   mtevL(noit_iep, "submitting statement: %s\n", line);
175   stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, line, NULL);
176   stmt->marked = 1;
177 }
178 void stratcon_iep_submit_statements() {
179   int i, cnt = 0;
180   mtev_conf_section_t *statement_configs;
181   char path[256];
182   struct statement_node *stmt;
183   void *vstmt;
184   mtev_hash_table stmt_by_id;
185   mtev_hash_table stmt_by_provider;
186   mtev_hash_iter iter = MTEV_HASH_ITER_ZERO;
187   const char *key;
188   int klen, mgen = 0;
189
190   mtev_hash_init(&stmt_by_id);
191   mtev_hash_init(&stmt_by_provider);
192
193   snprintf(path, sizeof(path), "/stratcon/iep/queries[@master=\"stratcond\"]//statement");
194   statement_configs = mtev_conf_get_sections(NULL, path, &cnt);
195   mtevL(noit_debug, "Found %d %s stanzas\n", cnt, path);
196
197   /* Phase 1: sweep in all the statements */
198   for(i=0; i<cnt; i++) {
199     char id[UUID_STR_LEN+1];
200     char provides[256];
201     char *statement;
202
203     if(!mtev_conf_get_stringbuf(statement_configs[i],
204                                 "self::node()/@id",
205                                 id, sizeof(id))) {
206       mtevL(noit_iep, "No uuid specified in query\n");
207       continue;
208     }
209     if(!mtev_conf_get_stringbuf(statement_configs[i],
210                                 "ancestor-or-self::node()/@provides",
211                                 provides, sizeof(provides))) {
212       provides[0] = '\0';
213     }
214     if(!mtev_conf_get_string(statement_configs[i], "self::node()/epl",
215                              &statement)) {
216       mtevL(noit_iep, "No contents specified in statement\n");
217       continue;
218     }
219     stmt = calloc(1, sizeof(*stmt));
220     stmt->id = strdup(id);
221     stmt->statement = statement;
222     stmt->provides = provides[0] ? strdup(provides) : NULL;
223     if(!mtev_hash_store(&stmt_by_id, stmt->id, strlen(stmt->id), stmt)) {
224       mtevL(noit_iep, "Duplicate statement id: %s\n", stmt->id);
225       exit(-1);
226     }
227     if(stmt->provides) {
228       if(!mtev_hash_store(&stmt_by_provider, stmt->provides,
229                           strlen(stmt->provides), stmt)) {
230         mtevL(noit_iep, "Two statements provide: '%s'\n", stmt->provides);
231         exit(-1);
232       }
233     }
234   }
235
236   /* Phase 2: load the requires graph */
237   for(i=0; i<cnt; i++) {
238     char id[UUID_STR_LEN+1];
239     int rcnt, j;
240     char *requires;
241     mtev_conf_section_t *reqs;
242
243     if(!mtev_conf_get_stringbuf(statement_configs[i],
244                                 "self::node()/@id",
245                                 id, sizeof(id))) {
246       mtevL(noit_iep, "No uuid specified in query\n");
247       continue;
248     }
249     if(!mtev_hash_retrieve(&stmt_by_id, id, strlen(id), &vstmt)) {
250       mtevL(noit_iep, "Cannot find statement: %s\n", id);
251       exit(-1);
252     }
253     stmt = vstmt;
254     reqs = mtev_conf_get_sections(statement_configs[i],
255                                   "self::node()/requires", &rcnt);
256     if(rcnt > 0) {
257       stmt->requires = malloc(rcnt * sizeof(*(stmt->requires)));
258       for(j=0; j<rcnt; j++) {
259         void *vrstmt;
260         if(!mtev_conf_get_string(reqs[j], "self::node()",
261                                  &requires) || requires[0] == '\0') {
262           continue;
263         }
264         if(!mtev_hash_retrieve(&stmt_by_provider, requires, strlen(requires),
265                                &vrstmt)) {
266           mtevL(noit_iep,
267                 "Statement %s requires %s which no one provides.\n",
268                 stmt->id, requires);
269           exit(-1);
270         }
271         stmt->requires[stmt->nrequires++] = vrstmt;
272       }
273     }
274   }
275
276   /* Phase 3: Recursive sweep and mark to detect cycles.
277      We're walking the graph backwards here from dependent to provider,
278      but a cycle is a cycle, so this validates the graph. */
279   while(mtev_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) {
280     stmt = vstmt;
281     if(stmt_mark_dag(stmt, ++mgen) < 0) {
282       mtevL(noit_iep, "Statement %s has a cyclic requirement\n", stmt->id);
283       exit(-1);
284     }
285   }
286
287   /* Phase 4: clean the markings */
288   memset(&iter, 0, sizeof(iter));
289   while(mtev_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) {
290     stmt = vstmt;
291     stmt->marked = 0;
292   }
293
294   /* Phase 5: do the load */
295   memset(&iter, 0, sizeof(iter));
296   while(mtev_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) {
297     stmt = vstmt;
298     submit_statement_node(stmt);
299   }
300
301   mtev_hash_destroy(&stmt_by_provider, NULL, NULL);
302   mtev_hash_destroy(&stmt_by_id, NULL, statement_node_free);
303   free(statement_configs);
304 }
305
306 void stratcon_iep_submit_queries() {
307   int i, cnt = 0;
308   mtev_conf_section_t *query_configs;
309   char path[256];
310
311   snprintf(path, sizeof(path), "/stratcon/iep/queries[@master=\"stratcond\"]//query");
312   query_configs = mtev_conf_get_sections(NULL, path, &cnt);
313   mtevL(noit_debug, "Found %d %s stanzas\n", cnt, path);
314   for(i=0; i<cnt; i++) {
315     char id[UUID_STR_LEN+1];
316     char topic[256];
317     char *query;
318     char *line;
319     int line_len;
320
321     if(!mtev_conf_get_stringbuf(query_configs[i],
322                                 "self::node()/@id",
323                                 id, sizeof(id))) {
324       mtevL(noit_iep, "No uuid specified in query\n");
325       continue;
326     }
327     if(!mtev_conf_get_stringbuf(query_configs[i],
328                                 "ancestor-or-self::node()/@topic",
329                                 topic, sizeof(topic))) {
330       mtevL(noit_iep, "No topic specified in query\n");
331       continue;
332     }
333     if(!mtev_conf_get_string(query_configs[i], "self::node()/epl",
334                              &query)) {
335       mtevL(noit_iep, "No contents specified in query\n");
336       continue;
337     }
338     line_len = 4 /* 3 tabs + \0 */ +
339                1 /* 'Q' */ + 1 /* '\n' */ +
340                strlen(id) + strlen(topic) + strlen(query);
341     line = malloc(line_len);
342     snprintf(line, line_len, "Q\t%s\t%s\t%s\n", id, topic, query);
343     free(query);
344     query = line;
345     while(query[0] && query[1]) {
346       if(*query == '\n') *query = ' ';
347       query++;
348     }
349     stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, line, NULL);
350   }
351   free(query_configs);
352 }
353
354 static struct driver_thread_data *
355 connect_iep_driver(struct driver_list *d) {
356   int rc;
357   struct driver_thread_data *data;
358   data = pthread_getspecific(d->iep_connection);
359   if(!data) {
360     data = calloc(1, sizeof(*data));
361     data->mq_driver = d->mq_driver;
362     pthread_setspecific(d->iep_connection, data);
363   }
364   if(!data->driver_data)
365     data->driver_data = data->mq_driver->allocate(d->section);
366   rc = data->mq_driver->connect(data->driver_data);
367   if(rc < 0) return NULL;
368   if(rc == 0) {
369     /* Initial connect */
370     /* TODO: this should be requested by Esper, not blindly pushed */
371     stratcon_iep_submit_statements();
372     stratcon_datastore_iep_check_preload();
373     stratcon_iep_submit_queries();
374   }
375
376   return data;
377 }
378
379 static int
380 setup_iep_connection_callback(eventer_t e, int mask, void *closure,
381                               struct timeval *now) {
382   mtev_spinlock_unlock(&iep_conn_cnt);
383   stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, NULL, NULL);
384   return 0;
385 }
386
387 static void
388 setup_iep_connection_later(int seconds) {
389   eventer_t newe;
390   if(!mtev_spinlock_trylock(&iep_conn_cnt)) return;
391   newe = eventer_alloc();
392   gettimeofday(&newe->whence, NULL);
393   newe->whence.tv_sec += seconds;
394   newe->mask = EVENTER_TIMER;
395   newe->callback = setup_iep_connection_callback;
396   newe->closure = NULL;
397   eventer_add(newe);
398 }
399
400 static int
401 stratcon_iep_submitter(eventer_t e, int mask, void *closure,
402                        struct timeval *now) {
403   double age;
404   struct iep_job_closure *job = closure;
405   char *line;
406   struct timeval diff;
407   /* We only play when it is an asynch event */
408   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
409
410   if(mask & EVENTER_ASYNCH_CLEANUP) {
411     /* free all the memory associated with the batch */
412     if(job) {
413       if(job->line) free(job->line);
414       if(job->remote) free(job->remote);
415       if(job->doc_str) free(job->doc_str);
416       free(job);
417     }
418     return 0;
419   }
420
421   /* If we're greater than 30 seconds old,
422      just quit. */
423   sub_timeval(*now, job->start, &diff);
424   if (diff.tv_sec >= 30) {
425     mtevL(noit_debug, "Skipping event from %s - waiting in eventer for more than 30 seconds\n",
426                         job->remote ? job->remote : "(null)");
427     return 0;
428   }
429
430   if(!job->line || job->line[0] == '\0') return 0;
431
432   if((age = stratcon_iep_age_from_line(job->line, *now)) > 60) {
433     mtevL(noit_debug, "Skipping old event from %s, %f seconds old.\n",
434           job->remote ? job->remote : "(null)", age);
435     return 0;
436   }
437   /* Submit */
438   int line_len = strlen(job->line);
439   int remote_len = strlen(job->remote);
440   const char *toff = strchr(job->line, '\t');
441   int token_off = 2;
442   if(toff) token_off = toff - job->line + 1;
443
444   line = (char*)calloc(line_len + 1 /* \t */ + remote_len + 2, 1);
445   strncpy(line, job->line, token_off);
446   strncat(line, job->remote, remote_len);
447   strncat(line, "\t", 1);
448   strncat(line, job->line + token_off, line_len - token_off);
449   job->doc_str = line;
450
451   for(struct driver_list *d = drivers; d; d = d->next) {
452     struct driver_thread_data *tls = connect_iep_driver(d);
453     if(tls && tls->driver_data) {
454       if(tls->mq_driver->submit(tls->driver_data, job->doc_str,
455                                 line_len + remote_len + 1) != 0) {
456         mtevL(noit_debug, "failed to MQ submit.\n");
457       }
458     }
459   }
460   return 0;
461 }
462
463 void
464 stratcon_iep_line_processor(stratcon_datastore_op_t op,
465                             struct sockaddr *remote, const char *remote_cn,
466                             void *operand, eventer_t completion) {
467   int len;
468   char remote_str[256];
469   struct iep_job_closure *jc;
470   eventer_t newe;
471   /* We only care about inserts */
472
473   if(op == DS_OP_CHKPT) {
474     if(completion) eventer_add(completion);
475     return;
476   }
477   if(op != DS_OP_INSERT) return;
478
479   if(inject_remote_cn) {
480     if(remote_cn == NULL) remote_cn = "default";
481     strlcpy(remote_str, remote_cn, sizeof(remote_str));
482   }
483   else {
484     snprintf(remote_str, sizeof(remote_str), "%s", "0.0.0.0");
485     if(remote) {
486       switch(remote->sa_family) {
487         case AF_INET:
488           len = sizeof(struct sockaddr_in);
489           inet_ntop(remote->sa_family, &((struct sockaddr_in *)remote)->sin_addr,
490                     remote_str, len);
491           break;
492         case AF_INET6:
493          len = sizeof(struct sockaddr_in6);
494           inet_ntop(remote->sa_family, &((struct sockaddr_in6 *)remote)->sin6_addr,
495                     remote_str, len);
496          break;
497         case AF_UNIX:
498           snprintf(remote_str, sizeof(remote_str), "%s", ((struct sockaddr_un *)remote)->sun_path);
499           break;
500       }
501     }
502   }
503
504   /* process operand and push onto queue */
505   newe = eventer_alloc();
506   newe->thr_owner = eventer_choose_owner(0);
507   newe->mask = EVENTER_ASYNCH;
508   newe->callback = stratcon_iep_submitter;
509   jc = calloc(1, sizeof(*jc));
510   jc->line = operand;
511   jc->remote = strdup(remote_str);
512   gettimeofday(&jc->start, NULL);
513   newe->closure = jc;
514
515   eventer_add_asynch(&iep_jobq, newe);
516 }
517
518 static void connection_destroy(void *vd) {
519   struct driver_thread_data *data = vd;
520   data->mq_driver->disconnect(data->driver_data);
521   data->mq_driver->deallocate(data->driver_data);
522   free(data);
523 }
524
525 jlog_streamer_ctx_t *
526 stratcon_jlog_streamer_iep_ctx_alloc(void) {
527   jlog_streamer_ctx_t *ctx;
528   ctx = stratcon_jlog_streamer_ctx_alloc();
529   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_TEMP_FEED);
530   ctx->push = stratcon_iep_line_processor;
531   return ctx;
532 }
533
/* Book-keeping for one spawned IEP daemon process. */
struct iep_daemon_info {
  /* pid returned by fork() */
  pid_t child;
  /* pipe wired to the child's stdin; -1 marks a closed end */
  int stdin_pipe[2];
  /* pipe wired to the child's stderr; -1 marks a closed end */
  int stderr_pipe[2];
  /* directory the child changes into before exec */
  char *directory;
  /* path of the program the child execs */
  char *command;
};
541
542 static void
543 iep_daemon_info_free(struct iep_daemon_info *info) {
544   if(!info) return;
545   if(info->directory) free(info->directory);
546   if(info->command) free(info->command);
547   if(info->stdin_pipe[0] >= 0) close(info->stdin_pipe[0]);
548   if(info->stdin_pipe[1] >= 0) close(info->stdin_pipe[1]);
549   if(info->stderr_pipe[0] >= 0) close(info->stderr_pipe[0]);
550   if(info->stderr_pipe[1] >= 0) close(info->stderr_pipe[1]);
551   free(info);
552 }
553
554 static int
555 stratcon_iep_err_handler(eventer_t e, int mask, void *closure,
556                          struct timeval *now) {
557   int len, newmask;
558   char buff[4096];
559   struct iep_daemon_info *info = (struct iep_daemon_info *)closure;
560
561   if(mask & EVENTER_EXCEPTION) {
562     int rv;
563    read_error:
564     kill(info->child, SIGKILL);
565     if(waitpid(info->child, &rv, 0) != info->child) {
566       mtevL(noit_iep, "Failed to reap IEP daemon\n");
567       exit(-1);
568     }
569     mtevL(noit_iep, "IEP daemon is done, starting a new one\n");
570     start_iep_daemon();
571     eventer_remove_fd(e->fd);
572     iep_daemon_info_free(info);
573     return 0;
574   }
575   while(1) {
576     len = e->opset->read(e->fd, buff, sizeof(buff)-1, &newmask, e);
577     if(len == -1 && (errno == EAGAIN || errno == EINTR))
578       return newmask | EVENTER_EXCEPTION;
579     if(len <= 0) goto read_error;
580     mtevAssert(len < sizeof(buff));
581     buff[len] = '\0';
582     mtevL(noit_iep_debug, "%s", buff);
583   }
584 }
585
586 static void
587 start_iep_daemon() {
588   eventer_t newe;
589   struct iep_daemon_info *info;
590   char *cmd = NULL;
591
592   if(!mtev_conf_get_string(NULL, "/stratcon/iep/start/@command",
593                            &cmd)) {
594     mtevL(noit_iep, "No IEP start command provided.  You're on your own.\n");
595     setup_iep_connection_later(0);
596     return;
597   }
598
599   info = calloc(1, sizeof(*info));
600   info->stdin_pipe[0] = info->stdin_pipe[1] = -1;
601   info->stderr_pipe[0] = info->stderr_pipe[1] = -1;
602   info->command = cmd;
603
604   if(!mtev_conf_get_string(NULL, "/stratcon/iep/start/@directory",
605                            &info->directory))
606     info->directory = strdup(".");
607   if(pipe(info->stdin_pipe) != 0 ||
608      pipe(info->stderr_pipe) != 0) {
609     mtevL(noit_iep, "pipe: %s\n", strerror(errno));
610     goto bail;
611   }
612   info->child = fork();
613   if(info->child == -1) {
614     mtevL(noit_iep, "fork: %s\n", strerror(errno));
615     goto bail;
616   }
617   if(info->child == 0) {
618     char *argv[3] = { "run-iep", NULL, NULL };
619     int stdout_fileno;
620
621     argv[1] = mtev_conf_config_filename();
622
623     if(chdir(info->directory) != 0) {
624       mtevL(noit_iep, "Starting IEP daemon, chdir failed: %s\n",
625             strerror(errno));
626       exit(-1);
627     }
628
629     close(info->stdin_pipe[1]);
630     close(info->stderr_pipe[0]);
631     dup2(info->stdin_pipe[0], 0);
632     dup2(info->stderr_pipe[1], 2);
633     stdout_fileno = open("/dev/null", O_WRONLY);
634     if(stdout_fileno >= 0) dup2(stdout_fileno, 1);
635
636     exit(execv(info->command, argv));
637   }
638   /* in the parent */
639   close(info->stdin_pipe[0]);
640   info->stdin_pipe[0] = -1;
641   close(info->stderr_pipe[1]);
642   info->stderr_pipe[1] = -1;
643   if(eventer_set_fd_nonblocking(info->stderr_pipe[0])) {
644     goto bail;
645   }
646
647   newe = eventer_alloc();
648   newe->fd = info->stderr_pipe[0];
649   newe->mask = EVENTER_READ | EVENTER_EXCEPTION;
650   newe->callback = stratcon_iep_err_handler;
651   newe->closure = info;
652   eventer_add(newe);
653   info = NULL;
654
655   setup_iep_connection_later(1);
656
657   return;
658
659  bail:
660   iep_daemon_info_free(info);
661   mtevL(noit_iep, "Failed to start IEP daemon\n");
662   exit(-1);
663   return;
664 }
665
666 void
667 stratcon_iep_mq_driver_register(const char *name, mq_driver_t *d) {
668   mtev_hash_replace(&mq_drivers, strdup(name), strlen(name), d, free, NULL);
669 }
670
671 static int rest_set_filters(mtev_http_rest_closure_t *restc,
672                             int npats, char **pats) {
673   mtev_http_session_ctx *ctx = restc->http_ctx;
674   xmlXPathObjectPtr pobj = NULL;
675   xmlDocPtr doc = NULL, indoc = NULL;
676   xmlNodePtr node, root;
677   int error_code = 500, complete = 0, mask = 0, cnt = 0, i, j;
678   const char *error = "internal error";
679   xmlXPathContextPtr xpath_ctxt;
680   char *action = NULL, *uuid = NULL;
681   char xpath[1024];
682   mq_command_t *commands = NULL;
683   bool valid = true;
684
685   if (npats != 0) goto error;
686
687   indoc = rest_get_xml_upload(restc, &mask, &complete);
688   if(!complete) return mask;
689   if(indoc == NULL) {
690     error = "xml parse error";
691     goto error;
692   }
693
694   xpath_ctxt = xmlXPathNewContext(indoc);
695   pobj = xmlXPathEval((xmlChar *)"/mq_metrics/check", xpath_ctxt);
696
697   if(!pobj) {
698     error = "xml format incorrect";
699     goto error;
700   }
701   if(pobj->type != XPATH_NODESET) {
702     error = "couldn't find xml nodeset";
703     goto error;
704   }
705   cnt = xmlXPathNodeSetGetLength(pobj->nodesetval);
706   if (cnt <= 0) {
707     error = "no nodes given";
708     goto error;
709   }
710
711   commands = calloc(cnt, sizeof(mq_command_t));
712   if (!commands) goto error;
713
714   for(i=0; i<cnt; i++) {
715     xmlXPathObjectPtr metric_obj = NULL;
716
717     if (!valid) break;
718     node = xmlXPathNodeSetItem(pobj->nodesetval, i);
719     action = (char *)xmlGetProp(node, (xmlChar *)"action");
720     uuid = (char *)xmlGetProp(node, (xmlChar *)"uuid");
721     if ((action == NULL) || (uuid == NULL)) {
722       mtevL(mtev_error, "error parsing %d - need both action and uuid\n", i);
723       if (action) xmlFree(action);
724       if (uuid) xmlFree(uuid);
725       valid = false;
726       continue;
727     }
728     if (!strncmp(action, "set", 3)) {
729       commands[i].action = MQ_ACTION_SET;
730     }
731     else if (!strncmp(action, "forget", 6)) {
732       commands[i].action = MQ_ACTION_FORGET;
733     }
734     else {
735       mtevL(mtev_error, "error parsing %d - bad action (%s)\n", i, action);
736       if (action) xmlFree(action);
737       if (uuid) xmlFree(uuid);
738       valid = false;
739       continue;
740     }
741     commands[i].check.uuid = uuid;
742     snprintf(xpath, sizeof(xpath), "/mq_metrics/check[%d]/metrics/metric", i+1);
743     metric_obj = xmlXPathEval((xmlChar *)xpath, xpath_ctxt);
744     if (metric_obj && metric_obj->type == XPATH_NODESET) {
745       commands[i].check.metric_count = xmlXPathNodeSetGetLength(metric_obj->nodesetval);
746       if (commands[i].check.metric_count > 0) {
747         char *metric_name = NULL;
748         commands[i].check.metrics = calloc(commands[i].check.metric_count, sizeof(char*));
749         for (j=0; j < commands[i].check.metric_count; j++) {
750           xmlNodePtr metric_node = NULL;
751           if (!valid) break;
752           metric_node = xmlXPathNodeSetItem(metric_obj->nodesetval, j);
753           metric_name = (char *)xmlGetProp(metric_node, (xmlChar *)"name");
754           if (!metric_name) {
755             valid = false;
756             continue;
757           }
758           commands[i].check.metrics[j] = metric_name;
759         }
760       }
761     }
762   }
763
764   if (!valid) {
765     goto error;
766   }
767
768   for(struct driver_list *d = drivers; d; d = d->next) {
769     if (d->mq_driver) {
770       d->mq_driver->set_filters(commands, cnt);
771     }
772   }
773
774   mtev_http_response_ok(restc->http_ctx, "text/xml");
775   mtev_http_response_end(restc->http_ctx);
776   goto cleanup;
777
778  error:
779   mtev_http_response_standard(ctx, error_code, "ERROR", "text/xml");
780   doc = xmlNewDoc((xmlChar *)"1.0");
781   root = xmlNewDocNode(doc, NULL, (xmlChar *)"error", NULL);
782   xmlDocSetRootElement(doc, root);
783   xmlNodeAddContent(root, (xmlChar *)error);
784   mtev_http_response_xml(ctx, doc);
785   mtev_http_response_end(ctx);
786
787  cleanup:
788   if (commands) {
789     for (i=0; i<cnt; i++) {
790       mq_command_free(&commands[i]);
791     }
792     free(commands);
793   }
794   return 0;
795 }
796
797 void
798 stratcon_iep_init() {
799   mtev_conf_section_t *mqs;
800   int i, cnt;
801   mtev_boolean disabled = mtev_false;
802   char mq_type[128] = "stomp";
803   /* Only 32 so we can print out a reasonable length bad value */
804   char remote[32] = "ip";
805   struct driver_list *newdriver;
806   void *vdriver;
807
808   noit_iep = mtev_log_stream_find("error/iep");
809   noit_iep_debug = mtev_log_stream_find("debug/iep");
810   if(!noit_iep) noit_iep = noit_error;
811   if(!noit_iep_debug) noit_iep_debug = noit_debug;
812
813   if(mtev_conf_get_stringbuf(NULL, "/stratcon/iep/@inject_remote", remote, sizeof(remote))) {
814     if(strcmp(remote, "ip") && strcmp(remote, "cn")) {
815       mtevL(noit_iep, "Bad @remote_inject \"%s\", setting to \"cn\"\n", remote);
816       strlcpy(remote, "ip", sizeof(remote));
817     }
818   }
819   if(!strcmp(remote, "ip")) inject_remote_cn = mtev_false;
820   else if(!strcmp(remote, "cn")) inject_remote_cn = mtev_true;
821
822   if(mtev_conf_get_boolean(NULL, "/stratcon/iep/@disabled", &disabled) &&
823      disabled == mtev_true) {
824     mtevL(noit_iep, "IEP system is disabled!\n");
825     return;
826   }
827
828   mqs = mtev_conf_get_sections(NULL, "/stratcon/iep/mq", &cnt);
829   for(i=0; i<cnt; i++) {
830     if(!mtev_conf_get_stringbuf(mqs[i], "@type",
831                                 mq_type, sizeof(mq_type))) {
832       mtevL(noit_iep, "You must specify an <mq type=\"...\"> that is valid.\n");
833       exit(-2);
834     }
835     if(!mtev_hash_retrieve(&mq_drivers, mq_type, strlen(mq_type), &vdriver) ||
836        vdriver == NULL) {
837       mtevL(noit_iep, "Cannot find MQ driver type: %s\n", mq_type);
838       mtevL(noit_iep, "Did you forget to load a module?\n");
839       exit(-2);
840     }
841     newdriver = calloc(1, sizeof(*newdriver));
842     newdriver->section = mqs[i];
843     newdriver->mq_driver = (mq_driver_t *)vdriver;
844     pthread_key_create(&newdriver->iep_connection, connection_destroy);
845     newdriver->next = drivers;
846     drivers = newdriver;
847   }
848   free(mqs);
849
850   eventer_name_callback("stratcon_iep_submitter", stratcon_iep_submitter);
851   eventer_name_callback("stratcon_iep_err_handler", stratcon_iep_err_handler);
852   eventer_name_callback("setup_iep_connection_callback", setup_iep_connection_callback);
853
854   /* start up a thread pool of one */
855   memset(&iep_jobq, 0, sizeof(iep_jobq));
856   eventer_jobq_init(&iep_jobq, "iep_submitter");
857   eventer_jobq_increase_concurrency(&iep_jobq);
858
859   mtevAssert(mtev_http_rest_register_auth(
860     "PUT", "/", "^mq_filters$",
861     rest_set_filters, mtev_http_rest_client_cert_auth
862   ) == 0);
863
864   start_iep_daemon();
865
866   /* setup our live jlog stream */
867   stratcon_streamer_connection(NULL, NULL, "noit",
868                                stratcon_jlog_recv_handler,
869                                (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
870                                NULL,
871                                jlog_streamer_ctx_free);
872 }
873
874 void
875 stratcon_iep_init_globals(void) {
876   mtev_hash_init(&mq_drivers);
877 }
878
Note: See TracBrowser for help on using the browser.