Changeset 31fab7554ce65d9d48275fe94fa845910c6efd9f

Show
Ignore:
Timestamp:
01/26/10 04:28:32 (4 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1264480112 +0000
git-parent:

[3739c78a534317b34706e055d7a120bdd59ab883]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1264480112 +0000
Message:

this is a first whack of duplicating the C function in Java, appears to work. refs #246

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/java/com/omniti/reconnoiter/EventHandler.java

    r54eb3fe r31fab75  
    4747  } 
    4848  public boolean isQueryRegistered(UUID id) { return queries.containsKey(id); } 
    49          
     49 
     50  public void processMessage(StratconMessage m) throws Exception { 
     51    m.handle(this); 
     52  } 
     53  public void processMessage(StratconMessage[] messages) throws Exception { 
     54    Exception last = null; 
     55    for ( StratconMessage m : messages ) { 
     56      if(m != null) try { processMessage(m); } catch (Exception e) { last = e; } 
     57    } 
     58    if(last != null) throw(last); 
     59  } 
    5060  public void processMessage(String payload) throws Exception { 
    5161    Exception last = null; 
     
    5464      System.err.println("Can't grok:\n" + payload); 
    5565    } 
    56     for ( StratconMessage m : messages ) { 
    57       if(m != null) try { m.handle(this); } catch (Exception e) { last = e; } 
    58     } 
    59     if(last != null) throw(last); 
     66    processMessage(messages); 
    6067  } 
    6168} 
  • src/java/com/omniti/reconnoiter/IEPEngine.java

    r1866283 r31fab75  
    1212import java.io.BufferedReader; 
    1313import java.io.InputStreamReader; 
     14import java.util.List; 
    1415import com.omniti.reconnoiter.MQListener; 
    1516import com.omniti.reconnoiter.broker.BrokerFactory; 
    1617import com.omniti.reconnoiter.StratconConfig; 
     18import com.omniti.reconnoiter.StratconMessage; 
    1719import com.espertech.esper.client.*; 
    1820import com.omniti.reconnoiter.esper.ExactStatViewFactory; 
     
    3537 
    3638    mql = new MQListener(epService, BrokerFactory.getBroker(sconf)); 
     39    try { 
     40      List<StratconMessage> mlist = sconf.getQueries(); 
     41      for ( StratconMessage m : mlist ) { 
     42        mql.preprocess(m); 
     43      } 
     44    } 
     45    catch (Exception e) { 
     46      System.err.println("Failed to load queries:"); 
     47      e.printStackTrace(); 
     48      System.exit(-1); 
     49    } 
    3750  } 
    3851 
  • src/java/com/omniti/reconnoiter/MQListener.java

    r5e2ef1b r31fab75  
    1515import com.espertech.esper.client.EPServiceProvider; 
    1616import java.util.concurrent.ConcurrentHashMap; 
     17import java.util.LinkedList; 
    1718import java.util.UUID; 
    1819 
     
    2122    private ConcurrentHashMap<UUID,StratconQueryBase> queries; 
    2223    private IMQBroker broker; 
     24    private LinkedList<StratconMessage> preproc; 
     25    private boolean booted = false; 
    2326 
    2427    public MQListener(EPServiceProvider epService, IMQBroker broker) { 
     
    2629      this.epService = epService; 
    2730      this.broker = broker; 
     31      preproc = new LinkedList<StratconMessage>(); 
     32    } 
     33 
     34    public void preprocess(StratconMessage m) throws Exception { 
     35      if(booted) throw new Exception("Already booted"); 
     36      preproc.add(m); 
    2837    } 
    2938     
    3039    public void run() { 
    3140      EventHandler eh = new EventHandler(queries, this.epService, broker); 
     41      for (StratconMessage m : preproc) { 
     42        try { eh.processMessage(m); } 
     43        catch (Exception e) { 
     44          System.err.println("Something went wrong preprocessing events:"); 
     45          e.printStackTrace(); 
     46          System.exit(-2); 
     47        } 
     48      } 
     49      booted = true; 
    3250      while(true) { 
    3351        broker.connect(); 
  • src/java/com/omniti/reconnoiter/StratconConfig.java

    r7afb4e3 r31fab75  
    1212import java.io.IOException; 
    1313import java.util.Properties; 
     14import java.util.List; 
     15import java.util.LinkedList; 
     16import java.util.Hashtable; 
    1417import javax.xml.parsers.DocumentBuilder; 
    1518import javax.xml.parsers.DocumentBuilderFactory; 
     
    2629import org.w3c.dom.Document; 
    2730import org.w3c.dom.NodeList; 
    28  
     31import org.w3c.dom.Node; 
     32 
     33import com.omniti.reconnoiter.StratconMessage; 
     34import com.omniti.reconnoiter.event.StratconStatement; 
     35import com.omniti.reconnoiter.event.StratconQuery; 
    2936 
    3037public class StratconConfig { 
    3138  private Document doc; 
    32    
    33    
    34  
    35  
     39 
     40 
     41  public class StatementNode { 
     42    public String id; 
     43    public String statement; 
     44    public String provides; 
     45    public int marked; 
     46    public int nrequires; 
     47    public StatementNode[] requires; 
     48  } 
     49   
    3650  public StratconConfig(String filename) { 
    3751    DocumentBuilder docBuilder; 
     
    5569        System.out.println("Bad file: " + e.getMessage()); 
    5670    } 
     71  } 
     72 
     73  public List<StratconMessage> getQueries() throws Exception { 
     74    LinkedList<StratconMessage> list = new LinkedList<StratconMessage>(); 
     75    Hashtable<String,StatementNode> stmt_by_id = new Hashtable<String,StatementNode>(); 
     76    Hashtable<String,StatementNode> stmt_by_provider = new Hashtable<String,StatementNode>(); 
     77    XPathFactory factory = XPathFactory.newInstance(); 
     78    XPath xpath = factory.newXPath(); 
     79    XPathExpression expr_epl, expr_req; 
     80    Object result; 
     81    NodeList queries; 
     82    try { 
     83      XPathExpression expr = 
     84        xpath.compile("/stratcon/iep/queries[@master=\"iep\"]//statement"); 
     85      result = expr.evaluate(doc, XPathConstants.NODESET); 
     86 
     87      XPathExpression qexpr = 
     88        xpath.compile("/stratcon/iep/queries[@master=\"iep\"]//query"); 
     89      queries = (NodeList) qexpr.evaluate(doc, XPathConstants.NODESET); 
     90 
     91      expr_epl = xpath.compile("self::node()/epl"); 
     92      expr_req = xpath.compile("self::node()/requires"); 
     93    } 
     94    catch(XPathExpressionException e) { 
     95      System.err.println("Bad expression: " + e.getMessage()); 
     96      return null; 
     97    } 
     98    NodeList nodes = (NodeList) result; 
     99 
     100    /* Phase 1: sweep in all the statements */ 
     101    for(int i = 0; i < nodes.getLength(); i++) { 
     102      StatementNode stmt = new StatementNode(); 
     103      Node node = nodes.item(i); 
     104      Node node_id = node.getAttributes().getNamedItem("id"); 
     105      if(node_id == null) continue; 
     106      stmt.id = node_id.getNodeValue(); 
     107      Node provides_id = node.getAttributes().getNamedItem("provides"); 
     108      if(provides_id != null) stmt.provides = provides_id.getNodeValue(); 
     109      NodeList nodes_epl = (NodeList) expr_epl.evaluate(node, XPathConstants.NODESET); 
     110      if(nodes_epl.getLength() != 1) continue; 
     111      stmt.statement = nodes_epl.item(0).getTextContent(); 
     112      stmt_by_id.put(stmt.id, stmt); 
     113      if(stmt.provides != null) 
     114        if(stmt_by_provider.put(stmt.provides, stmt) != null) 
     115          throw new Exception("Two statements provide: " + stmt.provides); 
     116    } 
     117 
     118    /* Phase 2: load the requires graph */ 
     119    for(int i = 0; i < nodes.getLength(); i++) { 
     120      StatementNode stmt; 
     121      Node node = nodes.item(i); 
     122      Node node_id = node.getAttributes().getNamedItem("id"); 
     123      if(node_id == null) continue; 
     124      String id = node_id.getNodeValue(); 
     125      stmt = stmt_by_id.get(id); 
     126      if(stmt == null) throw new Exception("Cannot find statement: " + id); 
     127      NodeList nodes_req = (NodeList) expr_req.evaluate(node, XPathConstants.NODESET); 
     128      stmt.nrequires = nodes_req.getLength(); 
     129      if(stmt.nrequires > 0) { 
     130        stmt.requires = new StatementNode[stmt.nrequires]; 
     131        for(int j = 0; j < stmt.nrequires; j++) { 
     132          String req = nodes_req.item(j).getTextContent(); 
     133          StatementNode stmt_req = stmt_by_provider.get(req); 
     134          if(stmt_req == null) throw new Exception("Statement " + stmt.id + " requires " + req + " which no one provides."); 
     135          stmt.requires[j] = stmt_req; 
     136        } 
     137      } 
     138    } 
     139 
     140    /* Phase 3: Recursive sweep and mark to detect cycles. 
     141       We're walking the graph backwards here from dependent to provider, 
     142       but a cycle is a cycle, so this validates the graph. */ 
     143    int mgen = 0; 
     144    for ( StatementNode stmt : stmt_by_id.values() ) 
     145      if(stmt_mark_dag(stmt, ++mgen) < 0) throw new Exception("Statement " + stmt.id + " has cyclic requirements."); 
     146 
     147    /* Phase 4: clean the markings */ 
     148    for ( StatementNode stmt : stmt_by_id.values() ) 
     149      stmt.marked = 0; 
     150 
     151    /* Phase 5: do the load */ 
     152    for ( StatementNode stmt : stmt_by_id.values() ) 
     153      stmt_append_to_list(list, stmt); 
     154 
     155 
     156    /* Phase 6: pull in the queries (which is much simpler) */ 
     157    for (int i=0; i<queries.getLength(); i++) { 
     158      Node node = queries.item(i); 
     159      Node node_id = node.getAttributes().getNamedItem("id"); 
     160      if(node_id == null) continue; 
     161      String id = node_id.getNodeValue(); 
     162      Node node_topic = node.getAttributes().getNamedItem("topic"); 
     163      if(node_topic == null) continue; 
     164      String topic = node_topic.getNodeValue(); 
     165      NodeList nodes_epl = (NodeList) expr_epl.evaluate(node, XPathConstants.NODESET); 
     166      if(nodes_epl.getLength() != 1) continue; 
     167      String statement = nodes_epl.item(0).getTextContent(); 
     168      list.add(new StratconQuery(new String[] { "Q", "", id, topic, statement})); 
     169    } 
     170    return list; 
     171  } 
     172 
     173  protected void stmt_append_to_list(List<StratconMessage> l, StatementNode stmt) throws Exception { 
     174    if(stmt.marked > 0) return; 
     175    for(int i=0; i<stmt.nrequires; i++) 
     176      stmt_append_to_list(l, stmt.requires[i]); 
     177 
     178    l.add(new StratconStatement(new String [] { "D", "", stmt.id, stmt.statement })); 
     179 
     180    stmt.marked = 1; 
     181  } 
     182 
     183  protected int stmt_mark_dag(StatementNode stmt, int mgen) { 
     184    if(stmt.marked >= mgen) return -1; 
     185    if(stmt.marked > 0) return 0; 
     186    stmt.marked = mgen; 
     187    for(int i=0; i<stmt.nrequires; i++) 
     188      if(stmt_mark_dag(stmt.requires[i], mgen) < 0) return -1; 
     189    return 0; 
    57190  } 
    58191 
  • src/stratcon.conf.in

    r3739c78 r31fab75  
    4949    <broker adapter="rabbitmq" /> 
    5050    --> 
    51     <queries
     51    <queries master="iep"
    5252      <statement id="6cc613a4-7f9c-11de-973f-db7e8ccb2e5c" provides="CheckDetails-ddl"> 
    5353        <epl>create window CheckDetails.std:unique(uuid).win:keepall() as NoitCheck</epl> 
  • src/stratcon_iep.c

    r35d80a0 r31fab75  
    153153  int klen, mgen = 0; 
    154154 
    155   snprintf(path, sizeof(path), "/stratcon/iep/queries//statement"); 
     155  snprintf(path, sizeof(path), "/stratcon/iep/queries[@master=\"stratcond\"]//statement"); 
    156156  statement_configs = noit_conf_get_sections(NULL, path, &cnt); 
    157157  noitL(noit_debug, "Found %d %s stanzas\n", cnt, path); 
     
    272272  char path[256]; 
    273273 
    274   snprintf(path, sizeof(path), "/stratcon/iep/queries//query"); 
     274  snprintf(path, sizeof(path), "/stratcon/iep/queries[@master=\"stratcond\"]//query"); 
    275275  query_configs = noit_conf_get_sections(NULL, path, &cnt); 
    276276  noitL(noit_debug, "Found %d %s stanzas\n", cnt, path);