root/src/stratcon_iep.c

Revision a9077178423e39a94a9b624e44cd4b37899d6fd3, 16.6 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

refs #122, this stuff works well enough to pull dev back into trunk.
Closing the #122 branch.

flag-day(noitd,stratcond)

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_log.h"
9 #include "utils/noit_b64.h"
10 #include "noit_jlog_listener.h"
11 #include "stratcon_jlog_streamer.h"
12 #include "stratcon_datastore.h"
13 #include "noit_conf.h"
14 #include "noit_check.h"
15
16 #include <unistd.h>
17 #include <sys/fcntl.h>
18 #include <assert.h>
19 #include <libxml/parser.h>
20 #include <libxml/tree.h>
21 #include <libxml/xmlsave.h>
22 #ifdef OPENWIRE
23 #include "amqcs.h"
24 #else
25 #include "stomp/stomp.h"
26 #endif
27
28 eventer_jobq_t iep_jobq;
29
30 struct iep_thread_driver {
31 #ifdef OPENWIRE
32   amqcs_connect_options connect_options;
33   amqcs_connection *connection;
34 #else
35   stomp_connection *connection;
36 #endif
37   apr_pool_t *pool;
38 };
39 pthread_key_t iep_connection;
40
41 struct iep_job_closure {
42   char *line;       /* This is a copy and gets trashed during processing */
43   xmlDocPtr doc;
44   char *doc_str;
45   apr_pool_t *pool;
46 };
47
48 static void
49 stratcon_iep_datastore_onlooker(stratcon_datastore_op_t op,
50                                 struct sockaddr *remote, void *operand);
51 static void
52 start_iep_daemon();
53
54 static int
55 bust_to_parts(char *in, char **p, int len) {
56   int cnt = 0;
57   char *s = in;
58   while(cnt < len) {
59     p[cnt++] = s;
60     while(*s && *s != '\t') s++;
61     if(!*s) break;
62     *s++ = '\0';
63   }
64   while(*s) s++; /* Move to end */
65   if(s > in && *(s-1) == '\n') *(s-1) = '\0'; /* chomp */
66   return cnt;
67 }
68
69 #define ADDCHILD(a,b) \
70   xmlNewTextChild(root, NULL, (xmlChar *)(a), (xmlChar *)(b))
71 #define NEWDOC(xmldoc,n,stanza) do { \
72   xmlNodePtr root; \
73   xmldoc = xmlNewDoc((xmlChar *)"1.0"); \
74   root = xmlNewDocNode(xmldoc, NULL, (xmlChar *)(n), NULL); \
75   xmlDocSetRootElement(xmldoc, root); \
76   stanza \
77 } while(0)
78
79
80 static xmlDocPtr
81 stratcon_iep_doc_from_status(char *data) {
82   xmlDocPtr doc;
83   char *parts[7];
84   if(bust_to_parts(data, parts, 7) != 7) return NULL;
85   /* 'S' TIMESTAMP UUID STATE AVAILABILITY DURATION STATUS_MESSAGE */
86   NEWDOC(doc, "NoitStatus",
87          {
88            ADDCHILD("id", parts[2]);
89            ADDCHILD("state", parts[3]);
90            ADDCHILD("availability", parts[4]);
91            ADDCHILD("duration", parts[5]);
92            ADDCHILD("status", parts[6]);
93          });
94   return doc;
95 }
96
97 static xmlDocPtr
98 stratcon_iep_doc_from_metric(char *data) {
99   xmlDocPtr doc;
100   char *parts[6];
101   const char *rootname = "NoitMetricNumeric";
102   const char *valuename = "value";
103   if(bust_to_parts(data, parts, 6) != 6) return NULL;
104   /*  'M' TIMESTAMP UUID NAME TYPE VALUE */
105
106   if(*parts[4] == METRIC_STRING) {
107     rootname = "NoitMetricText";
108     valuename = "message";
109   }
110   NEWDOC(doc, rootname,
111          {
112            ADDCHILD("id", parts[2]);
113            ADDCHILD("name", parts[3]);
114            ADDCHILD(valuename, parts[5]);
115          });
116   return doc;
117 }
118
119 static xmlDocPtr
120 stratcon_iep_doc_from_query(char *data) {
121   xmlDocPtr doc;
122   char *parts[4];
123   if(bust_to_parts(data, parts, 4) != 4) return NULL;
124   /*  'Q' ID NAME QUERY  */
125
126   NEWDOC(doc, "StratconQuery",
127          {
128            ADDCHILD("id", parts[1]);
129            ADDCHILD("name", parts[2]);
130            ADDCHILD("expression", parts[3]);
131          });
132   return doc;
133 }
134
135 static xmlDocPtr
136 stratcon_iep_doc_from_querystop(char *data) {
137   xmlDocPtr doc;
138   char *parts[2];
139   if(bust_to_parts(data, parts, 2) != 2) return NULL;
140   /*  'Q' ID */
141
142   NEWDOC(doc, "StratconQueryStop",
143          {
144            xmlNodeSetContent(root, (xmlChar *)parts[1]);
145          });
146   return doc;
147 }
148
149 static xmlDocPtr
150 stratcon_iep_doc_from_line(char *data) {
151   if(data) {
152     switch(*data) {
153       case 'S': return stratcon_iep_doc_from_status(data);
154       case 'M': return stratcon_iep_doc_from_metric(data);
155       case 'Q': return stratcon_iep_doc_from_query(data);
156       case 'q': return stratcon_iep_doc_from_querystop(data);
157     }
158   }
159   return NULL;
160 }
161
162 static float
163 stratcon_iep_age_from_line(char *data, struct timeval now) {
164   float n, t;
165   if(data && (*data == 'S' || *data == 'M')) {
166     if(data[1] != '\t') return 0;
167     t = strtof(data + 2, NULL);
168     n = (float)now.tv_sec + (float)now.tv_usec / 1000000.0;
169     return n - t;
170   }
171   return 0;
172 }
173
174 void stratcon_iep_submit_queries() {
175   int i, cnt = 0;
176   noit_conf_section_t *query_configs;
177   char path[256];
178
179   snprintf(path, sizeof(path), "/stratcon/iep/queries//query");
180   query_configs = noit_conf_get_sections(NULL, path, &cnt);
181   noitL(noit_debug, "Found %d %s stanzas\n", cnt, path);
182   for(i=0; i<cnt; i++) {
183     char id[UUID_STR_LEN];
184     char topic[256];
185     char *query;
186     char *line;
187     int line_len;
188
189     if(!noit_conf_get_stringbuf(query_configs[i],
190                                 "self::node()/@id",
191                                 id, sizeof(id))) {
192       noitL(noit_error, "No uuid specified in query\n");
193       continue;
194     }
195     if(!noit_conf_get_stringbuf(query_configs[i],
196                                 "ancestor-or-self::node()/@topic",
197                                 topic, sizeof(topic))) {
198       noitL(noit_error, "No topic specified in query\n");
199       continue;
200     }
201     if(!noit_conf_get_string(query_configs[i], "self::node()",
202                              &query)) {
203       noitL(noit_error, "No contents specified in query\n");
204       continue;
205     }
206     line_len = 4 /* 3 tabs + \0 */ +
207                1 /* 'Q' */ + 1 /* '\n' */ +
208                strlen(id) + strlen(topic) + strlen(query);
209     line = malloc(line_len);
210     snprintf(line, line_len, "Q\t%s\t%s\t%s\n", id, topic, query);
211     free(query);
212     query = line;
213     while(query[0] && query[1]) {
214       if(*query == '\n') *query = ' ';
215       query++;
216     }
217     stratcon_iep_datastore_onlooker(DS_OP_INSERT, NULL, line);
218     free(line);
219   }
220 }
221
222 static char *
223 stratcon__xml_doc_to_str(xmlDocPtr doc) {
224   char *rv;
225   xmlSaveCtxtPtr savectx;
226   xmlBufferPtr xmlbuffer;
227   xmlbuffer = xmlBufferCreate();
228   savectx = xmlSaveToBuffer(xmlbuffer, "utf8", 1);
229   xmlSaveDoc(savectx, doc);
230   xmlSaveClose(savectx);
231   rv = strdup((const char *)xmlBufferContent(xmlbuffer));
232   xmlBufferFree(xmlbuffer);
233   return rv;
234 }
235
236 static
237 struct iep_thread_driver *stratcon_iep_get_connection() {
238   apr_status_t rc;
239   struct iep_thread_driver *driver;
240   driver = pthread_getspecific(iep_connection);
241   if(!driver) {
242     driver = calloc(1, sizeof(*driver));
243     pthread_setspecific(iep_connection, driver);
244   }
245
246   if(!driver->pool) {
247     if(apr_pool_create(&driver->pool, NULL) != APR_SUCCESS) return NULL;
248   }
249
250   if(!driver->connection) {
251     int port;
252     char hostname[128];
253     if(!noit_conf_get_int(NULL, "/stratcon/iep/port", &port))
254       port = 61613;
255     if(!noit_conf_get_stringbuf(NULL, "/stratcon/iep/hostname",
256                                 hostname, sizeof(hostname)))
257       strlcpy(hostname, "127.0.0.1", sizeof(hostname));
258 #ifdef OPENWIRE
259     memset(&driver->connect_options, 0, sizeof(driver->connect_options));
260     strlcpy(driver->connect_options.hostname, hostname,
261             sizeof(driver->connect_options.hostname));
262     driver->connect_options.port = port;
263     if(amqcs_connect(&driver->connection, &driver->connect_options,
264                      driver->pool) != APR_SUCCESS) {
265       noitL(noit_error, "MQ connection failed\n");
266       return NULL;
267     }
268 #else
269     if(stomp_connect(&driver->connection, hostname, port,
270                      driver->pool)!= APR_SUCCESS) {
271       noitL(noit_error, "MQ connection failed\n");
272       return NULL;
273     }
274
275     {
276       stomp_frame frame;
277       frame.command = "CONNECT";
278       frame.headers = apr_hash_make(driver->pool);
279 /*
280       We don't use login/pass
281       apr_hash_set(frame.headers, "login", APR_HASH_KEY_STRING, "");
282       apr_hash_set(frame.headers, "passcode", APR_HASH_KEY_STRING, "");
283 */
284       frame.body = NULL;
285       frame.body_length = -1;
286       rc = stomp_write(driver->connection, &frame, driver->pool);
287       if(rc != APR_SUCCESS) {
288         noitL(noit_error, "MQ STOMP CONNECT failed, %d\n", rc);
289         stomp_disconnect(&driver->connection);
290         return NULL;
291       }
292     } 
293     {
294       stomp_frame *frame;
295       rc = stomp_read(driver->connection, &frame, driver->pool);
296       if (rc != APR_SUCCESS) {
297         noitL(noit_error, "MQ STOMP CONNECT bad response, %d\n", rc);
298         stomp_disconnect(&driver->connection);
299         return NULL;
300       }
301       noitL(noit_error, "Response: %s, %s\n", frame->command, frame->body);
302      }
303 #endif
304      stratcon_iep_submit_queries();
305   }
306
307   return driver;
308 }
309
310 static int
311 stratcon_iep_submitter(eventer_t e, int mask, void *closure,
312                        struct timeval *now) {
313   float age;
314   struct iep_job_closure *job = closure;
315   /* We only play when it is an asynch event */
316   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
317
318   if(mask & EVENTER_ASYNCH_CLEANUP) {
319     /* free all the memory associated with the batch */
320     if(job) {
321       if(job->line) free(job->line);
322       if(job->doc_str) free(job->doc_str);
323       if(job->doc) xmlFreeDoc(job->doc);
324       if(job->pool) apr_pool_destroy(job->pool);
325       free(job);
326     }
327     return 0;
328   }
329
330   if((age = stratcon_iep_age_from_line(job->line, *now)) > 60) {
331     noitL(noit_debug, "Skipping old event %f second old.\n", age);
332     return 0;
333   }
334   job->doc = stratcon_iep_doc_from_line(job->line);
335   if(job->doc) {
336     job->doc_str = stratcon__xml_doc_to_str(job->doc);
337     if(job->doc_str) {
338       /* Submit */
339       struct iep_thread_driver *driver;
340       driver = stratcon_iep_get_connection();
341       if(driver && driver->pool && driver->connection) {
342         apr_status_t rc;
343 #ifdef OPENWIRE
344         ow_ActiveMQQueue *dest;
345         ow_ActiveMQTextMessage *message;
346
347         apr_pool_create(&job->pool, driver->pool);
348         message = ow_ActiveMQTextMessage_create(job->pool);
349         message->content =
350           ow_byte_array_create_with_data(job->pool,strlen(job->doc_str),
351                                          job->doc_str);
352         dest = ow_ActiveMQQueue_create(job->pool);
353         dest->physicalName = ow_string_create_from_cstring(job->pool,"TEST.QUEUE");         
354         rc = amqcs_send(driver->connection,
355                         (ow_ActiveMQDestination*)dest,
356                         (ow_ActiveMQMessage*)message,
357                         1,4,0,job->pool);
358         if(rc != APR_SUCCESS) {
359           noitL(noit_error, "MQ send failed, disconnecting\n");
360           if(driver->connection) amqcs_disconnect(&driver->connection);
361           driver->connection = NULL;
362         }
363 #else
364         stomp_frame out;
365
366         apr_pool_create(&job->pool, driver->pool);
367
368         out.command = "SEND";
369         out.headers = apr_hash_make(job->pool);
370         apr_hash_set(out.headers, "destination", APR_HASH_KEY_STRING, "/queue/noit.firehose");
371         apr_hash_set(out.headers, "ack", APR_HASH_KEY_STRING, "auto");
372      
373         out.body_length = -1;
374         out.body = job->doc_str;
375         rc = stomp_write(driver->connection, &out, job->pool);
376         if(rc != APR_SUCCESS) {
377           noitL(noit_error, "STOMP send failed, disconnecting\n");
378           if(driver->connection) stomp_disconnect(&driver->connection);
379           driver->connection = NULL;
380         }
381 #endif
382       }
383       else {
384         noitL(noit_error, "Not submitting event, no MQ\n");
385       }
386     }
387   }
388   else {
389     noitL(noit_error, "no iep handler for: '%s'\n", job->line);
390   }
391   return 0;
392 }
393
394 static void
395 stratcon_iep_datastore_onlooker(stratcon_datastore_op_t op,
396                                 struct sockaddr *remote, void *operand) {
397   struct iep_job_closure *jc;
398   eventer_t newe;
399   struct timeval __now, iep_timeout = { 20L, 0L };
400   /* We only care about inserts */
401
402   if(op == DS_OP_CHKPT) {
403     eventer_add((eventer_t) operand);
404     return;
405   }
406   if(op != DS_OP_INSERT) return;
407
408   /* process operand and push onto queue */
409   gettimeofday(&__now, NULL);
410   newe = eventer_alloc();
411   newe->mask = EVENTER_ASYNCH;
412   add_timeval(__now, iep_timeout, &newe->whence);
413   newe->callback = stratcon_iep_submitter;
414   jc = calloc(1, sizeof(*jc));
415   jc->line = strdup(operand);
416   newe->closure = jc;
417
418   eventer_add_asynch(&iep_jobq, newe);
419 }
420
421 static void connection_destroy(void *vd) {
422   struct iep_thread_driver *driver = vd;
423 #ifdef OPENWIRE
424   if(driver->connection) amqcs_disconnect(&driver->connection);
425 #else
426   if(driver->connection) stomp_disconnect(&driver->connection);
427 #endif
428   if(driver->pool) apr_pool_destroy(driver->pool);
429   free(driver);
430 }
431
432 jlog_streamer_ctx_t *
433 stratcon_jlog_streamer_iep_ctx_alloc(void) {
434   jlog_streamer_ctx_t *ctx;
435   ctx = stratcon_jlog_streamer_ctx_alloc();
436   ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_TEMP_FEED);
437   ctx->push = stratcon_iep_datastore_onlooker;
438   return ctx;
439 }
440
441 struct iep_daemon_info {
442   pid_t child;
443   int stdin_pipe[2];
444   int stderr_pipe[2];
445   char *directory;
446   char *command;
447 };
448
449 static void
450 iep_daemon_info_free(struct iep_daemon_info *info) {
451   if(!info) return;
452   if(info->directory) free(info->directory);
453   if(info->command) free(info->command);
454   if(info->stdin_pipe[0] >= 0) close(info->stdin_pipe[0]);
455   if(info->stdin_pipe[1] >= 0) close(info->stdin_pipe[1]);
456   if(info->stderr_pipe[0] >= 0) close(info->stderr_pipe[0]);
457   if(info->stderr_pipe[1] >= 0) close(info->stderr_pipe[1]);
458   free(info);
459 }
460
461 static int
462 stratcon_iep_err_handler(eventer_t e, int mask, void *closure,
463                          struct timeval *now) {
464   int len, newmask;
465   char buff[4096];
466   struct iep_daemon_info *info = (struct iep_daemon_info *)closure;
467
468   if(mask & EVENTER_EXCEPTION) {
469     int rv;
470    read_error:
471     kill(SIGKILL, info->child);
472     if(waitpid(info->child, &rv, 0) != info->child) {
473       noitL(noit_error, "Failed to reap IEP daemon\n");
474       exit(-1);
475     }
476     noitL(noit_error, "IEP daemon is done, starting a new one\n");
477     start_iep_daemon();
478     eventer_remove_fd(e->fd);
479     e->opset->close(e->fd, &newmask, e);
480     return 0;
481   }
482   while(1) {
483     len = e->opset->read(e->fd, buff, sizeof(buff)-1, &newmask, e);
484     if(len == -1 && (errno == EAGAIN || errno == EINTR))
485       return newmask | EVENTER_EXCEPTION;
486     if(len <= 0) goto read_error;
487     assert(len < sizeof(buff));
488     buff[len] = '\0';
489     noitL(noit_error, "IEP: %s", buff);
490   }
491 }
492
493 static void
494 start_iep_daemon() {
495   eventer_t newe;
496   struct iep_daemon_info *info;
497
498   info = calloc(1, sizeof(*info));
499   info->stdin_pipe[0] = info->stdin_pipe[1] = -1;
500   info->stderr_pipe[0] = info->stderr_pipe[1] = -1;
501
502   if(!noit_conf_get_string(NULL, "/stratcon/iep/start/@directory",
503                            &info->directory))
504     info->directory = strdup(".");
505   if(!noit_conf_get_string(NULL, "/stratcon/iep/start/@command",
506                            &info->command)) {
507     noitL(noit_error, "No IEP start command provided.  You're on your own.\n");
508     goto bail;
509   }
510   if(pipe(info->stdin_pipe) != 0 ||
511      pipe(info->stderr_pipe) != 0) {
512     noitL(noit_error, "pipe: %s\n", strerror(errno));
513     goto bail;
514   }
515   info->child = fork();
516   if(info->child == -1) {
517     noitL(noit_error, "fork: %s\n", strerror(errno));
518     goto bail;
519   }
520   if(info->child == 0) {
521     char *argv[2] = { "run-iep", NULL };
522     int stdout_fileno;
523
524     if(chdir(info->directory) != 0) {
525       noitL(noit_error, "Starting IEP daemon, chdir failed: %s\n",
526             strerror(errno));
527       exit(-1);
528     }
529
530     close(info->stdin_pipe[1]);
531     close(info->stderr_pipe[0]);
532     dup2(info->stdin_pipe[0], 0);
533     dup2(info->stderr_pipe[1], 2);
534     stdout_fileno = open("/dev/null", O_WRONLY);
535     dup2(stdout_fileno, 1);
536
537     exit(execv(info->command, argv));
538   }
539   /* in the parent */
540   socklen_t on = 1;
541
542   close(info->stdin_pipe[0]);
543   info->stdin_pipe[0] = -1;
544   close(info->stderr_pipe[1]);
545   info->stderr_pipe[1] = -1;
546   if(ioctl(info->stderr_pipe[0], FIONBIO, &on)) {
547     goto bail;
548   }
549
550   newe = eventer_alloc();
551   newe->fd = info->stderr_pipe[0];
552   newe->mask = EVENTER_READ | EVENTER_EXCEPTION;
553   newe->callback = stratcon_iep_err_handler;
554   newe->closure = info;
555   eventer_add(newe);
556   info = NULL;
557
558   return;
559
560  bail:
561   if(info) {
562     iep_daemon_info_free(info);
563   }
564   noitL(noit_error, "Failed to start IEP daemon\n");
565   exit(-1);
566   return;
567 }
568
569 void
570 stratcon_iep_init() {
571   noit_boolean disabled = noit_false;
572   apr_initialize();
573   atexit(apr_terminate);   
574
575   if(noit_conf_get_boolean(NULL, "/stratcon/iep/@disabled", &disabled) &&
576      disabled == noit_true) {
577     noitL(noit_error, "IEP system is disabled!\n");
578     return;
579   }
580
581   start_iep_daemon();
582
583   eventer_name_callback("stratcon_iep_submitter", stratcon_iep_submitter);
584   pthread_key_create(&iep_connection, connection_destroy);
585
586   /* start up a thread pool of one */
587   memset(&iep_jobq, 0, sizeof(iep_jobq));
588   eventer_jobq_init(&iep_jobq, "iep_submitter");
589   iep_jobq.backq = eventer_default_backq();
590   eventer_jobq_increase_concurrency(&iep_jobq);
591
592   /* setup our live jlog stream */
593   stratcon_streamer_connection(NULL, NULL,
594                                stratcon_jlog_recv_handler,
595                                (void *(*)())stratcon_jlog_streamer_iep_ctx_alloc,
596                                NULL,
597                                jlog_streamer_ctx_free);
598 }
599
Note: See TracBrowser for help on using the browser.