Changeset 2534eec9709c2c836a34394e9854dda289f8a208
- Timestamp:
- 05/22/12 21:51:03
(1 year ago)
- Author:
- Theo Schlossnagle <jesus@omniti.com>
- git-committer:
- Theo Schlossnagle <jesus@omniti.com> 1337723463 -0400
- git-parent:
[27a5e331e0c69b61663894284bea87f17f1ed062]
- git-author:
- Theo Schlossnagle <jesus@omniti.com> 1337723463 -0400
- Message:
attempt to decouple the esper stuff from the broker stuff so the code can be reused without esper (passes iep-related test suite still)
-
Files:
-
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
| ra968da7 |
r2534eec |
|
| 25 | 25 | R_JAVA=com/omniti/reconnoiter/MQListener.java \ |
|---|
| 26 | 26 | com/omniti/reconnoiter/EventHandler.java \ |
|---|
| | 27 | com/omniti/reconnoiter/IEventHandler.java \ |
|---|
| 27 | 28 | com/omniti/reconnoiter/MessageHandler.java \ |
|---|
| 28 | 29 | com/omniti/reconnoiter/CheckStatus.java \ |
|---|
| r372c4a8 |
r2534eec |
|
| 19 | 19 | import com.omniti.reconnoiter.event.*; |
|---|
| 20 | 20 | import com.omniti.reconnoiter.MessageHandler; |
|---|
| | 21 | import com.omniti.reconnoiter.IEventHandler; |
|---|
| 21 | 22 | import java.util.Map; |
|---|
| 22 | 23 | import java.util.HashMap; |
|---|
| 23 | 24 | import java.util.concurrent.atomic.AtomicLong; |
|---|
| 24 | 25 | |
|---|
| 25 | | public class EventHandler { |
|---|
| | 26 | public class EventHandler implements IEventHandler { |
|---|
| 26 | 27 | private LinkedList<MessageHandler> alternates; |
|---|
| 27 | 28 | private EPServiceProvider epService; |
|---|
| … | … | |
| 101 | 102 | return false; |
|---|
| 102 | 103 | } |
|---|
| | 104 | public void sendEvent(StratconMessage m) { |
|---|
| | 105 | getService().getEPRuntime().sendEvent(m); |
|---|
| | 106 | } |
|---|
| 103 | 107 | } |
|---|
| r31fab75 |
r2534eec |
|
| 17 | 17 | import com.omniti.reconnoiter.StratconConfig; |
|---|
| 18 | 18 | import com.omniti.reconnoiter.StratconMessage; |
|---|
| | 19 | import com.omniti.reconnoiter.event.*; |
|---|
| 19 | 20 | import com.espertech.esper.client.*; |
|---|
| 20 | 21 | import com.omniti.reconnoiter.esper.ExactStatViewFactory; |
|---|
| … | … | |
| 22 | 23 | import com.omniti.reconnoiter.esper.CounterViewFactory; |
|---|
| 23 | 24 | import org.apache.log4j.BasicConfigurator; |
|---|
| | 25 | import java.util.UUID; |
|---|
| | 26 | import java.util.concurrent.ConcurrentHashMap; |
|---|
| | 27 | import com.omniti.reconnoiter.broker.IMQBroker; |
|---|
| 24 | 28 | |
|---|
| 25 | 29 | class IEPEngine { |
|---|
| … | … | |
| 36 | 40 | EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config); |
|---|
| 37 | 41 | |
|---|
| 38 | | mql = new MQListener(epService, BrokerFactory.getBroker(sconf)); |
|---|
| | 42 | IMQBroker broker = BrokerFactory.getBroker(sconf); |
|---|
| | 43 | EventHandler eh = new EventHandler( |
|---|
| | 44 | new ConcurrentHashMap<UUID,StratconQueryBase>(), |
|---|
| | 45 | epService, broker); |
|---|
| | 46 | mql = new MQListener(eh, broker); |
|---|
| 39 | 47 | try { |
|---|
| 40 | 48 | List<StratconMessage> mlist = sconf.getQueries(); |
|---|
| rd78a062 |
r2534eec |
|
| 14 | 14 | import java.lang.Runnable; |
|---|
| 15 | 15 | |
|---|
| 16 | | import com.espertech.esper.client.EPServiceProvider; |
|---|
| 17 | 16 | import java.util.concurrent.ConcurrentHashMap; |
|---|
| 18 | 17 | import java.util.LinkedList; |
|---|
| … | … | |
| 21 | 20 | |
|---|
| 22 | 21 | public class MQListener implements Runnable { |
|---|
| 23 | | private EPServiceProvider epService; |
|---|
| 24 | | private ConcurrentHashMap<UUID,StratconQueryBase> queries; |
|---|
| 25 | 22 | private IMQBroker broker; |
|---|
| 26 | 23 | private LinkedList<StratconMessage> preproc; |
|---|
| … | … | |
| 28 | 25 | private LinkedList<MessageHandler> alternates; |
|---|
| 29 | 26 | private boolean booted = false; |
|---|
| 30 | | private EventHandler eh = null; |
|---|
| | 27 | private IEventHandler eh = null; |
|---|
| 31 | 28 | |
|---|
| 32 | | public MQListener(EPServiceProvider epService, IMQBroker broker) { |
|---|
| 33 | | this.queries = new ConcurrentHashMap<UUID,StratconQueryBase>(); |
|---|
| 34 | | this.epService = epService; |
|---|
| | 29 | public MQListener(IEventHandler eh, IMQBroker broker) { |
|---|
| 35 | 30 | this.broker = broker; |
|---|
| | 31 | this.eh = eh; |
|---|
| 36 | 32 | preproc = new LinkedList<StratconMessage>(); |
|---|
| 37 | 33 | queries_toload = new LinkedList<StratconMessage>(); |
|---|
| … | … | |
| 50 | 46 | } |
|---|
| 51 | 47 | |
|---|
| 52 | | protected void process(EventHandler eh, List<StratconMessage> l) { |
|---|
| | 48 | protected void process(IEventHandler eh, List<StratconMessage> l) { |
|---|
| 53 | 49 | for (StratconMessage m : l) { |
|---|
| 54 | 50 | try { eh.processMessage(m); } |
|---|
| … | … | |
| 62 | 58 | booted = true; |
|---|
| 63 | 59 | } |
|---|
| 64 | | public EventHandler getEventHandler() { return eh; } |
|---|
| | 60 | public IEventHandler getEventHandler() { return eh; } |
|---|
| 65 | 61 | public IMQBroker getBroker() { return broker; } |
|---|
| 66 | 62 | public void run() { |
|---|
| 67 | | eh = new EventHandler(queries, this.epService, broker); |
|---|
| 68 | 63 | for ( MessageHandler mh : alternates ) eh.addObserver(mh); |
|---|
| 69 | 64 | process(eh, preproc); |
|---|
| r373974a |
r2534eec |
|
| 22 | 22 | import com.espertech.esper.client.EPServiceProvider; |
|---|
| 23 | 23 | import com.espertech.esper.client.UpdateListener; |
|---|
| 24 | | import com.omniti.reconnoiter.EventHandler; |
|---|
| | 24 | import com.omniti.reconnoiter.IEventHandler; |
|---|
| 25 | 25 | import com.omniti.reconnoiter.StratconConfig; |
|---|
| 26 | 26 | import com.omniti.reconnoiter.event.StratconQuery; |
|---|
| … | … | |
| 39 | 39 | try { |
|---|
| 40 | 40 | this.listenerClass = Class.forName(className); |
|---|
| 41 | | this.con = this.listenerClass.getDeclaredConstructor( |
|---|
| 42 | | new Class[] { EPServiceProvider.class, StratconQuery.class, String.class } |
|---|
| 43 | | ); |
|---|
| 44 | 41 | } |
|---|
| 45 | 42 | catch(java.lang.ClassNotFoundException e) { |
|---|
| 46 | 43 | throw new RuntimeException("Cannot find class: " + className); |
|---|
| 47 | 44 | } |
|---|
| 48 | | catch(java.lang.NoSuchMethodException e) { |
|---|
| 49 | | throw new RuntimeException("Cannot find constructor for class: " + className); |
|---|
| 50 | | } |
|---|
| 51 | | |
|---|
| 52 | 45 | } |
|---|
| 53 | 46 | |
|---|
| … | … | |
| 68 | 61 | } |
|---|
| 69 | 62 | |
|---|
| 70 | | public void consume(EventHandler eh) { |
|---|
| | 63 | public void consume(IEventHandler eh) { |
|---|
| 71 | 64 | while (true) { |
|---|
| 72 | 65 | Message message = null; |
|---|
| … | … | |
| 86 | 79 | } |
|---|
| 87 | 80 | } |
|---|
| 88 | | |
|---|
| 89 | | public UpdateListener getListener(EPServiceProvider epService, StratconQuery sq) { |
|---|
| 90 | | UpdateListener l = null; |
|---|
| 91 | | try { |
|---|
| 92 | | l = con.newInstance(epService, sq, "vm://localhost"); |
|---|
| 93 | | } |
|---|
| 94 | | catch(java.lang.InstantiationException ie) { } |
|---|
| 95 | | catch(java.lang.IllegalAccessException ie) { } |
|---|
| 96 | | catch(java.lang.reflect.InvocationTargetException ie) { } |
|---|
| 97 | | return l; |
|---|
| 98 | | } |
|---|
| | 81 | |
|---|
| | 82 | public Class getListenerClass() { return listenerClass; } |
|---|
| | 83 | public String getAlertRoutingKey() { return "vm://localhost"; } |
|---|
| | 84 | public String getAlertExchangeName() { return ""; } |
|---|
| 99 | 85 | } |
|---|
| r7566c98 |
r2534eec |
|
| 35 | 35 | private StratconQuery sq; |
|---|
| 36 | 36 | |
|---|
| 37 | | public AMQListener(EPServiceProvider epService, StratconQuery sq, String binding) { |
|---|
| | 37 | public AMQListener(EPServiceProvider epService, StratconQuery sq, AMQBroker broker, String binding, String unused) { |
|---|
| 38 | 38 | super(); |
|---|
| 39 | 39 | try { |
|---|
| r6279a55 |
r2534eec |
|
| 11 | 11 | import com.espertech.esper.client.EPServiceProvider; |
|---|
| 12 | 12 | import com.espertech.esper.client.UpdateListener; |
|---|
| 13 | | import com.omniti.reconnoiter.EventHandler; |
|---|
| | 13 | import com.omniti.reconnoiter.IEventHandler; |
|---|
| 14 | 14 | import com.omniti.reconnoiter.event.StratconQuery; |
|---|
| 15 | 15 | import java.io.IOException; |
|---|
| 16 | 16 | |
|---|
| 17 | 17 | public interface IMQBroker { |
|---|
| 18 | | |
|---|
| 19 | 18 | public void connect() throws Exception; |
|---|
| 20 | 19 | public void disconnect(); |
|---|
| 21 | | public void consume(EventHandler eh) throws IOException; |
|---|
| 22 | | public UpdateListener getListener(EPServiceProvider epService, StratconQuery sq); |
|---|
| 23 | | |
|---|
| | 20 | public void consume(IEventHandler eh) throws IOException; |
|---|
| | 21 | public Class getListenerClass(); |
|---|
| | 22 | public String getAlertExchangeName(); |
|---|
| | 23 | public String getAlertRoutingKey(); |
|---|
| 24 | 24 | } |
|---|
| ra2f3c05 |
r2534eec |
|
| 10 | 10 | |
|---|
| 11 | 11 | import java.io.IOException; |
|---|
| 12 | | import java.lang.reflect.Constructor; |
|---|
| 13 | 12 | |
|---|
| 14 | 13 | import org.apache.log4j.Logger; |
|---|
| 15 | | import com.espertech.esper.client.EPServiceProvider; |
|---|
| 16 | | import com.espertech.esper.client.UpdateListener; |
|---|
| 17 | | import com.omniti.reconnoiter.EventHandler; |
|---|
| | 14 | import com.omniti.reconnoiter.IEventHandler; |
|---|
| 18 | 15 | import com.omniti.reconnoiter.StratconConfig; |
|---|
| 19 | 16 | import com.omniti.reconnoiter.event.StratconQuery; |
|---|
| … | … | |
| 50 | 47 | private boolean durableQueue; |
|---|
| 51 | 48 | private boolean durableExchange; |
|---|
| 52 | | private Constructor<UpdateListener> con; |
|---|
| 53 | 49 | |
|---|
| 54 | | @SuppressWarnings("unchecked") |
|---|
| 55 | 50 | public RabbitBroker(StratconConfig config) { |
|---|
| 56 | 51 | this.conn = null; |
|---|
| … | … | |
| 64 | 59 | this.heartBeat = (this.heartBeat + 999) / 1000; // (ms -> seconds, rounding up) |
|---|
| 65 | 60 | this.connectTimeout = Integer.parseInt(config.getBrokerParameter("connect_timeout", "5000")); |
|---|
| 66 | | |
|---|
| | 61 | |
|---|
| 67 | 62 | String className = config.getBrokerParameter("listenerClass", "com.omniti.reconnoiter.broker.RabbitListener"); |
|---|
| 68 | 63 | try { |
|---|
| 69 | 64 | this.listenerClass = Class.forName(className); |
|---|
| 70 | | this.con = this.listenerClass.getDeclaredConstructor( |
|---|
| 71 | | new Class[] { EPServiceProvider.class, StratconQuery.class, RabbitBroker.class, |
|---|
| 72 | | String.class, String.class } |
|---|
| 73 | | ); |
|---|
| 74 | 65 | } |
|---|
| 75 | 66 | catch(java.lang.ClassNotFoundException e) { |
|---|
| 76 | 67 | throw new RuntimeException("Cannot find class: " + className); |
|---|
| 77 | | } |
|---|
| 78 | | catch(java.lang.NoSuchMethodException e) { |
|---|
| 79 | | throw new RuntimeException("Cannot find constructor for class: " + className); |
|---|
| 80 | 68 | } |
|---|
| 81 | 69 | |
|---|
| … | … | |
| 154 | 142 | public Channel getChannel() { return channel; } |
|---|
| 155 | 143 | |
|---|
| 156 | | public void consume(EventHandler eh) throws IOException { |
|---|
| | 144 | public void consume(IEventHandler eh) throws IOException { |
|---|
| 157 | 145 | QueueingConsumer consumer = new QueueingConsumer(channel); |
|---|
| 158 | 146 | |
|---|
| … | … | |
| 180 | 168 | } |
|---|
| 181 | 169 | |
|---|
| 182 | | public UpdateListener getListener(EPServiceProvider epService, StratconQuery sq) { |
|---|
| 183 | | UpdateListener l = null; |
|---|
| 184 | | try { |
|---|
| 185 | | l = con.newInstance(epService, sq, this, alertExchangeName, alertRoutingKey); |
|---|
| 186 | | } |
|---|
| 187 | | catch(java.lang.InstantiationException ie) { } |
|---|
| 188 | | catch(java.lang.IllegalAccessException ie) { } |
|---|
| 189 | | catch(java.lang.reflect.InvocationTargetException ie) { } |
|---|
| 190 | | return l; |
|---|
| 191 | | } |
|---|
| | 170 | public Class getListenerClass() { return listenerClass; } |
|---|
| | 171 | public String getAlertExchangeName() { return alertExchangeName; } |
|---|
| | 172 | public String getAlertRoutingKey() { return alertRoutingKey; } |
|---|
| 192 | 173 | } |
|---|
| r69c4c69 |
r2534eec |
|
| 36 | 36 | public void handle(EventHandler eh) { |
|---|
| 37 | 37 | long start = System.nanoTime(); |
|---|
| 38 | | if(nmn != null) eh.getService().getEPRuntime().sendEvent(nmn); |
|---|
| 39 | | if(nmt != null) eh.getService().getEPRuntime().sendEvent(nmt); |
|---|
| | 38 | if(nmn != null) eh.sendEvent(nmn); |
|---|
| | 39 | if(nmt != null) eh.sendEvent(nmt); |
|---|
| 40 | 40 | long nanos = System.nanoTime() - start; |
|---|
| 41 | 41 | logger.debug("sendEvent("+getUuid()+"-"+getName()+") took "+(nanos/1000)+"us"); |
|---|
| rd483232 |
r2534eec |
|
| 9 | 9 | package com.omniti.reconnoiter.event; |
|---|
| 10 | 10 | |
|---|
| 11 | | |
|---|
| | 11 | import java.lang.reflect.Constructor; |
|---|
| 12 | 12 | import com.omniti.reconnoiter.event.StratconQueryBase; |
|---|
| 13 | 13 | import com.omniti.reconnoiter.EventHandler; |
|---|
| | 14 | import com.omniti.reconnoiter.IEventHandler; |
|---|
| | 15 | import com.omniti.reconnoiter.broker.IMQBroker; |
|---|
| 14 | 16 | import com.espertech.esper.client.EPStatement; |
|---|
| 15 | 17 | import com.espertech.esper.client.UpdateListener; |
|---|
| | 18 | import com.espertech.esper.client.EPServiceProvider; |
|---|
| 16 | 19 | import java.util.UUID; |
|---|
| 17 | 20 | |
|---|
| … | … | |
| 63 | 66 | public int numparts() { return 5; } |
|---|
| 64 | 67 | |
|---|
| | 68 | @SuppressWarnings("unchecked") |
|---|
| | 69 | public UpdateListener getListener(EventHandler eh, StratconQuery sq) { |
|---|
| | 70 | UpdateListener l = null; |
|---|
| | 71 | try { |
|---|
| | 72 | Constructor<UpdateListener> con; |
|---|
| | 73 | IMQBroker broker = eh.getBroker(); |
|---|
| | 74 | con = broker.getListenerClass().getDeclaredConstructor( |
|---|
| | 75 | new Class[] { EPServiceProvider.class, StratconQuery.class, broker.getClass(), |
|---|
| | 76 | String.class, String.class } |
|---|
| | 77 | ); |
|---|
| | 78 | l = con.newInstance(eh.getService(), sq, broker, |
|---|
| | 79 | broker.getAlertExchangeName(), broker.getAlertRoutingKey()); |
|---|
| | 80 | } |
|---|
| | 81 | catch(java.lang.NoSuchMethodException e) {} |
|---|
| | 82 | catch(java.lang.InstantiationException ie) { } |
|---|
| | 83 | catch(java.lang.IllegalAccessException ie) { } |
|---|
| | 84 | catch(java.lang.reflect.InvocationTargetException ie) { } |
|---|
| | 85 | return l; |
|---|
| | 86 | } |
|---|
| | 87 | |
|---|
| 65 | 88 | public void handle(EventHandler eh) { |
|---|
| 66 | 89 | eh.deregisterQuery(getUUID()); |
|---|
| 67 | 90 | |
|---|
| 68 | 91 | EPStatement statement = eh.getService().getEPAdministrator().createEPL(getExpression()); |
|---|
| 69 | | UpdateListener o = eh.getBroker().getListener(eh.getService(), this); |
|---|
| | 92 | UpdateListener o = getListener(eh, this); |
|---|
| 70 | 93 | |
|---|
| 71 | 94 | statement.addListener(o); |
|---|