Changeset 4d679699dd35d6592904cae5bf13486f9394d779
- Timestamp:
- 01/26/10 05:09:54
(3 years ago)
- Author:
- Theo Schlossnagle <jesus@omniti.com>
- git-committer:
- Theo Schlossnagle <jesus@omniti.com> 1264482594 +0000
- git-parent:
[edbe4308093106ba65240b5ef68fb0223b3a005b]
- git-author:
- Theo Schlossnagle <jesus@omniti.com> 1264482594 +0000
- Message:
queries need to know about the broker, so we need to reload these as we reconnect to the MQ, refs #246
-
Files:
-
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
| r31fab75 |
r4d67969 |
|
| 16 | 16 | import java.util.concurrent.ConcurrentHashMap; |
|---|
| 17 | 17 | import java.util.LinkedList; |
|---|
| | 18 | import java.util.List; |
|---|
| 18 | 19 | import java.util.UUID; |
|---|
| 19 | 20 | |
|---|
| … | … | |
| 23 | 24 | private IMQBroker broker; |
|---|
| 24 | 25 | private LinkedList<StratconMessage> preproc; |
|---|
| | 26 | private LinkedList<StratconMessage> queries_toload; |
|---|
| 25 | 27 | private boolean booted = false; |
|---|
| 26 | 28 | |
|---|
| … | … | |
| 30 | 32 | this.broker = broker; |
|---|
| 31 | 33 | preproc = new LinkedList<StratconMessage>(); |
|---|
| | 34 | queries_toload = new LinkedList<StratconMessage>(); |
|---|
| 32 | 35 | } |
|---|
| 33 | 36 | |
|---|
| 34 | 37 | public void preprocess(StratconMessage m) throws Exception { |
|---|
| 35 | 38 | if(booted) throw new Exception("Already booted"); |
|---|
| 36 | | preproc.add(m); |
|---|
| | 39 | if(m instanceof StratconQuery) |
|---|
| | 40 | queries_toload.add(m); |
|---|
| | 41 | else |
|---|
| | 42 | preproc.add(m); |
|---|
| | 43 | } |
|---|
| | 44 | |
|---|
| | 45 | protected void process(EventHandler eh, List<StratconMessage> l) { |
|---|
| | 46 | for (StratconMessage m : l) { |
|---|
| | 47 | try { eh.processMessage(m); } |
|---|
| | 48 | catch (Exception e) { |
|---|
| | 49 | System.err.println("Something went wrong preprocessing events:"); |
|---|
| | 50 | e.printStackTrace(); |
|---|
| | 51 | } |
|---|
| | 52 | } |
|---|
| 37 | 53 | } |
|---|
| 38 | 54 | |
|---|
| 39 | 55 | public void run() { |
|---|
| 40 | 56 | EventHandler eh = new EventHandler(queries, this.epService, broker); |
|---|
| 41 | | for (StratconMessage m : preproc) { |
|---|
| 42 | | try { eh.processMessage(m); } |
|---|
| 43 | | catch (Exception e) { |
|---|
| 44 | | System.err.println("Something went wrong preprocessing events:"); |
|---|
| 45 | | e.printStackTrace(); |
|---|
| 46 | | System.exit(-2); |
|---|
| 47 | | } |
|---|
| 48 | | } |
|---|
| | 57 | process(eh, preproc); |
|---|
| 49 | 58 | booted = true; |
|---|
| 50 | 59 | while(true) { |
|---|
| 51 | 60 | broker.connect(); |
|---|
| | 61 | process(eh, queries_toload); |
|---|
| 52 | 62 | try { broker.consume(eh); } catch (Exception anything) {} |
|---|
| 53 | 63 | broker.disconnect(); |
|---|
| rd9bf4bf |
r4d67969 |
|
| 39 | 39 | // Create the connection and add an exchange |
|---|
| 40 | 40 | boolean passive = false, durable = true, autoDelete = false; |
|---|
| 41 | | System.err.println("channel.exchangeDeclare -> " + exchangeName); |
|---|
| | 41 | System.err.println("channel.exchangeDeclare -> " + exchangeName); |
|---|
| 42 | 42 | channel.exchangeDeclare(exchangeName, "topic", passive, durable, autoDelete, null); |
|---|
| 43 | 43 | } catch(Exception e) { |
|---|