root/src/stratcon_iep.c

Revision ea6ba4161f18ddc6b87d887e6bda5c05ae18ceef, 19.3 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 9 years ago)

fixes #123

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are
7  * met:
8  *
9  *     * Redistributions of source code must retain the above copyright
10  *       notice, this list of conditions and the following disclaimer.
11  *     * Redistributions in binary form must reproduce the above
12  *       copyright notice, this list of conditions and the following
13  *       disclaimer in the documentation and/or other materials provided
14  *       with the distribution.
15  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
16  *       of its contributors may be used to endorse or promote products
17  *       derived from this software without specific prior written
18  *       permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
21  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
22  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
23  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
24  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
25  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
26  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
27  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
28  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
29  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
30  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32
33 #include "noit_defines.h"
34 #include "eventer/eventer.h"
35 #include "utils/noit_log.h"
36 #include "utils/noit_b64.h"
37 #include "noit_jlog_listener.h"
38 #include "stratcon_jlog_streamer.h"
39 #include "stratcon_datastore.h"
40 #include "stratcon_iep.h"
41 #include "noit_conf.h"
42 #include "noit_check.h"
43 #include "noit_xml.h"
44
45 #include <unistd.h>
46 #include <sys/fcntl.h>
47 #include <assert.h>
48 #include <libxml/parser.h>
49 #include <libxml/tree.h>
50 #include <libxml/xmlsave.h>
51 #ifdef OPENWIRE
52 #include "amqcs.h"
53 #else
54 #include "stomp/stomp.h"
55 #endif
56
/* Job queue that stratcon_iep_line_processor pushes submission events onto. */
eventer_jobq_t iep_jobq;

/*
 * Message-queue connection state.  One instance is stored per thread
 * under the iep_connection key (see stratcon_iep_get_connection).
 */
struct iep_thread_driver {
#ifdef OPENWIRE
  amqcs_connect_options connect_options;
  amqcs_connection *connection;
#else
  stomp_connection *connection;
#endif
  apr_pool_t *pool;
};
/* Thread-specific key whose value is a struct iep_thread_driver pointer. */
pthread_key_t iep_connection;
69
/* Work item attached as the closure of each event queued on iep_jobq. */
struct iep_job_closure {
  char *line;       /* This is a copy and gets trashed during processing */
  char *remote;     /* printable form of the address the line came from */
  xmlDocPtr doc;    /* XML document built from line by the submitter */
  char *doc_str;    /* serialized form of doc, used as the message body */
  apr_pool_t *pool; /* created by the submitter as a child of the driver pool */
};

/* Forward declaration: the definition appears later in this file and is
 * also called from stratcon_iep_err_handler. */
static void
start_iep_daemon();
80
/*
 * Split a tab-separated record in place.
 *
 * in  - NUL-terminated, writable record; each separating tab is overwritten
 *       with '\0' and a single trailing '\n' (if any) is removed.
 * p   - receives pointers to the start of each field.
 * len - maximum number of fields to store in p.
 *
 * Returns the number of fields stored (at most len).  Fields beyond len
 * are not reported.  A NULL in or p yields 0 rather than a crash.
 */
static int
bust_to_parts(char *in, char **p, int len) {
  int cnt = 0;
  char *s = in;
  if(!in || !p) return 0;
  while(cnt < len) {
    p[cnt++] = s;
    while(*s && *s != '\t') s++;
    if(!*s) break;
    *s++ = '\0';
  }
  while(*s) s++; /* Move to end */
  if(s > in && *(s-1) == '\n') *(s-1) = '\0'; /* chomp */
  return cnt;
}
95
/*
 * Append a text child element named a with content b to the node held in
 * a variable called "root".  That variable is not a macro argument; it must
 * be in scope where ADDCHILD is used (NEWDOC declares one).
 */
#define ADDCHILD(a,b) \
  xmlNewTextChild(root, NULL, (xmlChar *)(a), (xmlChar *)(b))
/*
 * Create a version "1.0" document in xmldoc with a root element named n,
 * then execute stanza with that root element available as "root".
 */
