root/src/java/src/com/omniti/reconnoiter/broker/RabbitBroker.java

Revision e8b241b833cdda315cc463b322a89ba888c5e781, 6.4 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

juggle files around for cleaner builds.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * The software in this package is published under the terms of the GPL license
5  * a copy of which can be found at:
6  * https://labs.omniti.com/reconnoiter/trunk/src/java/LICENSE
7  */
8
9 package com.omniti.reconnoiter.broker;
10
11 import java.io.IOException;
12
13 import org.apache.log4j.Logger;
14 import com.omniti.reconnoiter.IEventHandler;
15 import com.omniti.reconnoiter.StratconConfig;
16 import com.omniti.reconnoiter.event.StratconQuery;
17 import com.rabbitmq.client.Connection;
18 import com.rabbitmq.client.Channel;
19 import com.rabbitmq.client.ConnectionFactory;
20 import com.rabbitmq.client.QueueingConsumer;
21
22
23 public class RabbitBroker implements IMQBroker  {
24   static Logger logger = Logger.getLogger(RabbitBroker.class.getName());
25   private int cidx;
26   private Connection conn;
27   private Channel channel;
28   private boolean noAck = true;
29   private String userName;
30   private String password;
31   private String virtualHost;
32   private String hostName[];
33   private ConnectionFactory factory[];
34   private int portNumber;
35   private String exchangeName;
36   private String exchangeType;
37   private String declaredQueueName;
38   private String queueName;
39   private String returnedQueueName;
40   private String routingKey;
41   private String alertRoutingKey;
42   private String alertExchangeName;
43   private Integer heartBeat;
44   private Integer connectTimeout;
45   private Class listenerClass;
46   private boolean exclusiveQueue;
47   private boolean durableQueue;
48   private boolean durableExchange;
49
50   public RabbitBroker(StratconConfig config) {
51     this.conn = null;
52     this.cidx = 0;
53     this.userName = config.getBrokerParameter("username", "guest");
54     this.password = config.getBrokerParameter("password", "guest");
55     this.virtualHost = config.getBrokerParameter("virtualhost", "/");
56     this.hostName = config.getBrokerParameter("hostname", "127.0.0.1").split(",");
57     this.portNumber = Integer.parseInt(config.getBrokerParameter("port", "5672"));
58     this.heartBeat = Integer.parseInt(config.getBrokerParameter("heartbeat", "5000"));
59     this.heartBeat = (this.heartBeat + 999) / 1000; // (ms -> seconds, rounding up)
60     this.connectTimeout = Integer.parseInt(config.getBrokerParameter("connect_timeout", "5000"));
61
62     String className = config.getBrokerParameter("listenerClass", "com.omniti.reconnoiter.broker.RabbitListener");
63     try {
64       this.listenerClass = Class.forName(className);
65     }
66     catch(java.lang.NoClassDefFoundError e) { }
67     catch(java.lang.ClassNotFoundException e) {
68       logger.warn("Class " + className + " not found.");
69     }
70
71     this.exchangeType = config.getMQParameter("exchangetype", "fanout");
72     this.durableExchange = config.getMQParameter("durableexchange", "false").equals("true");
73     this.exchangeName = config.getMQParameter("exchange", "noit.firehose");
74     this.exclusiveQueue = config.getMQParameter("exclusivequeue", "false").equals("true");
75     this.durableQueue = config.getMQParameter("durablequeue", "false").equals("true");
76     this.declaredQueueName = config.getMQParameter("queue", "reconnoiter-{node}-{pid}");
77     this.routingKey = config.getMQParameter("routingkey", "");
78  
79     try {
80       this.queueName = this.declaredQueueName;
81       this.queueName = this.queueName.replace("{node}",
82         java.net.InetAddress.getLocalHost().getHostName());
83       this.queueName = this.queueName.replace("{pid}",
84         java.lang.management.ManagementFactory.getRuntimeMXBean()
85                                               .getName().replaceAll("@.+$", ""));
86     } catch(java.net.UnknownHostException uhe) {
87       System.err.println("Cannot self identify for queuename!");
88       System.exit(-1);
89     }     
90
91     this.alertRoutingKey = config.getBrokerParameter("routingkey", "noit.alerts.");
92     this.alertExchangeName = config.getBrokerParameter("exchange", "noit.alerts");
93
94     this.factory = new ConnectionFactory[this.hostName.length];
95     for(int i = 0; i<hostName.length; i++) {
96       this.factory[i] = new ConnectionFactory();
97       this.factory[i].setUsername(this.userName);
98       this.factory[i].setPassword(this.password);
99       this.factory[i].setVirtualHost(this.virtualHost);
100       this.factory[i].setRequestedHeartbeat(this.heartBeat);
101       this.factory[i].setConnectionTimeout(this.connectTimeout);
102       this.factory[i].setPort(this.portNumber);
103       this.factory[i].setHost(this.hostName[i]);
104     }
105   }
106  
107   //
108   public void disconnect() {
109     logger.info("AMQP disconnect.");
110     try {
111       channel.abort();
112     }
113     catch (Exception e) { }
114     channel = null;
115     try {
116       conn.abort();
117     }
118     catch (Exception e) { }
119     conn = null;
120   }
121   public void connect() throws Exception {
122     if(conn != null) disconnect();
123
124     int offset = ++cidx % factory.length;
125     logger.info("AMQP connect to " + this.hostName[offset]);
126     conn = factory[offset].newConnection();
127
128     if(conn == null) throw new Exception("connection failed");
129
130     channel = conn.createChannel();
131     boolean exclusive = false, internal = false, autoDelete = false;
132     channel.exchangeDeclare(exchangeName, exchangeType,
133                             durableExchange, autoDelete, internal, null);
134     autoDelete = true;
135
136     returnedQueueName = channel.queueDeclare(queueName, durableQueue,
137                                              exclusiveQueue, autoDelete, null).getQueue();
138     for (String rk : routingKey.split(",")) {
139         if ( rk.equalsIgnoreCase("null") ) rk = "";
140         channel.queueBind(returnedQueueName, exchangeName, rk);
141     }
142   }
143   public Channel getChannel() { return channel; }
144  
145   public void consume(IEventHandler eh) throws IOException {
146     QueueingConsumer consumer = new QueueingConsumer(channel);
147
148     channel.basicConsume(returnedQueueName, noAck, consumer);
149    
150     while (true) {
151       QueueingConsumer.Delivery delivery;
152       try {
153         delivery = consumer.nextDelivery();
154       } catch (InterruptedException ie) {
155         continue;
156       }
157       // (process the message components ...)
158    
159       String xml = new String(delivery.getBody());
160       try {
161         eh.processMessage(xml);
162       } catch (Exception e) {
163         // TODO Auto-generated catch block
164         e.printStackTrace();
165       }
166       if(!noAck)
167         channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
168     }
169   }
170
171   public Class getListenerClass() { return listenerClass; }
172   public String getAlertExchangeName() { return alertExchangeName; }
173   public String getAlertRoutingKey() { return alertRoutingKey; }
174 }
Note: See TracBrowser for help on using the browser.