Changeset 31fab7554ce65d9d48275fe94fa845910c6efd9f
- Timestamp:
- 01/26/10 04:28:32 (3 years ago)
- git-parent:
- Files:
-
- src/java/com/omniti/reconnoiter/EventHandler.java (modified) (2 diffs)
- src/java/com/omniti/reconnoiter/IEPEngine.java (modified) (2 diffs)
- src/java/com/omniti/reconnoiter/MQListener.java (modified) (3 diffs)
- src/java/com/omniti/reconnoiter/StratconConfig.java (modified) (3 diffs)
- src/stratcon.conf.in (modified) (1 diff)
- src/stratcon_iep.c (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
src/java/com/omniti/reconnoiter/EventHandler.java
r54eb3fe r31fab75 47 47 } 48 48 public boolean isQueryRegistered(UUID id) { return queries.containsKey(id); } 49 49 50 public void processMessage(StratconMessage m) throws Exception { 51 m.handle(this); 52 } 53 public void processMessage(StratconMessage[] messages) throws Exception { 54 Exception last = null; 55 for ( StratconMessage m : messages ) { 56 if(m != null) try { processMessage(m); } catch (Exception e) { last = e; } 57 } 58 if(last != null) throw(last); 59 } 50 60 public void processMessage(String payload) throws Exception { 51 61 Exception last = null; … … 54 64 System.err.println("Can't grok:\n" + payload); 55 65 } 56 for ( StratconMessage m : messages ) { 57 if(m != null) try { m.handle(this); } catch (Exception e) { last = e; } 58 } 59 if(last != null) throw(last); 66 processMessage(messages); 60 67 } 61 68 } src/java/com/omniti/reconnoiter/IEPEngine.java
r1866283 r31fab75 12 12 import java.io.BufferedReader; 13 13 import java.io.InputStreamReader; 14 import java.util.List; 14 15 import com.omniti.reconnoiter.MQListener; 15 16 import com.omniti.reconnoiter.broker.BrokerFactory; 16 17 import com.omniti.reconnoiter.StratconConfig; 18 import com.omniti.reconnoiter.StratconMessage; 17 19 import com.espertech.esper.client.*; 18 20 import com.omniti.reconnoiter.esper.ExactStatViewFactory; … … 35 37 36 38 mql = new MQListener(epService, BrokerFactory.getBroker(sconf)); 39 try { 40 List<StratconMessage> mlist = sconf.getQueries(); 41 for ( StratconMessage m : mlist ) { 42 mql.preprocess(m); 43 } 44 } 45 catch (Exception e) { 46 System.err.println("Failed to load queries:"); 47 e.printStackTrace(); 48 System.exit(-1); 49 } 37 50 } 38 51 src/java/com/omniti/reconnoiter/MQListener.java
r5e2ef1b r31fab75 15 15 import com.espertech.esper.client.EPServiceProvider; 16 16 import java.util.concurrent.ConcurrentHashMap; 17 import java.util.LinkedList; 17 18 import java.util.UUID; 18 19 … … 21 22 private ConcurrentHashMap<UUID,StratconQueryBase> queries; 22 23 private IMQBroker broker; 24 private LinkedList<StratconMessage> preproc; 25 private boolean booted = false; 23 26 24 27 public MQListener(EPServiceProvider epService, IMQBroker broker) { … … 26 29 this.epService = epService; 27 30 this.broker = broker; 31 preproc = new LinkedList<StratconMessage>(); 32 } 33 34 public void preprocess(StratconMessage m) throws Exception { 35 if(booted) throw new Exception("Already booted"); 36 preproc.add(m); 28 37 } 29 38 30 39 public void run() { 31 40 EventHandler eh = new EventHandler(queries, this.epService, broker); 41 for (StratconMessage m : preproc) { 42 try { eh.processMessage(m); } 43 catch (Exception e) { 44 System.err.println("Something went wrong preprocessing events:"); 45 e.printStackTrace(); 46 System.exit(-2); 47 } 48 } 49 booted = true; 32 50 while(true) { 33 51 broker.connect(); src/java/com/omniti/reconnoiter/StratconConfig.java
r7afb4e3 r31fab75 12 12 import java.io.IOException; 13 13 import java.util.Properties; 14 import java.util.List; 15 import java.util.LinkedList; 16 import java.util.Hashtable; 14 17 import javax.xml.parsers.DocumentBuilder; 15 18 import javax.xml.parsers.DocumentBuilderFactory; … … 26 29 import org.w3c.dom.Document; 27 30 import org.w3c.dom.NodeList; 28 31 import org.w3c.dom.Node; 32 33 import com.omniti.reconnoiter.StratconMessage; 34 import com.omniti.reconnoiter.event.StratconStatement; 35 import com.omniti.reconnoiter.event.StratconQuery; 29 36 30 37 public class StratconConfig { 31 38 private Document doc; 32 33 34 35 39 40 41 public class StatementNode { 42 public String id; 43 public String statement; 44 public String provides; 45 public int marked; 46 public int nrequires; 47 public StatementNode[] requires; 48 } 49 36 50 public StratconConfig(String filename) { 37 51 DocumentBuilder docBuilder; … … 55 69 System.out.println("Bad file: " + e.getMessage()); 56 70 } 71 } 72 73 public List<StratconMessage> getQueries() throws Exception { 74 LinkedList<StratconMessage> list = new LinkedList<StratconMessage>(); 75 Hashtable<String,StatementNode> stmt_by_id = new Hashtable<String,StatementNode>(); 76 Hashtable<String,StatementNode> stmt_by_provider = new Hashtable<String,StatementNode>(); 77 XPathFactory factory = XPathFactory.newInstance(); 78 XPath xpath = factory.newXPath(); 79 XPathExpression expr_epl, expr_req; 80 Object result; 81 NodeList queries; 82 try { 83 XPathExpression expr = 84 xpath.compile("/stratcon/iep/queries[@master=\"iep\"]//statement"); 85 result = expr.evaluate(doc, XPathConstants.NODESET); 86 87 XPathExpression qexpr = 88 xpath.compile("/stratcon/iep/queries[@master=\"iep\"]//query"); 89 queries = (NodeList) qexpr.evaluate(doc, XPathConstants.NODESET); 90 91 expr_epl = xpath.compile("self::node()/epl"); 92 expr_req = xpath.compile("self::node()/requires"); 93 } 94 catch(XPathExpressionException e) { 95 System.err.println("Bad expression: " + e.getMessage()); 96 return null; 97 } 98 NodeList nodes = (NodeList) result; 99 100 /* Phase 1: sweep in all the statements */ 101 for(int i = 0; i < nodes.getLength(); i++) { 102 StatementNode stmt = new StatementNode(); 103 Node node = nodes.item(i); 104 Node node_id = node.getAttributes().getNamedItem("id"); 105 if(node_id == null) continue; 106 stmt.id = node_id.getNodeValue(); 107 Node provides_id = node.getAttributes().getNamedItem("provides"); 108 if(provides_id != null) stmt.provides = provides_id.getNodeValue(); 109 NodeList nodes_epl = (NodeList) expr_epl.evaluate(node, XPathConstants.NODESET); 110 if(nodes_epl.getLength() != 1) continue; 111 stmt.statement = nodes_epl.item(0).getTextContent(); 112 stmt_by_id.put(stmt.id, stmt); 113 if(stmt.provides != null) 114 if(stmt_by_provider.put(stmt.provides, stmt) != null) 115 throw new Exception("Two statements provide: " + stmt.provides); 116 } 117 118 /* Phase 2: load the requires graph */ 119 for(int i = 0; i < nodes.getLength(); i++) { 120 StatementNode stmt; 121 Node node = nodes.item(i); 122 Node node_id = node.getAttributes().getNamedItem("id"); 123 if(node_id == null) continue; 124 String id = node_id.getNodeValue(); 125 stmt = stmt_by_id.get(id); 126 if(stmt == null) throw new Exception("Cannot find statement: " + id); 127 NodeList nodes_req = (NodeList) expr_req.evaluate(node, XPathConstants.NODESET); 128 stmt.nrequires = nodes_req.getLength(); 129 if(stmt.nrequires > 0) { 130 stmt.requires = new StatementNode[stmt.nrequires]; 131 for(int j = 0; j < stmt.nrequires; j++) { 132 String req = nodes_req.item(j).getTextContent(); 133 StatementNode stmt_req = stmt_by_provider.get(req); 134 if(stmt_req == null) throw new Exception("Statement " + stmt.id + " requires " + req + " which no one provides."); 135 stmt.requires[j] = stmt_req; 136 } 137 } 138 } 139 140 /* Phase 3: Recursive sweep and mark to detect cycles. 141 We're walking the graph backwards here from dependent to provider, 142 but a cycle is a cycle, so this validates the graph. */ 143 int mgen = 0; 144 for ( StatementNode stmt : stmt_by_id.values() ) 145 if(stmt_mark_dag(stmt, ++mgen) < 0) throw new Exception("Statement " + stmt.id + " has cyclic requirements."); 146 147 /* Phase 4: clean the markings */ 148 for ( StatementNode stmt : stmt_by_id.values() ) 149 stmt.marked = 0; 150 151 /* Phase 5: do the load */ 152 for ( StatementNode stmt : stmt_by_id.values() ) 153 stmt_append_to_list(list, stmt); 154 155 156 /* Phase 6: pull in the queries (which is much simpler) */ 157 for (int i=0; i<queries.getLength(); i++) { 158 Node node = queries.item(i); 159 Node node_id = node.getAttributes().getNamedItem("id"); 160 if(node_id == null) continue; 161 String id = node_id.getNodeValue(); 162 Node node_topic = node.getAttributes().getNamedItem("topic"); 163 if(node_topic == null) continue; 164 String topic = node_topic.getNodeValue(); 165 NodeList nodes_epl = (NodeList) expr_epl.evaluate(node, XPathConstants.NODESET); 166 if(nodes_epl.getLength() != 1) continue; 167 String statement = nodes_epl.item(0).getTextContent(); 168 list.add(new StratconQuery(new String[] { "Q", "", id, topic, statement})); 169 } 170 return list; 171 } 172 173 protected void stmt_append_to_list(List<StratconMessage> l, StatementNode stmt) throws Exception { 174 if(stmt.marked > 0) return; 175 for(int i=0; i<stmt.nrequires; i++) 176 stmt_append_to_list(l, stmt.requires[i]); 177 178 l.add(new StratconStatement(new String [] { "D", "", stmt.id, stmt.statement })); 179 180 stmt.marked = 1; 181 } 182 183 protected int stmt_mark_dag(StatementNode stmt, int mgen) { 184 if(stmt.marked >= mgen) return -1; 185 if(stmt.marked > 0) return 0; 186 stmt.marked = mgen; 187 for(int i=0; i<stmt.nrequires; i++) 188 if(stmt_mark_dag(stmt.requires[i], mgen) < 0) return -1; 189 return 0; 57 190 } 58 191 src/stratcon.conf.in
r3739c78 r31fab75 49 49 <broker adapter="rabbitmq" /> 50 50 --> 51 <queries >51 <queries master="iep"> 52 52 <statement id="6cc613a4-7f9c-11de-973f-db7e8ccb2e5c" provides="CheckDetails-ddl"> 53 53 <epl>create window CheckDetails.std:unique(uuid).win:keepall() as NoitCheck</epl> src/stratcon_iep.c
r35d80a0 r31fab75 153 153 int klen, mgen = 0; 154 154 155 snprintf(path, sizeof(path), "/stratcon/iep/queries //statement");155 snprintf(path, sizeof(path), "/stratcon/iep/queries[@master=\"stratcond\"]//statement"); 156 156 statement_configs = noit_conf_get_sections(NULL, path, &cnt); 157 157 noitL(noit_debug, "Found %d %s stanzas\n", cnt, path); … … 272 272 char path[256]; 273 273 274 snprintf(path, sizeof(path), "/stratcon/iep/queries //query");274 snprintf(path, sizeof(path), "/stratcon/iep/queries[@master=\"stratcond\"]//query"); 275 275 query_configs = noit_conf_get_sections(NULL, path, &cnt); 276 276 noitL(noit_debug, "Found %d %s stanzas\n", cnt, path);