#define NEWDOC(xmldoc,n,stanza) do { \
  xmlNodePtr root; \
  xmldoc = xmlNewDoc((xmlChar *)"1.0"); \
  root = xmlNewDocNode(xmldoc, NULL, (xmlChar *)(n), NULL); \
  xmlDocSetRootElement(xmldoc, root); \
  stanza \
} while(0)
105
106
107 static xmlDocPtr
108 stratcon_iep_doc_from_status(char *data, char *remote) {
109   xmlDocPtr doc;
110   char *parts[7];
111   if(bust_to_parts(data, parts, 7) != 7) return NULL;
112   /* 'S' TIMESTAMP UUID STATE AVAILABILITY DURATION STATUS_MESSAGE */
113   NEWDOC(doc, "NoitStatus",
114          {
115            ADDCHILD("remote", remote);
116            ADDCHILD("id", parts[2]);
117            ADDCHILD("state", parts[3]);
118            ADDCHILD("availability", parts[4]);
119            ADDCHILD("duration", parts[5]);
120            ADDCHILD("status", parts[6]);
121          });
122   return doc;
123 }
124
125 static xmlDocPtr
126 stratcon_iep_doc_from_check(char *data, char *remote) {
127   xmlDocPtr doc;
128   char *parts[6];
129   if(bust_to_parts(data, parts, 6) != 6) return NULL;
130   /* 'C' TIMESTAMP UUID TARGET MODULE NAME */
131   NEWDOC(doc, "NoitCheck",
132          {
133            ADDCHILD("remote", remote);
134            ADDCHILD("id", parts[2]);
135            ADDCHILD("target", parts[3]);
136            ADDCHILD("module", parts[4]);
137            ADDCHILD("name", parts[5]);
138          });
139 noitL(noit_error,"Submitting check %s\n", parts[2]);
140   return doc;
141 }
142
143 static xmlDocPtr
144 stratcon_iep_doc_from_metric(char *data, char *remote) {
145   xmlDocPtr doc;
146   char *parts[6];
147   const char *rootname = "NoitMetricNumeric";
148   const char *valuename = "value";
149   if(bust_to_parts(data, parts, 6) != 6) return NULL;
150   /*  'M' TIMESTAMP UUID NAME TYPE VALUE */
151
152   if(*parts[4] == METRIC_STRING) {
153     rootname = "NoitMetricText";
154     valuename = "message";
155   }
156   NEWDOC(doc, rootname,
157          {
158            ADDCHILD("remote", remote);
159            ADDCHILD("id", parts[2]);
160            ADDCHILD("name", parts[3]);
161            ADDCHILD(valuename, parts[5]);
162          });
163   return doc;
164 }
165
166 static xmlDocPtr
167 stratcon_iep_doc_from_query(char *data, char *remote) {
168   xmlDocPtr doc;
169   char *parts[4];
170   if(bust_to_parts(data, parts, 4) != 4) return NULL;
171   /*  'Q' ID NAME QUERY  */
172
173   NEWDOC(doc, "StratconQuery",
174          {
175            ADDCHILD("id", parts[1]);
176            ADDCHILD("name", parts[2]);
177            ADDCHILD("expression", parts[3]);
178          });
179   return doc;
180 }
181
182 static xmlDocPtr
183 stratcon_iep_doc_from_querystop(char *data, char *remote) {
184   xmlDocPtr doc;
185   char *parts[2];
186   if(bust_to_parts(data, parts, 2) != 2) return NULL;
187   /*  'Q' ID */
188
189   NEWDOC(doc, "StratconQueryStop",
190          {
191            xmlNodeSetContent(root, (xmlChar *)parts[1]);
192          });
193   return doc;
194 }
195
196 static xmlDocPtr
197 stratcon_iep_doc_from_line(char *data, char *remote) {
198   if(data) {
199     switch(*data) {
200       case 'C': return stratcon_iep_doc_from_check(data, remote);
201       case 'S': return stratcon_iep_doc_from_status(data, remote);
202       case 'M': return stratcon_iep_doc_from_metric(data, remote);
203       case 'Q': return stratcon_iep_doc_from_query(data, remote);
204       case 'q': return stratcon_iep_doc_from_querystop(data, remote);
205     }
206   }
207   return NULL;
208 }
209
/*
 * Compute how many seconds old a record is relative to now.
 *
 * Only 'S' and 'M' records are examined; the timestamp is parsed from the
 * text following the first tab.  Any other record, a NULL line, or a record
 * whose second character is not a tab yields 0.
 *
 * The arithmetic is done in double: epoch seconds do not fit in a float's
 * 24-bit mantissa, so subtracting two float timestamps would only resolve
 * differences in steps of roughly two minutes.  Only the (small) difference
 * is narrowed to float for the return value.
 */
