Changeset b4b7c7494c8f781ad72d6bab6201972638884362
- Timestamp:
- 04/15/11 15:16:52
(2 years ago)
- Author:
- Theo Schlossnagle <jesus@omniti.com>
- git-committer:
- Theo Schlossnagle <jesus@omniti.com> 1302880612 -0400
- git-parent:
[659d7f741d58716e2ca937d476b1d449ec825f88]
- git-author:
- Theo Schlossnagle <jesus@omniti.com> 1302880583 -0400
- Message:
upgrade to the RabbitMQ 2.4.1 client
-
Files:
-
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
| rf6f81cb |
rb4b7c74 |
|
| 81 | 81 | lib/log4j-1.2.15.jar lib/spring-beans-2.5.5.jar lib/spring-context-2.5.5.jar \ |
|---|
| 82 | 82 | lib/cglib-nodep-2.2.jar lib/commons-pool-1.4.jar lib/commons-dbcp-1.2.2.jar \ |
|---|
| 83 | | lib/postgresql-8.3-604.jdbc3.jar lib/rabbitmq-client.jar lib/commons-io-1.2.jar \ |
|---|
| | 83 | lib/postgresql-8.3-604.jdbc3.jar lib/rabbitmq-client-2.4.1.jar lib/commons-io-1.2.jar \ |
|---|
| 84 | 84 | lib/commons-cli-1.1.jar lib/commons-logging-1.1.1.jar |
|---|
| 85 | 85 | |
|---|
| ra03ab64 |
rb4b7c74 |
|
| 12 | 12 | import java.lang.reflect.Constructor; |
|---|
| 13 | 13 | |
|---|
| | 14 | import org.apache.log4j.Logger; |
|---|
| 14 | 15 | import com.espertech.esper.client.EPServiceProvider; |
|---|
| 15 | 16 | import com.espertech.esper.client.UpdateListener; |
|---|
| … | … | |
| 20 | 21 | import com.rabbitmq.client.Channel; |
|---|
| 21 | 22 | import com.rabbitmq.client.ConnectionFactory; |
|---|
| 22 | | import com.rabbitmq.client.ConnectionParameters; |
|---|
| 23 | 23 | import com.rabbitmq.client.QueueingConsumer; |
|---|
| 24 | 24 | |
|---|
| 25 | 25 | |
|---|
| 26 | 26 | public class RabbitBroker implements IMQBroker { |
|---|
| | 27 | static Logger logger = Logger.getLogger(RabbitBroker.class.getName()); |
|---|
| | 28 | private int cidx; |
|---|
| 27 | 29 | private Connection conn; |
|---|
| 28 | 30 | private Channel channel; |
|---|
| … | … | |
| 31 | 33 | private String password; |
|---|
| 32 | 34 | private String virtualHost; |
|---|
| 33 | | private String hostName; |
|---|
| | 35 | private String hostName[]; |
|---|
| | 36 | private ConnectionFactory factory[]; |
|---|
| 34 | 37 | private int portNumber; |
|---|
| 35 | 38 | private String exchangeName; |
|---|
| … | … | |
| 40 | 43 | private String alertRoutingKey; |
|---|
| 41 | 44 | private String alertExchangeName; |
|---|
| | 45 | private Integer heartBeat; |
|---|
| | 46 | private Integer connectTimeout; |
|---|
| 42 | 47 | private Class listenerClass; |
|---|
| 43 | 48 | private Constructor<UpdateListener> con; |
|---|
| … | … | |
| 45 | 50 | @SuppressWarnings("unchecked") |
|---|
| 46 | 51 | public RabbitBroker(StratconConfig config) { |
|---|
| | 52 | this.cidx = 0; |
|---|
| 47 | 53 | this.userName = config.getBrokerParameter("username", "guest"); |
|---|
| 48 | 54 | this.password = config.getBrokerParameter("password", "guest"); |
|---|
| 49 | 55 | this.virtualHost = config.getBrokerParameter("virtualhost", "/"); |
|---|
| 50 | | this.hostName = config.getBrokerParameter("hostname", "127.0.0.1"); |
|---|
| | 56 | this.hostName = config.getBrokerParameter("hostname", "127.0.0.1").split(","); |
|---|
| 51 | 57 | this.portNumber = Integer.parseInt(config.getBrokerParameter("port", "5672")); |
|---|
| | 58 | this.heartBeat = Integer.parseInt(config.getBrokerParameter("heartbeat", "5000")); |
|---|
| | 59 | this.heartBeat = (this.heartBeat + 999) / 1000; // (ms -> seconds, rounding up) |
|---|
| | 60 | this.connectTimeout = Integer.parseInt(config.getBrokerParameter("connect_timeout", "5000")); |
|---|
| 52 | 61 | |
|---|
| 53 | 62 | String className = config.getBrokerParameter("listenerClass", "com.omniti.reconnoiter.broker.RabbitListener"); |
|---|
| … | … | |
| 73 | 82 | this.alertRoutingKey = config.getBrokerParameter("routingkey", "noit.alerts."); |
|---|
| 74 | 83 | this.alertExchangeName = config.getBrokerParameter("exchange", "noit.alerts"); |
|---|
| | 84 | |
|---|
| | 85 | this.factory = new ConnectionFactory[this.hostName.length]; |
|---|
| | 86 | for(int i = 0; i<hostName.length; i++) { |
|---|
| | 87 | this.factory[i] = new ConnectionFactory(); |
|---|
| | 88 | this.factory[i].setUsername(this.userName); |
|---|
| | 89 | this.factory[i].setPassword(this.password); |
|---|
| | 90 | this.factory[i].setVirtualHost(this.virtualHost); |
|---|
| | 91 | this.factory[i].setRequestedHeartbeat(this.heartBeat); |
|---|
| | 92 | this.factory[i].setConnectionTimeout(this.connectTimeout); |
|---|
| | 93 | this.factory[i].setPort(this.portNumber); |
|---|
| | 94 | this.factory[i].setHost(this.hostName[i]); |
|---|
| | 95 | } |
|---|
| 75 | 96 | } |
|---|
| 76 | 97 | |
|---|
| 77 | 98 | // |
|---|
| 78 | 99 | public void disconnect() { |
|---|
| | 100 | logger.info("AMQP disconnect."); |
|---|
| 79 | 101 | try { |
|---|
| 80 | 102 | channel.abort(); |
|---|
| … | … | |
| 89 | 111 | } |
|---|
| 90 | 112 | public void connect() throws Exception { |
|---|
| 91 | | ConnectionParameters params = new ConnectionParameters(); |
|---|
| 92 | | params.setUsername(userName); |
|---|
| 93 | | params.setPassword(password); |
|---|
| 94 | | params.setVirtualHost(virtualHost); |
|---|
| 95 | | params.setRequestedHeartbeat(0); |
|---|
| 96 | | ConnectionFactory factory = new ConnectionFactory(params); |
|---|
| 97 | | conn = factory.newConnection(hostName, portNumber); |
|---|
| | 113 | int offset = ++cidx % factory.length; |
|---|
| | 114 | logger.info("AMQP connect to " + this.hostName[offset]); |
|---|
| | 115 | conn = factory[offset].newConnection(); |
|---|
| 98 | 116 | |
|---|
| 99 | 117 | if(conn == null) throw new Exception("connection failed"); |
|---|
| … | … | |
| 103 | 121 | channel.exchangeDeclare(exchangeName, exchangeType, passive, durable, autoDelete, null); |
|---|
| 104 | 122 | exclusive = true; autoDelete = true; |
|---|
| 105 | | queueName = channel.queueDeclare(declaredQueueName, passive, durable, exclusive, autoDelete, null).getQueue(); |
|---|
| | 123 | queueName = channel.queueDeclare(declaredQueueName, durable, exclusive, autoDelete, null).getQueue(); |
|---|
| 106 | 124 | channel.queueBind(queueName, exchangeName, routingKey); |
|---|
| 107 | 125 | if(!routingKey.equals("")) |
|---|
| r6279a55 |
rb4b7c74 |
|
| 9 | 9 | package com.omniti.reconnoiter.broker; |
|---|
| 10 | 10 | |
|---|
| | 11 | import org.apache.log4j.Logger; |
|---|
| 11 | 12 | import com.espertech.esper.client.EPServiceProvider; |
|---|
| 12 | 13 | import com.espertech.esper.client.EPStatement; |
|---|
| … | … | |
| 19 | 20 | |
|---|
| 20 | 21 | public class RabbitListener implements UpdateListener { |
|---|
| | 22 | static Logger logger = Logger.getLogger(RabbitListener.class.getName()); |
|---|
| 21 | 23 | protected EPServiceProvider epService; |
|---|
| 22 | 24 | protected StratconQuery sq; |
|---|
| … | … | |
| 26 | 28 | |
|---|
| 27 | 29 | |
|---|
| 28 | | public RabbitListener(EPServiceProvider epService, StratconQuery sq, RabbitBroker broker, String exchangeName, String routingKey) { |
|---|
| | 30 | public RabbitListener(EPServiceProvider epService, StratconQuery sq, |
|---|
| | 31 | RabbitBroker broker, String exchangeName, String routingKey) { |
|---|
| 29 | 32 | try { |
|---|
| 30 | 33 | this.epService = epService; |
|---|
| … | … | |
| 36 | 39 | |
|---|
| 37 | 40 | // Create the connection and add an exchange |
|---|
| 38 | | boolean passive = false, durable = true, autoDelete = false; |
|---|
| 39 | | System.err.println("channel.exchangeDeclare -> " + exchangeName); |
|---|
| 40 | | broker.getChannel().exchangeDeclare(exchangeName, "topic", passive, durable, autoDelete, null); |
|---|
| | 41 | boolean internal = false, durable = true, autoDelete = false; |
|---|
| | 42 | logger.debug("channel.exchangeDeclare -> " + exchangeName); |
|---|
| | 43 | broker.getChannel().exchangeDeclare(exchangeName, "topic", durable, autoDelete, internal, null); |
|---|
| 41 | 44 | } catch(Exception e) { |
|---|
| 42 | 45 | e.printStackTrace(); |
|---|
| rf6f81cb |
rb4b7c74 |
|
| 15 | 15 | spring-beans-2.5.5.jar spring-context-2.5.5.jar \ |
|---|
| 16 | 16 | cglib-nodep-2.2.jar commons-pool-1.4.jar commons-dbcp-1.2.2.jar \ |
|---|
| 17 | | postgresql-8.3-604.jdbc3.jar rabbitmq-client.jar commons-io-1.2.jar \ |
|---|
| 18 | | commons-cli-1.1.jar commons-logging-1.1.1.jar" |
|---|
| | 17 | postgresql-8.3-604.jdbc3.jar rabbitmq-client-2.4.1.jar \ |
|---|
| | 18 | commons-io-1.2.jar commons-cli-1.1.jar commons-logging-1.1.1.jar" |
|---|
| 19 | 19 | |
|---|
| 20 | 20 | DIRS=". lib @datarootdir@/java/lib @datarootdir@/java" |
|---|