Show
Ignore:
Timestamp:
08/06/09 13:21:06 (5 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1249564866 +0000
git-parent:

[e754a7ed9a2306bb7b477de6a099909fc3234c53]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1249564866 +0000
Message:

works for me. Ensures there is a (perhaps non-connected) DAG of statement dependencies and runs them, then loads data, then runs queries. refs #162

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/java/com/omniti/reconnoiter/EventHandler.java

    r4c3fb9b r699c97c  
    1010import com.omniti.reconnoiter.event.NoitEvent; 
    1111import com.omniti.reconnoiter.event.NoitMetricNumeric; 
     12import com.omniti.reconnoiter.event.StratconQueryBase; 
     13import com.omniti.reconnoiter.event.StratconStatement; 
    1214import com.omniti.reconnoiter.event.StratconQuery; 
    1315import com.omniti.reconnoiter.event.StratconQueryStop; 
     
    1517public class EventHandler { 
    1618         
    17        private EPServiceProvider epService; 
    18        private ConcurrentHashMap<UUID, StratconQuery> queries; 
    19        private IMQBroker broker; 
     19  private EPServiceProvider epService; 
     20  private ConcurrentHashMap<UUID, StratconQueryBase> queries; 
     21  private IMQBroker broker; 
    2022 
    21        public EventHandler(ConcurrentHashMap<UUID,StratconQuery> queries, EPServiceProvider epService, IMQBroker broker) { 
    22                this.epService = epService; 
    23                this.queries = queries; 
    24                this.broker = broker; 
    25        
     23  public EventHandler(ConcurrentHashMap<UUID,StratconQueryBase> queries, EPServiceProvider epService, IMQBroker broker) { 
     24    this.epService = epService; 
     25    this.queries = queries; 
     26    this.broker = broker; 
     27 
    2628         
    27         public void processMessage(String xml) throws Exception { 
    28                  
    29                 StratconMessage m = StratconMessage.makeMessage(xml); 
     29  public void processMessage(String xml) throws Exception { 
     30    StratconMessage m = StratconMessage.makeMessage(xml); 
    3031    if(m == null) { 
    3132      System.err.println("Can't grok:\n" + xml); 
    3233    } 
    33     if(m instanceof StratconQuery) { 
     34    if(m instanceof StratconStatement) { 
     35      StratconStatement sq = (StratconStatement) m; 
     36 
     37      if(queries.containsKey(sq.getUUID())) throw (new Exception("Duplicate Query")); 
     38 
     39      EPStatement statement = epService.getEPAdministrator().createEPL(sq.getExpression()); 
     40      sq.setStatement(statement); 
     41      queries.put(sq.getUUID(), sq); 
     42      System.err.println("Creating Statement: " + sq.getUUID()); 
     43    } 
     44    else if(m instanceof StratconQuery) { 
    3445      StratconQuery sq = (StratconQuery) m; 
    3546 
     
    4657    } 
    4758    else if(m instanceof StratconQueryStop) { 
    48       StratconQuery sq = queries.get(((StratconQueryStop) m).getUUID()); 
     59      /* QueryStop stops both queries and statements */ 
     60      StratconQueryBase sq = queries.get(((StratconQueryStop) m).getUUID()); 
    4961      if(sq != null) { 
    5062        queries.remove(sq.getUUID()); 
  • src/java/com/omniti/reconnoiter/IEPEngine.java

    r4c3fb9b r699c97c  
    3434    NoitEvent.registerTypes(epService); 
    3535 
    36     epService.getEPAdministrator().createEPL("create window CheckDetails.std:unique(uuid).win:keepall() as NoitCheck"); 
    37     epService.getEPAdministrator().createEPL("insert into CheckDetails select * from NoitCheck"); 
    38  
    3936    MQListener l = new MQListener(epService, BrokerFactory.getBroker(sconf)); 
    4037 
  • src/java/com/omniti/reconnoiter/MQListener.java

    r4c3fb9b r699c97c  
    1919public class MQListener implements Runnable { 
    2020    private EPServiceProvider epService; 
    21     private ConcurrentHashMap<UUID,StratconQuery> queries; 
     21    private ConcurrentHashMap<UUID,StratconQueryBase> queries; 
    2222    private IMQBroker broker; 
    2323 
    2424    public MQListener(EPServiceProvider epService, IMQBroker broker) { 
    25       this.queries = new ConcurrentHashMap<UUID,StratconQuery>(); 
     25      this.queries = new ConcurrentHashMap<UUID,StratconQueryBase>(); 
    2626      this.epService = epService; 
    2727      this.broker = broker; 
  • src/java/com/omniti/reconnoiter/StratconMessage.java

    r2220278 r699c97c  
    5959      } 
    6060      // and requests 
     61      else if(tag.equals("StratconStatement")) 
     62        return new StratconStatement(document); 
    6163      else if(tag.equals("StratconQuery")) 
    6264        return new StratconQuery(document); 
  • src/java/com/omniti/reconnoiter/event/StratconQuery.java

    r2220278 r699c97c  
    1111import java.lang.System; 
    1212 
     13import com.omniti.reconnoiter.event.StratconQueryBase; 
    1314import com.omniti.reconnoiter.StratconMessage; 
    1415import com.espertech.esper.client.EPStatement; 
     
    2425import org.w3c.dom.NodeList; 
    2526 
    26 public class StratconQuery extends StratconMessage { 
    27   private EPStatement statement; 
    28   private UpdateListener listener; 
    29   private UUID uuid; 
    30   private String name; 
    31   private String expression; 
     27public class StratconQuery extends StratconQueryBase { 
     28  protected UpdateListener listener; 
     29  protected String name; 
    3230 
    3331  public StratconQuery(Document d) { 
     
    4543    if(uuid == null) uuid = UUID.randomUUID(); 
    4644  } 
    47   public UUID getUUID() { 
    48     return uuid; 
    49   } 
    50   public EPStatement getStatement() { 
    51     return statement; 
    52   } 
    53   public String getExpression() { 
    54     return expression; 
    55   } 
    5645  public String getName() { 
    5746    return name; 
    58   } 
    59   public void setStatement(EPStatement s) { 
    60     this.statement = s; 
    6147  } 
    6248  public void setListener(UpdateListener l) {