Changeset 2534eec9709c2c836a34394e9854dda289f8a208

Show
Ignore:
Timestamp:
05/22/12 21:51:03 (2 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1337723463 -0400
git-parent:

[27a5e331e0c69b61663894284bea87f17f1ed062]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1337723463 -0400
Message:

attempt to decouple the esper stuff from the broker stuff so the code can be reused without esper (passes iep-related test suite still)

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/java/Makefile.in

    ra968da7 r2534eec  
    2525R_JAVA=com/omniti/reconnoiter/MQListener.java \ 
    2626        com/omniti/reconnoiter/EventHandler.java \ 
     27        com/omniti/reconnoiter/IEventHandler.java \ 
    2728        com/omniti/reconnoiter/MessageHandler.java \ 
    2829        com/omniti/reconnoiter/CheckStatus.java \ 
  • src/java/com/omniti/reconnoiter/EventHandler.java

    r372c4a8 r2534eec  
    1919import com.omniti.reconnoiter.event.*; 
    2020import com.omniti.reconnoiter.MessageHandler; 
     21import com.omniti.reconnoiter.IEventHandler; 
    2122import java.util.Map; 
    2223import java.util.HashMap; 
    2324import java.util.concurrent.atomic.AtomicLong; 
    2425 
    25 public class EventHandler
     26public class EventHandler implements IEventHandler
    2627  private LinkedList<MessageHandler> alternates; 
    2728  private EPServiceProvider epService; 
     
    101102    return false; 
    102103  } 
     104  public void sendEvent(StratconMessage m) { 
     105    getService().getEPRuntime().sendEvent(m); 
     106  } 
    103107} 
  • src/java/com/omniti/reconnoiter/IEPEngine.java

    r31fab75 r2534eec  
    1717import com.omniti.reconnoiter.StratconConfig; 
    1818import com.omniti.reconnoiter.StratconMessage; 
     19import com.omniti.reconnoiter.event.*; 
    1920import com.espertech.esper.client.*; 
    2021import com.omniti.reconnoiter.esper.ExactStatViewFactory; 
     
    2223import com.omniti.reconnoiter.esper.CounterViewFactory; 
    2324import org.apache.log4j.BasicConfigurator; 
     25import java.util.UUID; 
     26import java.util.concurrent.ConcurrentHashMap; 
     27import com.omniti.reconnoiter.broker.IMQBroker; 
    2428 
    2529class IEPEngine { 
     
    3640    EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config); 
    3741 
    38     mql = new MQListener(epService, BrokerFactory.getBroker(sconf)); 
     42    IMQBroker broker = BrokerFactory.getBroker(sconf); 
     43    EventHandler eh = new EventHandler( 
     44        new ConcurrentHashMap<UUID,StratconQueryBase>(), 
     45        epService, broker); 
     46    mql = new MQListener(eh, broker); 
    3947    try { 
    4048      List<StratconMessage> mlist = sconf.getQueries(); 
  • src/java/com/omniti/reconnoiter/MQListener.java

    rd78a062 r2534eec  
    1414import java.lang.Runnable; 
    1515 
    16 import com.espertech.esper.client.EPServiceProvider; 
    1716import java.util.concurrent.ConcurrentHashMap; 
    1817import java.util.LinkedList; 
     
    2120 
    2221public class MQListener implements Runnable { 
    23     private EPServiceProvider epService; 
    24     private ConcurrentHashMap<UUID,StratconQueryBase> queries; 
    2522    private IMQBroker broker; 
    2623    private LinkedList<StratconMessage> preproc; 
     
    2825    private LinkedList<MessageHandler>  alternates; 
    2926    private boolean booted = false; 
    30     private EventHandler eh = null; 
     27    private IEventHandler eh = null; 
    3128 
    32     public MQListener(EPServiceProvider epService, IMQBroker broker) { 
    33       this.queries = new ConcurrentHashMap<UUID,StratconQueryBase>(); 
    34       this.epService = epService; 
     29    public MQListener(IEventHandler eh, IMQBroker broker) { 
    3530      this.broker = broker; 
     31      this.eh = eh; 
    3632      preproc = new LinkedList<StratconMessage>(); 
    3733      queries_toload = new LinkedList<StratconMessage>(); 
     
    5046    } 
    5147 
    52     protected void process(EventHandler eh, List<StratconMessage> l) { 
     48    protected void process(IEventHandler eh, List<StratconMessage> l) { 
    5349      for (StratconMessage m : l) { 
    5450        try { eh.processMessage(m); } 
     
    6258      booted = true; 
    6359    } 
    64     public EventHandler getEventHandler() { return eh; } 
     60    public IEventHandler getEventHandler() { return eh; } 
    6561    public IMQBroker getBroker() { return broker; } 
    6662    public void run() { 
    67       eh = new EventHandler(queries, this.epService, broker); 
    6863      for ( MessageHandler mh : alternates ) eh.addObserver(mh); 
    6964      process(eh, preproc); 
  • src/java/com/omniti/reconnoiter/broker/AMQBroker.java

    r373974a r2534eec  
    2222import com.espertech.esper.client.EPServiceProvider; 
    2323import com.espertech.esper.client.UpdateListener; 
    24 import com.omniti.reconnoiter.EventHandler; 
     24import com.omniti.reconnoiter.IEventHandler; 
    2525import com.omniti.reconnoiter.StratconConfig; 
    2626import com.omniti.reconnoiter.event.StratconQuery; 
     
    3939    try { 
    4040      this.listenerClass = Class.forName(className); 
    41       this.con = this.listenerClass.getDeclaredConstructor( 
    42           new Class[] { EPServiceProvider.class, StratconQuery.class, String.class } 
    43       ); 
    4441    } 
    4542    catch(java.lang.ClassNotFoundException e) { 
    4643      throw new RuntimeException("Cannot find class: " + className); 
    4744    } 
    48     catch(java.lang.NoSuchMethodException e) { 
    49       throw new RuntimeException("Cannot find constructor for class: " + className); 
    50     } 
    51  
    5245  } 
    5346 
     
    6861  } 
    6962   
    70   public void consume(EventHandler eh) { 
     63  public void consume(IEventHandler eh) { 
    7164    while (true) { 
    7265      Message message = null; 
     
    8679    } 
    8780  } 
    88    
    89   public UpdateListener getListener(EPServiceProvider epService, StratconQuery sq) { 
    90     UpdateListener l = null; 
    91     try { 
    92       l = con.newInstance(epService, sq, "vm://localhost"); 
    93     } 
    94     catch(java.lang.InstantiationException ie) { } 
    95     catch(java.lang.IllegalAccessException ie) { } 
    96     catch(java.lang.reflect.InvocationTargetException ie) { } 
    97     return l; 
    98   } 
     81 
     82  public Class getListenerClass() { return listenerClass; } 
     83  public String getAlertRoutingKey() { return "vm://localhost"; } 
     84  public String getAlertExchangeName() { return ""; } 
    9985} 
  • src/java/com/omniti/reconnoiter/broker/AMQListener.java

    r7566c98 r2534eec  
    3535    private StratconQuery sq; 
    3636 
    37     public AMQListener(EPServiceProvider epService, StratconQuery sq, String binding) { 
     37    public AMQListener(EPServiceProvider epService, StratconQuery sq, AMQBroker broker, String binding, String unused) { 
    3838      super(); 
    3939      try { 
  • src/java/com/omniti/reconnoiter/broker/IMQBroker.java

    r6279a55 r2534eec  
    1111import com.espertech.esper.client.EPServiceProvider; 
    1212import com.espertech.esper.client.UpdateListener; 
    13 import com.omniti.reconnoiter.EventHandler; 
     13import com.omniti.reconnoiter.IEventHandler; 
    1414import com.omniti.reconnoiter.event.StratconQuery; 
    1515import java.io.IOException; 
    1616 
    1717public interface IMQBroker { 
    18    
    1918  public void connect() throws Exception; 
    2019  public void disconnect(); 
    21   public void consume(EventHandler eh) throws IOException; 
    22   public UpdateListener getListener(EPServiceProvider epService, StratconQuery sq); 
    23  
     20  public void consume(IEventHandler eh) throws IOException; 
     21  public Class getListenerClass(); 
     22  public String getAlertExchangeName(); 
     23  public String getAlertRoutingKey(); 
    2424} 
  • src/java/com/omniti/reconnoiter/broker/RabbitBroker.java

    ra2f3c05 r2534eec  
    1010 
    1111import java.io.IOException; 
    12 import java.lang.reflect.Constructor; 
    1312 
    1413import org.apache.log4j.Logger; 
    15 import com.espertech.esper.client.EPServiceProvider; 
    16 import com.espertech.esper.client.UpdateListener; 
    17 import com.omniti.reconnoiter.EventHandler; 
     14import com.omniti.reconnoiter.IEventHandler; 
    1815import com.omniti.reconnoiter.StratconConfig; 
    1916import com.omniti.reconnoiter.event.StratconQuery; 
     
    5047  private boolean durableQueue; 
    5148  private boolean durableExchange; 
    52   private Constructor<UpdateListener> con; 
    5349 
    54   @SuppressWarnings("unchecked")  
    5550  public RabbitBroker(StratconConfig config) { 
    5651    this.conn = null; 
     
    6459    this.heartBeat = (this.heartBeat + 999) / 1000; // (ms -> seconds, rounding up) 
    6560    this.connectTimeout = Integer.parseInt(config.getBrokerParameter("connect_timeout", "5000")); 
    66      
     61 
    6762    String className = config.getBrokerParameter("listenerClass", "com.omniti.reconnoiter.broker.RabbitListener"); 
    6863    try { 
    6964      this.listenerClass = Class.forName(className); 
    70       this.con = this.listenerClass.getDeclaredConstructor( 
    71           new Class[] { EPServiceProvider.class, StratconQuery.class, RabbitBroker.class, 
    72                         String.class, String.class } 
    73       ); 
    7465    } 
    7566    catch(java.lang.ClassNotFoundException e) { 
    7667      throw new RuntimeException("Cannot find class: " + className); 
    77     } 
    78     catch(java.lang.NoSuchMethodException e) { 
    79       throw new RuntimeException("Cannot find constructor for class: " + className); 
    8068    } 
    8169 
     
    154142  public Channel getChannel() { return channel; } 
    155143   
    156   public void consume(EventHandler eh) throws IOException { 
     144  public void consume(IEventHandler eh) throws IOException { 
    157145    QueueingConsumer consumer = new QueueingConsumer(channel); 
    158146 
     
    180168  } 
    181169 
    182   public UpdateListener getListener(EPServiceProvider epService, StratconQuery sq) { 
    183     UpdateListener l = null; 
    184     try { 
    185       l = con.newInstance(epService, sq, this, alertExchangeName, alertRoutingKey); 
    186     } 
    187     catch(java.lang.InstantiationException ie) { } 
    188     catch(java.lang.IllegalAccessException ie) { } 
    189     catch(java.lang.reflect.InvocationTargetException ie) { } 
    190     return l; 
    191   } 
     170  public Class getListenerClass() { return listenerClass; } 
     171  public String getAlertExchangeName() { return alertExchangeName; } 
     172  public String getAlertRoutingKey() { return alertRoutingKey; } 
    192173} 
  • src/java/com/omniti/reconnoiter/event/NoitMetric.java

    r69c4c69 r2534eec  
    3636  public void handle(EventHandler eh) { 
    3737    long start = System.nanoTime(); 
    38     if(nmn != null) eh.getService().getEPRuntime().sendEvent(nmn); 
    39     if(nmt != null) eh.getService().getEPRuntime().sendEvent(nmt); 
     38    if(nmn != null) eh.sendEvent(nmn); 
     39    if(nmt != null) eh.sendEvent(nmt); 
    4040    long nanos = System.nanoTime() - start; 
    4141    logger.debug("sendEvent("+getUuid()+"-"+getName()+") took "+(nanos/1000)+"us"); 
  • src/java/com/omniti/reconnoiter/event/StratconQuery.java

    rd483232 r2534eec  
    99package com.omniti.reconnoiter.event; 
    1010 
    11  
     11import java.lang.reflect.Constructor; 
    1212import com.omniti.reconnoiter.event.StratconQueryBase; 
    1313import com.omniti.reconnoiter.EventHandler; 
     14import com.omniti.reconnoiter.IEventHandler; 
     15import com.omniti.reconnoiter.broker.IMQBroker; 
    1416import com.espertech.esper.client.EPStatement; 
    1517import com.espertech.esper.client.UpdateListener; 
     18import com.espertech.esper.client.EPServiceProvider; 
    1619import java.util.UUID; 
    1720 
     
    6366  public int numparts() { return 5; } 
    6467 
     68  @SuppressWarnings("unchecked") 
     69  public UpdateListener getListener(EventHandler eh, StratconQuery sq) { 
     70    UpdateListener l = null; 
     71    try { 
     72      Constructor<UpdateListener> con; 
     73      IMQBroker broker = eh.getBroker(); 
     74      con = broker.getListenerClass().getDeclaredConstructor( 
     75            new Class[] { EPServiceProvider.class, StratconQuery.class, broker.getClass(), 
     76                          String.class, String.class } 
     77        ); 
     78      l = con.newInstance(eh.getService(), sq, broker, 
     79                          broker.getAlertExchangeName(), broker.getAlertRoutingKey()); 
     80    } 
     81    catch(java.lang.NoSuchMethodException e) {} 
     82    catch(java.lang.InstantiationException ie) { } 
     83    catch(java.lang.IllegalAccessException ie) { } 
     84    catch(java.lang.reflect.InvocationTargetException ie) { } 
     85    return l; 
     86  } 
     87 
    6588  public void handle(EventHandler eh) { 
    6689    eh.deregisterQuery(getUUID()); 
    6790 
    6891    EPStatement statement = eh.getService().getEPAdministrator().createEPL(getExpression()); 
    69     UpdateListener o = eh.getBroker().getListener(eh.getService(), this); 
     92    UpdateListener o = getListener(eh, this); 
    7093 
    7194    statement.addListener(o);