static float
stratcon_iep_age_from_line(char *data, struct timeval now) {
  double n, t;
  if(data && (*data == 'S' || *data == 'M')) {
    if(data[1] != '\t') return 0;
    t = strtod(data + 2, NULL);
    n = (double)now.tv_sec + (double)now.tv_usec / 1000000.0;
    return (float)(n - t);
  }
  return 0;
}
221
222 void stratcon_iep_submit_queries() {
223   int i, cnt = 0;
224   noit_conf_section_t *query_configs;
225   char path[256];
226
227   snprintf(path, sizeof(path), "/stratcon/iep/queries//query");
228   query_configs = noit_conf_get_sections(NULL, path, &cnt);
229   noitL(noit_debug, "Found %d %s stanzas\n", cnt, path);
230   for(i=0; i<cnt; i++) {
231     char id[UUID_STR_LEN];
232     char topic[256];
233     char *query;
234     char *line;
235     int line_len;
236
237     if(!noit_conf_get_stringbuf(query_configs[i],
238                                 "self::node()/@id",
239                                 id, sizeof(id))) {
240       noitL(noit_error, "No uuid specified in query\n");
241       continue;
242     }
243     if(!noit_conf_get_stringbuf(query_configs[i],
244                                 "ancestor-or-self::node()/@topic",
245                                 topic, sizeof(topic))) {
246       noitL(noit_error, "No topic specified in query\n");
247       continue;
248     }
249     if(!noit_conf_get_string(query_configs[i], "self::node()",
250                              &query)) {
251       noitL(noit_error, "No contents specified in query\n");
252       continue;
253     }
254     line_len = 4 /* 3 tabs + \0 */ +
255                1 /* 'Q' */ + 1 /* '\n' */ +
256                strlen(id) + strlen(topic) + strlen(query);
257     line = malloc(line_len);
258     snprintf(line, line_len, "Q\t%s\t%s\t%s\n", id, topic, query);
259     free(query);
260     query = line;
261     while(query[0] && query[1]) {
262       if(*query == '\n') *query = ' ';
263       query++;
264     }
265     stratcon_iep_line_processor(DS_OP_INSERT, NULL, line);
266     free(line);
267   }
268 }
269
270 static
271 struct iep_thread_driver *stratcon_iep_get_connection() {
272   apr_status_t rc;
273   struct iep_thread_driver *driver;
274   driver = pthread_getspecific(iep_connection);
275   if(!driver) {
276     driver = calloc(1, sizeof(*driver));
277     pthread_setspecific(iep_connection, driver);
278   }
279
280   if(!driver->pool) {
281     if(apr_pool_create(&driver->pool, NULL) != APR_SUCCESS) return NULL;
282   }
283
284   if(!driver->connection) {
285     int port;
286     char hostname[128];
287     if(!noit_conf_get_int(NULL, "/stratcon/iep/port", &port))
288       port = 61613;
289     if(!noit_conf_get_stringbuf(NULL, "/stratcon/iep/hostname",
290                                 hostname, sizeof(hostname)))
291       strlcpy(hostname, "127.0.0.1", sizeof(hostname));
292 #ifdef OPENWIRE
293     memset(&driver->connect_options, 0, sizeof(driver->connect_options));
294     strlcpy(driver->connect_options.hostname, hostname,
295             sizeof(driver->connect_options.hostname));
296     driver->connect_options.port = port;
297     if(amqcs_connect(&driver->connection, &driver->connect_options,
298                      driver->pool) != APR_SUCCESS) {
299       noitL(noit_error, "MQ connection failed\n");
300       return NULL;
301     }
302 #else
303     if(stomp_connect(&driver->connection, hostname, port,
304                      driver->pool)!= APR_SUCCESS) {
305       noitL(noit_error, "MQ connection failed\n");
306       return NULL;
307     }
308
309     {
310       stomp_frame frame;
311       frame.command = "CONNECT";
312       frame.headers = apr_hash_make(driver->pool);
313 /*
314       We don't use login/pass
315       apr_hash_set(frame.headers, "login", APR_HASH_KEY_STRING, "");
316       apr_hash_set(frame.headers, "passcode", APR_HASH_KEY_STRING, "");
317 */
318       frame.body = NULL;
319       frame.body_length = -1;
320       rc = stomp_write(driver->connection, &frame, driver->pool);
321       if(rc != APR_SUCCESS) {
322         noitL(noit_error, "MQ STOMP CONNECT failed, %d\n", rc);
323         stomp_disconnect(&driver->connection);
324         return NULL;
325       }
326     } 
327     {
328       stomp_frame *frame;
329       rc = stomp_read(driver->connection, &frame, driver->pool);
330       if (rc != APR_SUCCESS) {
331         noitL(noit_error, "MQ STOMP CONNECT bad response, %d\n", rc);
332         stomp_disconnect(&driver->connection);
333         return NULL;
334       }
335       noitL(noit_error, "Response: %s, %s\n", frame->command, frame->body);
336      }
337 #endif
338      stratcon_datastore_iep_check_preload();
339      stratcon_iep_submit_queries();
340   }
341
342   return driver;
343 }
344
345 static int
346 stratcon_iep_submitter(eventer_t e, int mask, void *closure,
347                        struct timeval *now) {
348   float age;
349   struct iep_job_closure *job = closure;
350   /* We only play when it is an asynch event */
351   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
352
353   if(mask & EVENTER_ASYNCH_CLEANUP) {
354     /* free all the memory associated with the batch */
355     if(job) {
356       if(job->line) free(job->line);
357       if(job->remote) free(job->remote);
358       if(job->doc_str) free(job->doc_str);
359       if(job->doc) xmlFreeDoc(job->doc);
360       if(job->pool) apr_pool_destroy(job->pool);
361       free(job);
362     }
363     return 0;
364   }
365
366   if((age = stratcon_iep_age_from_line(job->line, *now)) > 60) {
367     noitL(noit_debug, "Skipping old event %f second old.\n", age);
368     return 0;
369   }
370   job->doc = stratcon_iep_doc_from_line(job->line, job->remote);
371   if(job->doc) {
372     job->doc_str = noit_xmlSaveToBuffer(job->doc);
373     if(job->doc_str) {
374       /* Submit */
375       struct iep_thread_driver *driver;
376       driver = stratcon_iep_get_connection();
377       if(driver && driver->pool && driver->connection) {
378         apr_status_t rc;
379 #ifdef OPENWIRE
380         ow_ActiveMQQueue *dest;
381         ow_ActiveMQTextMessage *message;
382
383         apr_pool_create(&job->pool, driver->pool);
384         message = ow_ActiveMQTextMessage_create(job->pool);
385         message->content =
386           ow_byte_array_create_with_data(job->pool,strlen(job->doc_str),
387                                          job->doc_str);
388         dest = ow_ActiveMQQueue_create(job->pool);
389         dest->physicalName = ow_string_create_from_cstring(job->pool,"TEST.QUEUE");         
390         rc = amqcs_send(driver->connection,
391                         (ow_ActiveMQDestination*)dest,
392                         (ow_ActiveMQMessage*)message,
393                         1,4,0,job->pool);
394         if(rc != APR_SUCCESS) {
395           noitL(noit_error, "MQ send failed, disconnecting\n");
396           if(driver->connection) amqcs_disconnect(&driver->connection);
397           driver->connection = NULL;
398         }
399 #else
400         stomp_frame out;
401
402         apr_pool_create(&job->pool, driver->pool);
403
404         out.command = "SEND";
405         out.headers = apr_hash_make(job->pool);
406         apr_hash_set(out.headers, "destination", APR_HASH_KEY_STRING, "/queue/noit.firehose");
407         apr_hash_set(out.headers, "ack", APR_HASH_KEY_STRING, "auto");
408      
409         out.body_length = -1;
410         out.body = job->doc_str;
411         rc = stomp_write(driver->connection, &out, job->pool);
412         if(rc != APR_SUCCESS) {
413           noitL(noit_error, "STOMP send failed, disconnecting\n");
414           if(driver->connection) stomp_disconnect(&driver->connection);
415           driver->connection = NULL;
416         }
417 #endif
418       }
419       else {
420         noitL(noit_error, "Not submitting event, no MQ\n");
421       }
422     }
423   }
424   else {
425     noitL(noit_error, "no iep handler for: '%s'\n", job->line);
426   }
427   return 0;
428 }
429
430 void
431 stratcon_iep_line_processor(stratcon_datastore_op_t op,
432                             struct sockaddr *remote, void *operand) {
433   int len;
434   char remote_str[128];
435   struct iep_job_closure *jc;
436   eventer_t newe;
437   struct timeval __now, iep_timeout = { 20L, 0L };
438   /* We only care about inserts */
439
440   if(op == DS_OP_CHKPT) {
441     eventer_add((eventer_t) operand);
442     return;
443   }
444   if(op != DS_OP_INSERT) return;
445
446   snprintf(remote_str, sizeof(remote_str), "%s", "0.0.0.0");
447   if(remote) {
448     switch(remote->sa_family) {
449       case AF_INET:
450         len = sizeof(struct sockaddr_in);
451         inet_ntop(remote->sa_family, &((struct sockaddr_in *)remote)->sin_addr,
452                   remote_str, len);
453         break;
454       case AF_INET6:
455        len = sizeof(struct sockaddr_in6);
456         inet_ntop(remote->sa_family, &((struct sockaddr_in6 *)remote)->sin6_addr,
457                   remote_str, len);
458        break;
459       case AF_UNIX:
460         len = SUN_LEN(((struct sockaddr_un *)remote));
461         snprintf(remote_str, sizeof(remote_str), "%s", ((struct sockaddr_un *)remote)->sun_path);
462         break;
463     }
464   }
465
466   /* process operand and push onto queue */
467   gettimeofday(&__now, NULL);
468   newe = eventer_alloc();
469   newe->mask = EVENTER_ASYNCH;
470   add_timeval(__now, iep_timeout, &newe->whence);
471   newe->callback = stratcon_iep_submitter;
472   jc = calloc(1, sizeof(*jc));
473   jc->line = strdup(operand);
474   jc->remote = strdup(remote_str);
475   newe->closure = jc;
476
477   eventer_add_asynch(&iep_jobq, newe);
478 }
479
480 static void connection_destroy(void *vd) {
481   struct iep_thread_driver *driver = vd;
482 #ifdef OPENWIRE
483   if(driver->connection) amqcs_disconnect(&driver->connection);
484 #else
485   if(driver->connection) stomp_disconnect(&driver->connection);
486 #endif
487   if(driver->pool) apr_pool_destroy(driver->pool);
488   free(driver);
489 }
490
491 jlog_streamer_ctx_t *
492 stratcon_jlog_streamer_iep_ctx_alloc(void) {
493   jlog_streamer_ctx_t *ctx;
494   ctx = stratcon_jlog_streamer_ctx_alloc();
495   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_TEMP_FEED);
496   ctx->push = stratcon_iep_line_processor;
497   return ctx;
498 }
499
/* Bookkeeping for one spawned IEP daemon process (see start_iep_daemon). */
struct iep_daemon_info {
  pid_t child;          /* pid returned by fork() */
  int stdin_pipe[2];    /* pipe whose read end becomes the child's fd 0 */
  int stderr_pipe[2];   /* pipe whose write end becomes the child's fd 2 */
  char *directory;      /* directory the child chdir()s into */
  char *command;        /* path handed to execv() */
};
507
508 static void
509 iep_daemon_info_free(struct iep_daemon_info *info) {
510   if(!info) return;
511   if(info->directory) free(info->directory);
512   if(info->command) free(info->command);
513   if(info->stdin_pipe[0] >= 0) close(info->stdin_pipe[0]);
514   if(info->stdin_pipe[1] >= 0) close(info->stdin_pipe[1]);
515   if(info->stderr_pipe[0] >= 0) close(info->stderr_pipe[0]);
516   if(info->stderr_pipe[1] >= 0) close(info->stderr_pipe[1]);
517   free(info);
518 }
519
520 static int
521 stratcon_iep_err_handler(eventer_t e, int mask, void *closure,
522                          struct timeval *now) {
523   int len, newmask;
524   char buff[4096];
525   struct iep_daemon_info *info = (struct iep_daemon_info *)closure;
526
527   if(mask & EVENTER_EXCEPTION) {
528     int rv;
529    read_error:
530     kill(SIGKILL, info->child);
531     if(waitpid(info->child, &rv, 0) != info->child) {
532       noitL(noit_error, "Failed to reap IEP daemon\n");
533       exit(-1);
534     }
535     noitL(noit_error, "IEP daemon is done, starting a new one\n");
536     start_iep_daemon();
537     eventer_remove_fd(e->fd);
538     e->opset->close(e->fd, &newmask, e);
539     return 0;
540   }
541   while(1) {
542     len = e->opset->read(e->fd, buff, sizeof(buff)-1, &newmask, e);
543     if(len == -1 && (errno == EAGAIN || errno == EINTR))
544       return newmask | EVENTER_EXCEPTION;
545     if(len <= 0) goto read_error;
546     assert(len < sizeof(buff));
547     buff[len] = '\0';
548     noitL(noit_error, "IEP: %s", buff);
549   }
550 }
551
552 static void
553 start_iep_daemon() {
554   eventer_t newe;
555   struct iep_daemon_info *info;
556
557   info = calloc(1, sizeof(*info));
558   info->stdin_pipe[0] = info->stdin_pipe[1] = -1;
559   info->stderr_pipe[0] = info->stderr_pipe[1] = -1;
560
561   if(!noit_conf_get_string(NULL, "/stratcon/iep/start/@directory",
562                            &info->directory))
563     info->directory = strdup(".");
564   if(!noit_conf_get_string(NULL, "/stratcon/iep/start/@command",
565                            &info->command)) {
566     noitL(noit_error, "No IEP start command provided.  You're on your own.\n");
567     goto bail;
568   }
569   if(pipe(info->stdin_pipe) != 0 ||
570      pipe(info->stderr_pipe) != 0) {
571     noitL(noit_error, "pipe: %s\n", strerror(errno));
572     goto bail;
573   }
574   info->child = fork();
575   if(info->child == -1) {
576     noitL(noit_error, "fork: %s\n", strerror(errno));
577     goto bail;
578   }
579   if(info->child == 0) {
580     char *argv[2] = { "run-iep", NULL };
581     int stdout_fileno;
582
583     if(chdir(info->directory) != 0) {
584       noitL(noit_error, "Starting IEP daemon, chdir failed: %s\n",
585             strerror(errno));
586       exit(-1);
587     }
588
589     close(info->stdin_pipe[1]);
590     close(info->stderr_pipe[0]);
591     dup2(info->stdin_pipe[0], 0);
592     dup2(info->stderr_pipe[1], 2);
593     stdout_fileno = open("/dev/null", O_WRONLY);
594     dup2(stdout_fileno, 1);
595
596     exit(execv(info->command, argv));
597   }
598   /* in the parent */
599   socklen_t on = 1;
600
601   close(info->stdin_pipe[0]);
602   info->stdin_pipe[0] = -1;
603   close(info->stderr_pipe[1]);
604   info->stderr_pipe[1] = -1;
605   if(ioctl(info->stderr_pipe[0], FIONBIO, &on)) {
606     goto bail;
607   }
608
609   newe = eventer_alloc();
610   newe->fd = info->stderr_pipe[0];
611   newe->mask = EVENTER_READ | EVENTER_EXCEPTION;
612   newe->callback = stratcon_iep_err_handler;
613   newe->closure = info;
614   eventer_add(newe);
615   info = NULL;
616
617   return;
618
619  bail:
620   if(info) {
621     iep_daemon_info_free(info);
622   }
623   noitL(noit_error, "Failed to start IEP daemon\n");
624   exit(-1);
625   return;
626 }
627
628 void
629 stratcon_iep_init() {
630   noit_boolean disabled = noit_false;
631   apr_initialize();
632   atexit(apr_terminate);   
633
634   if(noit_conf_get_boolean(NULL, "/stratcon/iep/@disabled", &disabled) &&
635      disabled == noit_true) {
636     noitL(noit_error, "IEP system is disabled!\n");
637     return;
638   }
639
640   start_iep_daemon();
641
642   eventer_name_callback("stratcon_iep_submitter", stratcon_iep_submitter);
643   pthread_key_create(&iep_connection, connection_destroy);
644
645   /* start up a thread pool of one */
646   memset(&iep_jobq, 0, sizeof(iep_jobq));
647   eventer_jobq_init(&iep_jobq, "iep_submitter");
648   iep_jobq.backq = eventer_default_backq();
649   eventer_jobq_increase_concurrency(&iep_jobq);
650
651   /* setup our live jlog stream */
652   stratcon_streamer_connection(NULL, NULL,
653                                stratcon_jlog_recv_handler,
654                                (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
655                                NULL,
656                                jlog_streamer_ctx_free);
657 }
658
Note: See TracBrowser for help on using the browser.