Changeset 0335d9d261b5f2b2c4d7a1d1e1a8ebc851b17ffa
- Timestamp:
- 09/24/09 23:50:27 (4 years ago)
- git-parent:
- Files:
-
- src/java/Makefile.in (modified) (2 diffs)
- src/java/com/omniti/reconnoiter/EventHandler.java (modified) (2 diffs)
- src/java/com/omniti/reconnoiter/IEPEngine.java (modified) (2 diffs)
- src/java/com/omniti/reconnoiter/StratconConfig.java (modified) (1 diff)
- src/java/com/omniti/reconnoiter/StratconMessage.java (modified) (1 diff)
- src/java/com/omniti/reconnoiter/broker/AMQBroker.java (modified) (3 diffs)
- src/java/com/omniti/reconnoiter/broker/RabbitBroker.java (modified) (3 diffs)
- src/java/com/omniti/reconnoiter/broker/RabbitListener.java (modified) (4 diffs)
- src/java/com/omniti/reconnoiter/event/NoitCheck.java (added)
- src/java/com/omniti/reconnoiter/event/NoitEvent.java (deleted)
- src/java/com/omniti/reconnoiter/event/NoitMetricNumeric.java (modified) (2 diffs)
- src/java/com/omniti/reconnoiter/event/NoitMetricText.java (added)
- src/java/com/omniti/reconnoiter/event/NoitStatus.java (added)
- src/java/com/omniti/reconnoiter/event/StratconQuery.java (modified) (3 diffs)
- src/java/com/omniti/reconnoiter/event/StratconQueryBase.java (modified) (1 diff)
- src/java/com/omniti/reconnoiter/event/StratconQueryStop.java (modified) (1 diff)
- src/java/com/omniti/reconnoiter/event/StratconStatement.java (modified) (2 diffs)
- src/java/run-iep.sh.in (modified) (1 diff)
- src/stratcon_iep.c (modified) (8 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
src/java/Makefile.in
re591eb5 r0335d9d 30 30 com/omniti/reconnoiter/broker/AMQListener.java \ 31 31 com/omniti/reconnoiter/broker/BrokerFactory.java \ 32 com/omniti/reconnoiter/event/NoitEvent.java \ 32 com/omniti/reconnoiter/event/NoitCheck.java \ 33 com/omniti/reconnoiter/event/NoitStatus.java \ 34 com/omniti/reconnoiter/event/NoitMetricText.java \ 33 35 com/omniti/reconnoiter/event/NoitMetricNumeric.java \ 34 36 com/omniti/reconnoiter/event/StratconStatement.java \ … … 40 42 com/omniti/reconnoiter/StratconMessage.java 41 43 42 SUPPORT=lib/activemq-all-5.2.0.jar lib/antlr-runtime-3.1.1.jar lib/esper-3. 1.0.jar \44 SUPPORT=lib/activemq-all-5.2.0.jar lib/antlr-runtime-3.1.1.jar lib/esper-3.2.0.jar \ 43 45 lib/log4j-1.2.15.jar lib/spring-beans-2.5.5.jar lib/spring-context-2.5.5.jar \ 44 46 lib/cglib-nodep-2.2.jar lib/commons-pool-1.4.jar lib/commons-dbcp-1.2.2.jar \ src/java/com/omniti/reconnoiter/EventHandler.java
r699c97c r0335d9d 8 8 import com.espertech.esper.client.UpdateListener; 9 9 import com.omniti.reconnoiter.broker.IMQBroker; 10 import com.omniti.reconnoiter.event.NoitEvent; 11 import com.omniti.reconnoiter.event.NoitMetricNumeric; 12 import com.omniti.reconnoiter.event.StratconQueryBase; 13 import com.omniti.reconnoiter.event.StratconStatement; 14 import com.omniti.reconnoiter.event.StratconQuery; 15 import com.omniti.reconnoiter.event.StratconQueryStop; 10 import com.omniti.reconnoiter.event.*; 16 11 17 12 public class EventHandler { … … 64 59 } 65 60 } 66 else if(m instanceof NoitEvent) {67 epService.getEPRuntime().sendEvent( ((NoitEvent) m).getDocument());61 else if(m instanceof NoitMetricText) { 62 epService.getEPRuntime().sendEvent(m); 68 63 } 69 64 else if(m instanceof NoitMetricNumeric) { 70 epService.getEPRuntime().sendEvent((NoitMetricNumeric) m); 65 epService.getEPRuntime().sendEvent(m); 66 } 67 else if(m instanceof NoitCheck) { 68 epService.getEPRuntime().sendEvent(m); 69 } 70 else if(m instanceof NoitStatus) { 71 epService.getEPRuntime().sendEvent(m); 71 72 } 72 73 } src/java/com/omniti/reconnoiter/IEPEngine.java
r699c97c r0335d9d 14 14 import com.omniti.reconnoiter.MQListener; 15 15 import com.omniti.reconnoiter.broker.BrokerFactory; 16 import com.omniti.reconnoiter.event.NoitEvent;17 16 import com.omniti.reconnoiter.StratconConfig; 18 17 import com.espertech.esper.client.*; … … 32 31 config.addEventTypeAutoName("com.omniti.reconnoiter.event"); 33 32 EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config); 34 NoitEvent.registerTypes(epService);35 33 36 34 MQListener l = new MQListener(epService, BrokerFactory.getBroker(sconf)); src/java/com/omniti/reconnoiter/StratconConfig.java
r4c3fb9b r0335d9d 51 51 catch (SAXException e) { 52 52 System.err.println("Bad XML: " + e.getMessage()); 53 return;54 53 } 55 54 catch (IOException e) { 56 55 System.out.println("Bad file: " + e.getMessage()); 57 return;58 56 } 59 57 } src/java/com/omniti/reconnoiter/StratconMessage.java
r99a21c2 r0335d9d 20 20 21 21 public class StratconMessage { 22 private static DocumentBuilderFactory factory = null;23 private static DocumentBuilder parser = null;24 22 25 static { 26 if(factory == null) { 27 factory = DocumentBuilderFactory.newInstance(); 28 factory.setIgnoringComments(true); 29 factory.setCoalescing(true); // Convert CDATA to Text nodes 30 factory.setNamespaceAware(false); // No namespaces: this is default 31 factory.setValidating(false); // Don't validate DTD: also default 23 // This is the text type in the noit_log.h 24 public final static String METRIC_STRING = "s"; 25 26 public static String[] parseToArray(String jlog, int num) { 27 // Get rid of the null parameter 28 return jlog.substring(0, jlog.length()-1).split("[\t]", num); 29 } 30 31 protected String getPrefix() { 32 return null; 33 } 34 35 protected int getLength() { 36 return -1; 37 } 38 39 // Check and make sure 40 public StratconMessage(String[] parts) throws Exception { 41 if (!parts[0].equals(this.getPrefix())) { 42 throw new Exception("Incorrect state prefix:" + getPrefix() + " not applicable for " + getClass()); 32 43 } 33 if(parser == null) { 34 try { 35 parser = factory.newDocumentBuilder(); 36 } catch(Exception e) { } 44 if (parts.length != getLength()) { 45 throw new Exception("Incorrect amount of parts parsed, tossing message."); 37 46 } 38 47 } 39 48 40 public static StratconMessage makeMessage(String xml) { 41 InputSource source = new InputSource(new StringReader(xml)); 42 49 public static StratconMessage makeMessage(String jlog) { 50 String[] parts; 51 // The numbers of the parse are pulled from stratcon and 52 // +1 for the extra remote 43 53 try { 44 Document document = parser.parse(source); 45 Element e = document.getDocumentElement(); 46 String tag = e.getTagName(); 47 // We have events 48 if(tag.equals("NoitStatus") || 49 tag.equals("NoitMetricText") || 50 tag.equals("NoitCheck")) 51 return new NoitEvent(document); 52 else if(tag.equals("NoitMetricNumeric")) { 53 // Numerics have a value that can be in scientific notation. 54 // This document gets passed places that do Xpath 1.0 queries 55 // which don't understand scientific notation... we have to hack it. 56 return new NoitMetricNumeric(document); 54 switch (jlog.charAt(0)) { 55 case 'C': 56 parts = parseToArray(jlog, 7); 57 return new NoitCheck(parts); 58 case 'S': 59 parts = parseToArray(jlog, 8); 60 return new NoitStatus(parts); 61 case 'M': 62 parts = parseToArray(jlog, 7); 63 if (parts[5].equals(METRIC_STRING)) { 64 return new NoitMetricText(parts); 65 } else { 66 return new NoitMetricNumeric(parts); 67 } 68 case 'D': 69 parts = parseToArray(jlog, 4); 70 return new StratconStatement(parts); 71 case 'Q': 72 parts = parseToArray(jlog, 5); 73 return new StratconQuery(parts); 74 case 'q': 75 parts = parseToArray(jlog, 3); 76 return new StratconQueryStop(parts); 57 77 } 58 // and requests59 else if(tag.equals("StratconStatement"))60 return new StratconStatement(document);61 else if(tag.equals("StratconQuery"))62 return new StratconQuery(document);63 else if(tag.equals("StratconQueryStop"))64 return new StratconQueryStop(document);65 78 } 66 79 catch(Exception e) { src/java/com/omniti/reconnoiter/broker/AMQBroker.java
r4c3fb9b r0335d9d 17 17 18 18 public class AMQBroker implements IMQBroker { 19 20 private StratconConfig config;21 19 22 20 public AMQBroker(StratconConfig config) { 23 this.config = config;24 // TODO Auto-generated constructor stub25 21 } 26 22 27 private ActiveMQConnectionFactory connectionFactory;28 private Connection connection;29 private Session session;30 private Destination destination;31 23 private MessageConsumer consumer; 32 24 33 25 public void connect() { 34 26 BrokerFactory.getAMQBrokerService(); 35 connectionFactory =new ActiveMQConnectionFactory("tcp://localhost:61616");27 ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://localhost:61616"); 36 28 try { 37 connection =connectionFactory.createConnection();29 Connection connection=connectionFactory.createConnection(); 38 30 connection.start(); 39 session =connection.createSession(false, Session.AUTO_ACKNOWLEDGE);40 destination =session.createQueue("noit.firehose");31 Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 32 Destination destination=session.createQueue("noit.firehose"); 41 33 42 34 consumer = session.createConsumer(destination); … … 51 43 try { 52 44 message = consumer.receive(1000); 53 } catch(Exception e) { 54 45 } catch(Exception ignored) { 55 46 } 56 47 if (message != null && message instanceof TextMessage) { … … 69 60 return new AMQListener(epService, sq); 70 61 } 71 72 62 } src/java/com/omniti/reconnoiter/broker/RabbitBroker.java
re372885 r0335d9d 15 15 16 16 17 public class RabbitBroker implements IMQBroker , Runnable{17 public class RabbitBroker implements IMQBroker { 18 18 19 19 private Channel channel; … … 29 29 private String alertQueue; 30 30 private String alertExchangeName; 31 private StratconConfig config ;32 31 33 32 public RabbitBroker(StratconConfig config) { 34 this.config = config;35 33 this.userName = config.getBrokerParameter("username", "guest"); 36 34 this.password = config.getBrokerParameter("password", "guest"); … … 110 108 return new RabbitListener(epService, sq, channel, alertExchangeName, alertQueue); 111 109 } 112 113 public void run() {114 }115 116 110 } src/java/com/omniti/reconnoiter/broker/RabbitListener.java
ra3f2dff r0335d9d 15 15 private EPServiceProvider epService; 16 16 private StratconQuery sq; 17 private EPStatement statement;18 17 private String routingKey; 19 18 private String exchangeName; … … 25 24 this.epService = epService; 26 25 this.sq = sq; 27 this.statement =sq.getStatement();26 EPStatement statement=sq.getStatement(); 28 27 this.routingKey = routingKey + sq.getName(); 29 28 this.exchangeName = exchangeName; 30 29 this.channel = channel; 31 // TODO Document this mapping32 30 33 34 31 // Create the connection and add an exchange 35 32 channel.exchangeDeclare(exchangeName, "topic", false, false, false, null); … … 41 38 42 39 public void update(EventBean[] newEvents, EventBean[] oldEvents) { 43 // TODO Auto-generated method stub 44 System.err.println("AMQOutput -> dispatch"); 40 System.err.println("RMQOutput -> dispatch"); 45 41 for(int i = 0; i < newEvents.length; i++) { 46 42 EventBean event = newEvents[i]; … … 52 48 try { 53 49 byte[] messageBodyBytes = output.getBytes(); 54 channel.basicPublish(exchangeName, routingKey, MessageProperties. PERSISTENT_TEXT_PLAIN, messageBodyBytes);50 channel.basicPublish(exchangeName, routingKey, MessageProperties.TEXT_PLAIN, messageBodyBytes); 55 51 } catch(Exception e) { 56 52 System.err.println(e); src/java/com/omniti/reconnoiter/event/NoitMetricNumeric.java
r2220278 r0335d9d 11 11 import com.omniti.reconnoiter.StratconMessage; 12 12 13 import org.w3c.dom.Document;14 import org.w3c.dom.Element;15 import org.w3c.dom.Node;16 import org.w3c.dom.NodeList;17 import org.w3c.dom.Text;18 19 13 public class NoitMetricNumeric extends StratconMessage { 20 14 private String uuid; … … 23 17 private String noit; 24 18 25 protected String get_string(Element e, String tag) throws NoitMetricNumericException { 26 NodeList vals = e.getElementsByTagName(tag); 27 if(vals.getLength() != 1) 28 throw new NoitMetricNumericException("Bad XML: tag " + tag + " (" + vals.getLength() + ")"); 29 Node n = vals.item(0); 30 Node tn = n.getFirstChild(); 31 if(tn.getNextSibling() != null) 32 throw new NoitMetricNumericException("Bad XML: " + tag + " has siblings"); 33 if(tn.getNodeType() != Node.TEXT_NODE) 34 throw new NoitMetricNumericException("Bad XML: " + tag + " not text"); 35 Text text_node = (Text)tn; 36 return text_node.getNodeValue(); 37 } 38 protected Double get_double(Element e, String tag) throws NoitMetricNumericException { 39 Double d = null; 19 protected String getPrefix() { 20 return "M"; 21 } 22 /* 23 'M' REMOTE TIMESTAMP UUID NAME TYPE VALUE 24 */ 25 public NoitMetricNumeric(String[] parts) throws Exception { 26 super(parts); 27 noit = parts[1]; 28 uuid = parts[3]; 29 name = parts[4]; 40 30 try { 41 d = new Double(get_string(e, tag)); 42 } catch(NumberFormatException nfe) { 31 value = Double.valueOf(parts[6]); 32 } catch (NumberFormatException nfe) { 33 value = null; 43 34 } 44 return d;45 }46 35 47 public NoitMetricNumeric(Document document) throws NoitMetricNumericException { 48 Element e = document.getDocumentElement(); 49 String tag = e.getTagName(); 50 if(!tag.equals("NoitMetricNumeric")) 51 throw new NoitMetricNumericException("Bad XML"); 52 uuid = get_string(e,"id"); 53 name = get_string(e,"name"); 54 value = get_double(e, "value"); 55 noit = get_string(e,"remote"); 56 } 36 } 57 37 58 public String get uuid() { return uuid; }59 public String get name() { return name; }60 public Double get value() { return value; }61 public String get noit() { return noit; }38 public String getUuid() { return uuid; } 39 public String getName() { return name; } 40 public Double getValue() { return value; } 41 public String getNoit() { return noit; } 62 42 63 private class NoitMetricNumericException extends Exception { 64 public NoitMetricNumericException(String s) { 65 super(s); 66 } 67 } 43 protected int getLength() { 44 return 7; 45 } 68 46 } src/java/com/omniti/reconnoiter/event/StratconQuery.java
ra3f2dff r0335d9d 13 13 import com.espertech.esper.client.UpdateListener; 14 14 import java.util.UUID; 15 import javax.xml.xpath.XPath;16 import javax.xml.xpath.XPathFactory;17 import javax.xml.xpath.XPathConstants;18 import javax.xml.xpath.XPathExpressionException;19 import org.w3c.dom.Document;20 15 21 16 public class StratconQuery extends StratconQueryBase { … … 23 18 protected String name; 24 19 25 public StratconQuery(Document d) { 26 XPath xpath = XPathFactory.newInstance().newXPath(); 27 try { 28 String id = (String) xpath.evaluate("/StratconQuery/id", d, XPathConstants.STRING); 29 if(id == null) uuid = UUID.randomUUID(); 30 else uuid = UUID.fromString(id); 31 name = (String) xpath.evaluate("/StratconQuery/name", d, XPathConstants.STRING); 32 expression = (String) xpath.evaluate("/StratconQuery/expression", d, XPathConstants.STRING); 33 } 34 catch(XPathExpressionException e) { 35 } 20 protected String getPrefix() { 21 return "Q"; 22 } 23 24 /* 'Q' REMOTE ID NAME QUERY */ 25 public StratconQuery(String[] parts) throws Exception { 26 super(parts); 27 String id = parts[2]; 28 name = parts[3]; 29 expression = parts[4]; 30 if(id == null) 31 uuid = UUID.randomUUID(); 32 else 33 uuid = UUID.fromString(id); 34 36 35 if(name == null) name = "default"; 37 36 if(uuid == null) uuid = UUID.randomUUID(); … … 47 46 statement.destroy(); 48 47 } 48 49 protected int getLength() { 50 return 5; 51 } 49 52 } src/java/com/omniti/reconnoiter/event/StratconQueryBase.java
ra3f2dff r0335d9d 19 19 protected String expression; 20 20 21 public StratconQueryBase(String parts[]) throws Exception { 22 super(parts); 23 } 24 21 25 public UUID getUUID() { 22 26 return uuid; src/java/com/omniti/reconnoiter/event/StratconQueryStop.java
ra3f2dff r0335d9d 14 14 import org.w3c.dom.Element; 15 15 16 public class StratconQueryStop extends StratconMessage { 17 private UUID uuid; 16 public class StratconQueryStop extends StratconQueryBase { 18 17 19 public StratconQueryStop(Document d) { 20 Element e = d.getDocumentElement(); 21 uuid = UUID.fromString(e.getTextContent()); 18 protected String getPrefix() { 19 return "q"; 20 } 21 22 /* 'q' REMOTE ID */ 23 public StratconQueryStop(String[] parts) throws Exception { 24 super(parts); 25 uuid = UUID.fromString(parts[2]); 22 26 } 23 27 public UUID getUUID() { 24 28 return uuid; 25 29 } 30 31 protected int getLength() { 32 return 3; 33 } 26 34 27 35 } src/java/com/omniti/reconnoiter/event/StratconStatement.java
ra3f2dff r0335d9d 10 10 11 11 import com.omniti.reconnoiter.event.StratconQueryBase; 12 import com.espertech.esper.client.UpdateListener; 13 12 14 import java.util.UUID; 13 15 import javax.xml.xpath.XPath; … … 18 20 19 21 public class StratconStatement extends StratconQueryBase { 20 public StratconStatement(Document d) { 21 XPath xpath = XPathFactory.newInstance().newXPath(); 22 try { 23 String id = (String) xpath.evaluate("/StratconStatement/id", d, XPathConstants.STRING); 24 if(id == null) uuid = UUID.randomUUID(); 25 else uuid = UUID.fromString(id); 26 expression = (String) xpath.evaluate("/StratconStatement/expression", d, XPathConstants.STRING); 27 } 28 catch(XPathExpressionException e) { 29 } 30 if(uuid == null) uuid = UUID.randomUUID(); 22 23 protected String getPrefix() { 24 return "D"; 25 } 26 27 /* 'D' REMOTE ID QUERY */ 28 public StratconStatement(String[] parts) throws Exception { 29 super(parts); 30 String id = parts[2]; 31 expression = parts[3]; 32 if(id == null) 33 uuid = UUID.randomUUID(); 34 else 35 uuid = UUID.fromString(id); 36 } 37 38 protected int getLength() { 39 return 4; 31 40 } 32 41 } src/java/run-iep.sh.in
r660168e r0335d9d 2 2 3 3 JAVA=@JAVA@ 4 JPARAMS="-Xms 512m -Xmx512m -server-XX:+UseParNewGC"4 JPARAMS="-Xms32m -Xmx512m -XX:+UseConcMarkSweepGC -XX:+UseParNewGC" 5 5 JARS="reconnoiter.jar activemq-all-5.2.0.jar antlr-runtime-3.1.1.jar \ 6 6 esper-3.2.0.jar log4j-1.2.15.jar \ src/stratcon_iep.c
rb9a4230 r0335d9d 41 41 #include "noit_conf.h" 42 42 #include "noit_check.h" 43 #include "noit_xml.h"44 43 45 44 #include <sys/types.h> … … 52 51 #endif 53 52 #include <assert.h> 54 #include <libxml/parser.h>55 #include <libxml/tree.h>56 #include <libxml/xmlsave.h>57 53 #ifdef OPENWIRE 58 54 #include "amqcs.h" … … 79 75 char *line; /* This is a copy and gets trashed during processing */ 80 76 char *remote; 81 xmlDocPtr doc;82 77 char *doc_str; 83 78 apr_pool_t *pool; … … 87 82 start_iep_daemon(); 88 83 89 static int90 bust_to_parts(char *in, char **p, int len) {91 int cnt = 0;92 char *s = in;93 while(cnt < len) {94 p[cnt++] = s;95 while(*s && *s != '\t') s++;96 if(!*s) break;97 *s++ = '\0';98 }99 while(*s) s++; /* Move to end */100 if(s > in && *(s-1) == '\n') *(s-1) = '\0'; /* chomp */101 return cnt;102 }103 104 #define ADDCHILD(a,b) \105 xmlNewTextChild(root, NULL, (xmlChar *)(a), (xmlChar *)(b))106 #define NEWDOC(xmldoc,n,stanza) do { \107 xmlNodePtr root; \108 xmldoc = xmlNewDoc((xmlChar *)"1.0"); \109 root = xmlNewDocNode(xmldoc, NULL, (xmlChar *)(n), NULL); \110 xmlDocSetRootElement(xmldoc, root); \111 stanza \112 } while(0)113 114 115 static xmlDocPtr116 stratcon_iep_doc_from_status(char *data, char *remote) {117 xmlDocPtr doc;118 char *parts[7];119 if(bust_to_parts(data, parts, 7) != 7) return NULL;120 /* 'S' TIMESTAMP UUID STATE AVAILABILITY DURATION STATUS_MESSAGE */121 NEWDOC(doc, "NoitStatus",122 {123 ADDCHILD("remote", remote);124 ADDCHILD("id", parts[2]);125 ADDCHILD("state", parts[3]);126 ADDCHILD("availability", parts[4]);127 ADDCHILD("duration", parts[5]);128 ADDCHILD("status", parts[6]);129 });130 return doc;131 }132 133 static xmlDocPtr134 stratcon_iep_doc_from_check(char *data, char *remote) {135 xmlDocPtr doc;136 char *parts[6];137 if(bust_to_parts(data, parts, 6) != 6) return NULL;138 /* 'C' TIMESTAMP UUID TARGET MODULE NAME */139 NEWDOC(doc, "NoitCheck",140 {141 ADDCHILD("remote", remote);142 ADDCHILD("id", parts[2]);143 ADDCHILD("target", parts[3]);144 ADDCHILD("module", parts[4]);145 ADDCHILD("name", parts[5]);146 });147 return doc;148 }149 150 static xmlDocPtr151 stratcon_iep_doc_from_metric(char *data, char *remote) {152 xmlDocPtr doc;153 char *parts[6];154 const char *rootname = "NoitMetricNumeric";155 const char *valuename = "value";156 if(bust_to_parts(data, parts, 6) != 6) return NULL;157 /* 'M' TIMESTAMP UUID NAME TYPE VALUE */158 159 if(*parts[4] == METRIC_STRING) {160 rootname = "NoitMetricText";161 valuename = "message";162 }163 NEWDOC(doc, rootname,164 {165 ADDCHILD("remote", remote);166 ADDCHILD("id", parts[2]);167 ADDCHILD("name", parts[3]);168 ADDCHILD(valuename, parts[5]);169 });170 return doc;171 }172 173 static xmlDocPtr174 stratcon_iep_doc_from_statement(char *data, char *remote) {175 xmlDocPtr doc;176 char *parts[3];177 if(bust_to_parts(data, parts, 3) != 3) return NULL;178 /* 'D' ID QUERY */179 180 NEWDOC(doc, "StratconStatement",181 {182 ADDCHILD("id", parts[1]);183 ADDCHILD("expression", parts[2]);184 });185 return doc;186 }187 188 static xmlDocPtr189 stratcon_iep_doc_from_query(char *data, char *remote) {190 xmlDocPtr doc;191 char *parts[4];192 if(bust_to_parts(data, parts, 4) != 4) return NULL;193 /* 'Q' ID NAME QUERY */194 195 NEWDOC(doc, "StratconQuery",196 {197 ADDCHILD("id", parts[1]);198 ADDCHILD("name", parts[2]);199 ADDCHILD("expression", parts[3]);200 });201 return doc;202 }203 204 static xmlDocPtr205 stratcon_iep_doc_from_querystop(char *data, char *remote) {206 xmlDocPtr doc;207 char *parts[2];208 if(bust_to_parts(data, parts, 2) != 2) return NULL;209 /* 'Q' ID */210 211 NEWDOC(doc, "StratconQueryStop",212 {213 xmlNodeSetContent(root, (xmlChar *)parts[1]);214 });215 return doc;216 }217 218 static xmlDocPtr219 stratcon_iep_doc_from_line(char *data, char *remote) {220 if(data) {221 switch(*data) {222 case 'C': return stratcon_iep_doc_from_check(data, remote);223 case 'S': return stratcon_iep_doc_from_status(data, remote);224 case 'M': return stratcon_iep_doc_from_metric(data, remote);225 case 'D': return stratcon_iep_doc_from_statement(data, remote);226 case 'Q': return stratcon_iep_doc_from_query(data, remote);227 case 'q': return stratcon_iep_doc_from_querystop(data, remote);228 }229 }230 return NULL;231 }232 84 233 85 static float … … 537 389 } 538 390 539 540 541 /*542 We don't use login/pass543 apr_hash_set(frame.headers, "login", APR_HASH_KEY_STRING, "");544 apr_hash_set(frame.headers, "passcode", APR_HASH_KEY_STRING, "");545 */546 391 frame.body = NULL; 547 392 frame.body_length = -1; … … 604 449 if(job->remote) free(job->remote); 605 450 if(job->doc_str) free(job->doc_str); 606 if(job->doc) xmlFreeDoc(job->doc);607 451 if(job->pool) apr_pool_destroy(job->pool); 608 452 free(job); … … 619 463 return 0; 620 464 } 621 job->doc = stratcon_iep_doc_from_line(job->line, job->remote); 622 if(job->doc) { 623 job->doc_str = noit_xmlSaveToBuffer(job->doc); 624 if(job->doc_str) { 625 /* Submit */ 626 if(driver && driver->pool && driver->connection) { 627 apr_status_t rc; 465 /* Submit */ 466 if(driver && driver->pool && driver->connection) { 467 apr_status_t rc; 468 int line_len = strlen(job->line); 469 int remote_len = strlen(job->remote); 628 470 #ifdef OPENWIRE 629 ow_ActiveMQQueue *dest;630 ow_ActiveMQTextMessage *message;631 632 apr_pool_create(&job->pool, driver->pool);633 message = ow_ActiveMQTextMessage_create(job->pool);634 message->content =635 ow_byte_array_create_with_data(job->pool,strlen(job->doc_str),636 job->doc_str);637 dest = ow_ActiveMQQueue_create(job->pool);638 dest->physicalName = ow_string_create_from_cstring(job->pool,"TEST.QUEUE");639 rc = amqcs_send(driver->connection,640 (ow_ActiveMQDestination*)dest,641 (ow_ActiveMQMessage*)message,642 1,4,0,job->pool);643 if(rc != APR_SUCCESS) {644 noitL(noit_error, "MQ send failed, disconnecting\n");645 if(driver->connection) amqcs_disconnect(&driver->connection);646 driver->connection = NULL;647 }471 ow_ActiveMQQueue *dest; 472 ow_ActiveMQTextMessage *message; 473 474 apr_pool_create(&job->pool, driver->pool); 475 message = ow_ActiveMQTextMessage_create(job->pool); 476 message->content = 477 ow_byte_array_create_with_data(job->pool,strlen(job->doc_str), 478 job->doc_str); 479 dest = ow_ActiveMQQueue_create(job->pool); 480 dest->physicalName = ow_string_create_from_cstring(job->pool,"TEST.QUEUE"); 481 rc = amqcs_send(driver->connection, 482 (ow_ActiveMQDestination*)dest, 483 (ow_ActiveMQMessage*)message, 484 1,4,0,job->pool); 485 if(rc != APR_SUCCESS) { 486 noitL(noit_error, "MQ send failed, disconnecting\n"); 487 if(driver->connection) amqcs_disconnect(&driver->connection); 488 driver->connection = NULL; 489 } 648 490 #else 649 stomp_frame out; 650 651 apr_pool_create(&job->pool, driver->pool); 652 653 out.command = "SEND"; 654 out.headers = apr_hash_make(job->pool); 655 if (driver->exchange) 656 apr_hash_set(out.headers, "exchange", APR_HASH_KEY_STRING, driver->exchange); 657 658 apr_hash_set(out.headers, "destination", APR_HASH_KEY_STRING, "/queue/noit.firehose"); 659 apr_hash_set(out.headers, "ack", APR_HASH_KEY_STRING, "auto"); 660 661 out.body_length = -1; 662 out.body = job->doc_str; 663 rc = stomp_write(driver->connection, &out, job->pool); 664 if(rc != APR_SUCCESS) { 665 noitL(noit_error, "STOMP send failed, disconnecting\n"); 666 if(driver->connection) stomp_disconnect(&driver->connection); 667 driver->connection = NULL; 668 } 491 stomp_frame out; 492 493 job->doc_str = (char*)calloc(line_len + 1 /* \t */ + 494 remote_len + 2, 1); 495 strncpy(job->doc_str, job->line, 2); 496 strncat(job->doc_str, job->remote, remote_len); 497 strncat(job->doc_str, "\t", 1); 498 strncat(job->doc_str, job->line + 2, line_len - 2); 499 500 apr_pool_create(&job->pool, driver->pool); 501 502 out.command = "SEND"; 503 out.headers = apr_hash_make(job->pool); 504 if (driver->exchange) 505 apr_hash_set(out.headers, "exchange", APR_HASH_KEY_STRING, driver->exchange); 506 507 apr_hash_set(out.headers, "destination", APR_HASH_KEY_STRING, "/queue/noit.firehose"); 508 apr_hash_set(out.headers, "ack", APR_HASH_KEY_STRING, "auto"); 509 510 out.body_length = -1; 511 out.body = job->doc_str; 512 rc = stomp_write(driver->connection, &out, job->pool); 513 if(rc != APR_SUCCESS) { 514 noitL(noit_error, "STOMP send failed, disconnecting\n"); 515 if(driver->connection) stomp_disconnect(&driver->connection); 516 driver->connection = NULL; 517 } 669 518 #endif 670 }671 else {672 noitL(noit_error, "Not submitting event, no MQ\n");673 }674 }675 519 } 676 520 else { 677 noitL(noit_iep, "no iep handler for: '%s'\n", job->line); 521 noitL(noit_iep, "no iep handler for: '%s'\n", job->line); 678 522 } 679 523 return 0; … … 818 662 &info->command)) { 819 663 noitL(noit_error, "No IEP start command provided. You're on your own.\n"); 820 goto bail; 664 // goto bail; 665 // If you want to start it as a seperate process 666 return; 821 667 } 822 668 if(pipe(info->stdin_pipe) != 0 ||
