root/src/stratcon_iep.c

Revision fa38a804ef05fd92d811202fb9a3b92209bfa2bc, 20.9 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 8 months ago)

Alter the iep_drivers to support multiple concurrently loaded drivers.
Now multiple <mq> sections can be present and each is used such that
message are sent over all listed MQs.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "noit_jlog_listener.h"
38 #include "stratcon_jlog_streamer.h"
39 #include "stratcon_datastore.h"
40 #include "stratcon_iep.h"
41 #include "noit_conf.h"
42 #include "noit_check.h"
43
44 #include <sys/types.h>
45 #ifdef HAVE_SYS_WAIT_H
46 #include <sys/wait.h>
47 #endif
48 #include <sys/stat.h>
49 #include <fcntl.h>
50 #include <unistd.h>
51 #include <sys/fcntl.h>
52 #ifdef HAVE_SYS_FILIO_H
53 #include <sys/filio.h>
54 #endif
55 #include <signal.h>
56 #include <errno.h>
57 #include <assert.h>
58
59 eventer_jobq_t iep_jobq;
60 static noit_log_stream_t noit_iep = NULL;
61 static noit_log_stream_t noit_iep_debug = NULL;
62 static noit_spinlock_t iep_conn_cnt = 0;
63
64 static noit_hash_table mq_drivers = NOIT_HASH_EMPTY;
65 struct driver_thread_data {
66   mq_driver_t *mq_driver;
67   struct iep_thread_driver *driver_data;
68 };
69 struct driver_list {
70   mq_driver_t *mq_driver;
71   pthread_key_t iep_connection;
72   noit_conf_section_t section;
73   struct driver_list *next;
74 } *drivers;
75
76 static int iep_system_enabled = 1;
77 int stratcon_iep_get_enabled() { return iep_system_enabled; }
78 void stratcon_iep_set_enabled(int n) { iep_system_enabled = n; }
79
80
81 struct iep_job_closure {
82   char *line;       /* This is a copy and gets trashed during processing */
83   char *remote;
84   char *doc_str;
85 };
86
87 static void
88 start_iep_daemon();
89
90 static double
91 stratcon_iep_age_from_line(char *data, struct timeval now) {
92   double n, t;
93   if(data && (*data == 'S' || *data == 'M')) {
94     if(data[1] != '\t') return 0;
95     t = strtod(data + 2, NULL);
96     n = (float)now.tv_sec + (float)now.tv_usec / 1000000.0;
97     return n - t;
98   }
99   return 0;
100 }
101
102 struct statement_node {
103   char *id;
104   char *statement;
105   char *provides;
106   int marked; /* helps with identifying cycles */
107   int nrequires;
108   struct statement_node **requires;
109 };
110 static void
111 statement_node_free(void *vstmt) {
112   struct statement_node *stmt = vstmt;
113   if(stmt->id) free(stmt->id);
114   if(stmt->statement) free(stmt->statement);
115   if(stmt->provides) free(stmt->provides);
116   if(stmt->requires) free(stmt->requires);
117 }
118 static int
119 stmt_mark_dag(struct statement_node *stmt, int mgen) {
120   int i;
121   assert(stmt->marked <= mgen);
122   if(stmt->marked == mgen) return -1;
123   if(stmt->marked > 0) return 0; /* validated in a previous sweep */
124   stmt->marked = mgen;
125   for(i=0; i<stmt->nrequires; i++)
126     if(stmt_mark_dag(stmt->requires[i], mgen) < 0) return -1;
127   return 0;
128 }
129 static void
130 submit_statement_node(struct statement_node *stmt) {
131   int line_len, i;
132   char *line, *cp;
133
134   if(stmt->marked) return;
135   for(i=0; i<stmt->nrequires; i++)
136     submit_statement_node(stmt->requires[i]);
137
138   line_len = 3 /* 2 tabs + \0 */ +
139              1 /* 'D' */ + 1 /* '\n' */ +
140              strlen(stmt->id) + strlen(stmt->statement);
141   line = malloc(line_len);
142   snprintf(line, line_len, "D\t%s\t%s\n", stmt->id, stmt->statement);
143   cp = line;
144   while(cp[0] && cp[1]) {
145     if(*cp == '\n') *cp = ' ';
146     cp++;
147   }
148   noitL(noit_iep, "submitting statement: %s\n", line);
149   stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, line, NULL);
150   stmt->marked = 1;
151 }
152 void stratcon_iep_submit_statements() {
153   int i, cnt = 0;
154   noit_conf_section_t *statement_configs;
155   char path[256];
156   struct statement_node *stmt;
157   void *vstmt;
158   noit_hash_table stmt_by_id = NOIT_HASH_EMPTY;
159   noit_hash_table stmt_by_provider = NOIT_HASH_EMPTY;
160   noit_hash_iter iter = NOIT_HASH_ITER_ZERO;
161   const char *key;
162   int klen, mgen = 0;
163
164   snprintf(path, sizeof(path), "/stratcon/iep/queries[@master=\"stratcond\"]//statement");
165   statement_configs = noit_conf_get_sections(NULL, path, &cnt);
166   noitL(noit_debug, "Found %d %s stanzas\n", cnt, path);
167
168   /* Phase 1: sweep in all the statements */
169   for(i=0; i<cnt; i++) {
170     char id[UUID_STR_LEN+1];
171     char provides[256];
172     char *statement;
173
174     if(!noit_conf_get_stringbuf(statement_configs[i],
175                                 "self::node()/@id",
176                                 id, sizeof(id))) {
177       noitL(noit_iep, "No uuid specified in query\n");
178       continue;
179     }
180     if(!noit_conf_get_stringbuf(statement_configs[i],
181                                 "ancestor-or-self::node()/@provides",
182                                 provides, sizeof(provides))) {
183       provides[0] = '\0';
184     }
185     if(!noit_conf_get_string(statement_configs[i], "self::node()/epl",
186                              &statement)) {
187       noitL(noit_iep, "No contents specified in statement\n");
188       continue;
189     }
190     stmt = calloc(1, sizeof(*stmt));
191     stmt->id = strdup(id);
192     stmt->statement = statement;
193     stmt->provides = provides[0] ? strdup(provides) : NULL;
194     if(!noit_hash_store(&stmt_by_id, stmt->id, strlen(stmt->id), stmt)) {
195       noitL(noit_iep, "Duplicate statement id: %s\n", stmt->id);
196       exit(-1);
197     }
198     if(stmt->provides) {
199       if(!noit_hash_store(&stmt_by_provider, stmt->provides,
200                           strlen(stmt->provides), stmt)) {
201         noitL(noit_iep, "Two statements provide: '%s'\n", stmt->provides);
202         exit(-1);
203       }
204     }
205   }
206
207   /* Phase 2: load the requires graph */
208   for(i=0; i<cnt; i++) {
209     char id[UUID_STR_LEN+1];
210     int rcnt, j;
211     char *requires;
212     noit_conf_section_t *reqs;
213
214     if(!noit_conf_get_stringbuf(statement_configs[i],
215                                 "self::node()/@id",
216                                 id, sizeof(id))) {
217       noitL(noit_iep, "No uuid specified in query\n");
218       continue;
219     }
220     if(!noit_hash_retrieve(&stmt_by_id, id, strlen(id), &vstmt)) {
221       noitL(noit_iep, "Cannot find statement: %s\n", id);
222       exit(-1);
223     }
224     stmt = vstmt;
225     reqs = noit_conf_get_sections(statement_configs[i],
226                                   "self::node()/requires", &rcnt);
227     if(rcnt > 0) {
228       stmt->requires = malloc(rcnt * sizeof(*(stmt->requires)));
229       for(j=0; j<rcnt; j++) {
230         void *vrstmt;
231         if(!noit_conf_get_string(reqs[j], "self::node()",
232                                  &requires) || requires[0] == '\0') {
233           continue;
234         }
235         if(!noit_hash_retrieve(&stmt_by_provider, requires, strlen(requires),
236                                &vrstmt)) {
237           noitL(noit_iep,
238                 "Statement %s requires %s which no one provides.\n",
239                 stmt->id, requires);
240           exit(-1);
241         }
242         stmt->requires[stmt->nrequires++] = vrstmt;
243       }
244     }
245   }
246
247   /* Phase 3: Recursive sweep and mark to detect cycles.
248      We're walking the graph backwards here from dependent to provider,
249      but a cycle is a cycle, so this validates the graph. */
250   while(noit_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) {
251     stmt = vstmt;
252     if(stmt_mark_dag(stmt, ++mgen) < 0) {
253       noitL(noit_iep, "Statement %s has a cyclic requirement\n", stmt->id);
254       exit(-1);
255     }
256   }
257
258   /* Phase 4: clean the markings */
259   memset(&iter, 0, sizeof(iter));
260   while(noit_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) {
261     stmt = vstmt;
262     stmt->marked = 0;
263   }
264
265   /* Phase 5: do the load */
266   memset(&iter, 0, sizeof(iter));
267   while(noit_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) {
268     stmt = vstmt;
269     submit_statement_node(stmt);
270   }
271
272   noit_hash_destroy(&stmt_by_provider, NULL, NULL);
273   noit_hash_destroy(&stmt_by_id, NULL, statement_node_free);
274   free(statement_configs);
275 }
276
277 void stratcon_iep_submit_queries() {
278   int i, cnt = 0;
279   noit_conf_section_t *query_configs;
280   char path[256];
281
282   snprintf(path, sizeof(path), "/stratcon/iep/queries[@master=\"stratcond\"]//query");
283   query_configs = noit_conf_get_sections(NULL, path, &cnt);
284   noitL(noit_debug, "Found %d %s stanzas\n", cnt, path);
285   for(i=0; i<cnt; i++) {
286     char id[UUID_STR_LEN+1];
287     char topic[256];
288     char *query;
289     char *line;
290     int line_len;
291
292     if(!noit_conf_get_stringbuf(query_configs[i],
293                                 "self::node()/@id",
294                                 id, sizeof(id))) {
295       noitL(noit_iep, "No uuid specified in query\n");
296       continue;
297     }
298     if(!noit_conf_get_stringbuf(query_configs[i],
299                                 "ancestor-or-self::node()/@topic",
300                                 topic, sizeof(topic))) {
301       noitL(noit_iep, "No topic specified in query\n");
302       continue;
303     }
304     if(!noit_conf_get_string(query_configs[i], "self::node()/epl",
305                              &query)) {
306       noitL(noit_iep, "No contents specified in query\n");
307       continue;
308     }
309     line_len = 4 /* 3 tabs + \0 */ +
310                1 /* 'Q' */ + 1 /* '\n' */ +
311                strlen(id) + strlen(topic) + strlen(query);
312     line = malloc(line_len);
313     snprintf(line, line_len, "Q\t%s\t%s\t%s\n", id, topic, query);
314     free(query);
315     query = line;
316     while(query[0] && query[1]) {
317       if(*query == '\n') *query = ' ';
318       query++;
319     }
320     stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, line, NULL);
321   }
322   free(query_configs);
323 }
324
325 static struct driver_thread_data *
326 connect_iep_driver(struct driver_list *d) {
327   int rc;
328   struct driver_thread_data *data;
329   data = pthread_getspecific(d->iep_connection);
330   if(!data) {
331     data = calloc(1, sizeof(*data));
332     data->mq_driver = d->mq_driver;
333     pthread_setspecific(d->iep_connection, data);
334   }
335   if(!data->driver_data)
336     data->driver_data = data->mq_driver->allocate(d->section);
337   rc = data->mq_driver->connect(data->driver_data);
338   if(rc < 0) return NULL;
339   if(rc == 0) {
340     /* Initial connect */
341     /* TODO: this should be requested by Esper, not blindly pushed */
342     stratcon_iep_submit_statements();
343     stratcon_datastore_iep_check_preload();
344     stratcon_iep_submit_queries();
345   }
346
347   return data;
348 }
349
350 static int
351 setup_iep_connection_callback(eventer_t e, int mask, void *closure,
352                               struct timeval *now) {
353   noit_spinlock_unlock(&iep_conn_cnt);
354   stratcon_iep_line_processor(DS_OP_INSERT, NULL, NULL, NULL, NULL);
355   return 0;
356 }
357
358 static void
359 setup_iep_connection_later(int seconds) {
360   eventer_t newe;
361   if(!noit_spinlock_trylock(&iep_conn_cnt)) return;
362   newe = eventer_alloc();
363   gettimeofday(&newe->whence, NULL);
364   newe->whence.tv_sec += seconds;
365   newe->mask = EVENTER_TIMER;
366   newe->callback = setup_iep_connection_callback;
367   newe->closure = NULL;
368   eventer_add(newe);
369 }
370
371 static int
372 stratcon_iep_submitter(eventer_t e, int mask, void *closure,
373                        struct timeval *now) {
374   double age;
375   struct iep_job_closure *job = closure;
376   char *line;
377   /* We only play when it is an asynch event */
378   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
379
380   if(mask & EVENTER_ASYNCH_CLEANUP) {
381     /* free all the memory associated with the batch */
382     if(job) {
383       if(job->line) free(job->line);
384       if(job->remote) free(job->remote);
385       if(job->doc_str) free(job->doc_str);
386       free(job);
387     }
388     return 0;
389   }
390
391   if(!job->line || job->line[0] == '\0') return 0;
392
393   if((age = stratcon_iep_age_from_line(job->line, *now)) > 60) {
394     noitL(noit_debug, "Skipping old event from %s, %f seconds old.\n",
395           job->remote ? job->remote : "(null)", age);
396     return 0;
397   }
398   /* Submit */
399   int line_len = strlen(job->line);
400   int remote_len = strlen(job->remote);
401   const char *toff = strchr(job->line, '\t');
402   int token_off = 2;
403   if(toff) token_off = toff - job->line + 1;
404
405   line = (char*)calloc(line_len + 1 /* \t */ + remote_len + 2, 1);
406   strncpy(line, job->line, token_off);
407   strncat(line, job->remote, remote_len);
408   strncat(line, "\t", 1);
409   strncat(line, job->line + token_off, line_len - token_off);
410   job->doc_str = line;
411
412   for(struct driver_list *d = drivers; d; d = d->next) {
413     struct driver_thread_data *tls = connect_iep_driver(d);
414     if(tls && tls->driver_data) {
415       if(tls->mq_driver->submit(tls->driver_data, job->doc_str,
416                                 line_len + remote_len + 1) != 0) {
417         noitL(noit_debug, "failed to MQ submit.\n");
418       }
419     }
420   }
421   return 0;
422 }
423
424 void
425 stratcon_iep_line_processor(stratcon_datastore_op_t op,
426                             struct sockaddr *remote, const char *remote_cn,
427                             void *operand, eventer_t completion) {
428   int len;
429   char remote_str[128];
430   struct iep_job_closure *jc;
431   eventer_t newe;
432   struct timeval __now, iep_timeout = { 10L, 0L };
433   /* We only care about inserts */
434
435   if(op == DS_OP_CHKPT) {
436     if(completion) eventer_add(completion);
437     return;
438   }
439   if(op != DS_OP_INSERT) return;
440
441   snprintf(remote_str, sizeof(remote_str), "%s", "0.0.0.0");
442   if(remote) {
443     switch(remote->sa_family) {
444       case AF_INET:
445         len = sizeof(struct sockaddr_in);
446         inet_ntop(remote->sa_family, &((struct sockaddr_in *)remote)->sin_addr,
447                   remote_str, len);
448         break;
449       case AF_INET6:
450        len = sizeof(struct sockaddr_in6);
451         inet_ntop(remote->sa_family, &((struct sockaddr_in6 *)remote)->sin6_addr,
452                   remote_str, len);
453        break;
454       case AF_UNIX:
455         snprintf(remote_str, sizeof(remote_str), "%s", ((struct sockaddr_un *)remote)->sun_path);
456         break;
457     }
458   }
459
460   /* process operand and push onto queue */
461   gettimeofday(&__now, NULL);
462   newe = eventer_alloc();
463   newe->mask = EVENTER_ASYNCH;
464   add_timeval(__now, iep_timeout, &newe->whence);
465   newe->callback = stratcon_iep_submitter;
466   jc = calloc(1, sizeof(*jc));
467   jc->line = operand;
468   jc->remote = strdup(remote_str);
469   newe->closure = jc;
470
471   eventer_add_asynch(&iep_jobq, newe);
472 }
473
474 static void connection_destroy(void *vd) {
475   struct driver_thread_data *data = vd;
476   data->mq_driver->disconnect(data->driver_data);
477   data->mq_driver->deallocate(data->driver_data);
478   free(data);
479 }
480
481 jlog_streamer_ctx_t *
482 stratcon_jlog_streamer_iep_ctx_alloc(void) {
483   jlog_streamer_ctx_t *ctx;
484   ctx = stratcon_jlog_streamer_ctx_alloc();
485   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_TEMP_FEED);
486   ctx->push = stratcon_iep_line_processor;
487   return ctx;
488 }
489
490 struct iep_daemon_info {
491   pid_t child;
492   int stdin_pipe[2];
493   int stderr_pipe[2];
494   char *directory;
495   char *command;
496 };
497
498 static void
499 iep_daemon_info_free(struct iep_daemon_info *info) {
500   if(!info) return;
501   if(info->directory) free(info->directory);
502   if(info->command) free(info->command);
503   if(info->stdin_pipe[0] >= 0) close(info->stdin_pipe[0]);
504   if(info->stdin_pipe[1] >= 0) close(info->stdin_pipe[1]);
505   if(info->stderr_pipe[0] >= 0) close(info->stderr_pipe[0]);
506   if(info->stderr_pipe[1] >= 0) close(info->stderr_pipe[1]);
507   free(info);
508 }
509
510 static int
511 stratcon_iep_err_handler(eventer_t e, int mask, void *closure,
512                          struct timeval *now) {
513   int len, newmask;
514   char buff[4096];
515   struct iep_daemon_info *info = (struct iep_daemon_info *)closure;
516
517   if(mask & EVENTER_EXCEPTION) {
518     int rv;
519    read_error:
520     kill(info->child, SIGKILL);
521     if(waitpid(info->child, &rv, 0) != info->child) {
522       noitL(noit_iep, "Failed to reap IEP daemon\n");
523       exit(-1);
524     }
525     noitL(noit_iep, "IEP daemon is done, starting a new one\n");
526     start_iep_daemon();
527     eventer_remove_fd(e->fd);
528     iep_daemon_info_free(info);
529     return 0;
530   }
531   while(1) {
532     len = e->opset->read(e->fd, buff, sizeof(buff)-1, &newmask, e);
533     if(len == -1 && (errno == EAGAIN || errno == EINTR))
534       return newmask | EVENTER_EXCEPTION;
535     if(len <= 0) goto read_error;
536     assert(len < sizeof(buff));
537     buff[len] = '\0';
538     noitL(noit_iep_debug, "%s", buff);
539   }
540 }
541
542 static void
543 start_iep_daemon() {
544   eventer_t newe;
545   struct iep_daemon_info *info;
546   char *cmd = NULL;
547
548   if(!noit_conf_get_string(NULL, "/stratcon/iep/start/@command",
549                            &cmd)) {
550     noitL(noit_iep, "No IEP start command provided.  You're on your own.\n");
551     setup_iep_connection_later(0);
552     return;
553   }
554
555   info = calloc(1, sizeof(*info));
556   info->stdin_pipe[0] = info->stdin_pipe[1] = -1;
557   info->stderr_pipe[0] = info->stderr_pipe[1] = -1;
558   info->command = cmd;
559
560   if(!noit_conf_get_string(NULL, "/stratcon/iep/start/@directory",
561                            &info->directory))
562     info->directory = strdup(".");
563   if(pipe(info->stdin_pipe) != 0 ||
564      pipe(info->stderr_pipe) != 0) {
565     noitL(noit_iep, "pipe: %s\n", strerror(errno));
566     goto bail;
567   }
568   info->child = fork();
569   if(info->child == -1) {
570     noitL(noit_iep, "fork: %s\n", strerror(errno));
571     goto bail;
572   }
573   if(info->child == 0) {
574     char *argv[3] = { "run-iep", NULL, NULL };
575     int stdout_fileno;
576
577     argv[1] = noit_conf_config_filename();
578
579     if(chdir(info->directory) != 0) {
580       noitL(noit_iep, "Starting IEP daemon, chdir failed: %s\n",
581             strerror(errno));
582       exit(-1);
583     }
584
585     close(info->stdin_pipe[1]);
586     close(info->stderr_pipe[0]);
587     dup2(info->stdin_pipe[0], 0);
588     dup2(info->stderr_pipe[1], 2);
589     stdout_fileno = open("/dev/null", O_WRONLY);
590     if(stdout_fileno >= 0) dup2(stdout_fileno, 1);
591
592     exit(execv(info->command, argv));
593   }
594   /* in the parent */
595   close(info->stdin_pipe[0]);
596   info->stdin_pipe[0] = -1;
597   close(info->stderr_pipe[1]);
598   info->stderr_pipe[1] = -1;
599   if(eventer_set_fd_nonblocking(info->stderr_pipe[0])) {
600     goto bail;
601   }
602
603   newe = eventer_alloc();
604   newe->fd = info->stderr_pipe[0];
605   newe->mask = EVENTER_READ | EVENTER_EXCEPTION;
606   newe->callback = stratcon_iep_err_handler;
607   newe->closure = info;
608   eventer_add(newe);
609   info = NULL;
610
611   setup_iep_connection_later(1);
612
613   return;
614
615  bail:
616   iep_daemon_info_free(info);
617   noitL(noit_iep, "Failed to start IEP daemon\n");
618   exit(-1);
619   return;
620 }
621
622 void
623 stratcon_iep_mq_driver_register(const char *name, mq_driver_t *d) {
624   noit_hash_replace(&mq_drivers, strdup(name), strlen(name), d, free, NULL);
625 }
626
627 void
628 stratcon_iep_init() {
629   noit_conf_section_t *mqs;
630   int i, cnt;
631   noit_boolean disabled = noit_false;
632   char mq_type[128] = "stomp";
633   struct driver_list *newdriver;
634   void *vdriver;
635
636   noit_iep = noit_log_stream_find("error/iep");
637   noit_iep_debug = noit_log_stream_find("debug/iep");
638   if(!noit_iep) noit_iep = noit_error;
639   if(!noit_iep_debug) noit_iep_debug = noit_debug;
640
641   if(noit_conf_get_boolean(NULL, "/stratcon/iep/@disabled", &disabled) &&
642      disabled == noit_true) {
643     noitL(noit_iep, "IEP system is disabled!\n");
644     return;
645   }
646
647   mqs = noit_conf_get_sections(NULL, "/stratcon/iep/mq", &cnt);
648   for(i=0; i<cnt; i++) {
649     if(!noit_conf_get_stringbuf(mqs[i], "@type",
650                                 mq_type, sizeof(mq_type))) {
651       noitL(noit_iep, "You must specify an <mq type=\"...\"> that is valid.\n");
652       exit(-2);
653     }
654     if(!noit_hash_retrieve(&mq_drivers, mq_type, strlen(mq_type), &vdriver) ||
655        vdriver == NULL) {
656       noitL(noit_iep, "Cannot find MQ driver type: %s\n", mq_type);
657       noitL(noit_iep, "Did you forget to load a module?\n");
658       exit(-2);
659     }
660     newdriver = calloc(1, sizeof(*newdriver));
661     newdriver->section = mqs[i];
662     newdriver->mq_driver = (mq_driver_t *)vdriver;
663     pthread_key_create(&newdriver->iep_connection, connection_destroy);
664     newdriver->next = drivers;
665     drivers = newdriver;
666   }
667   free(mqs);
668
669   eventer_name_callback("stratcon_iep_submitter", stratcon_iep_submitter);
670   eventer_name_callback("stratcon_iep_err_handler", stratcon_iep_err_handler);
671   eventer_name_callback("setup_iep_connection_callback", setup_iep_connection_callback);
672
673   /* start up a thread pool of one */
674   memset(&iep_jobq, 0, sizeof(iep_jobq));
675   eventer_jobq_init(&iep_jobq, "iep_submitter");
676   iep_jobq.backq = eventer_default_backq();
677   eventer_jobq_increase_concurrency(&iep_jobq);
678
679   start_iep_daemon();
680
681   /* setup our live jlog stream */
682   stratcon_streamer_connection(NULL, NULL,
683                                stratcon_jlog_recv_handler,
684                                (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
685                                NULL,
686                                jlog_streamer_ctx_free);
687 }
688
Note: See TracBrowser for help on using the browser.