Changeset b4b7c7494c8f781ad72d6bab6201972638884362

Show
Ignore:
Timestamp:
04/15/11 15:16:52 (3 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1302880612 -0400
git-parent:

[659d7f741d58716e2ca937d476b1d449ec825f88]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1302880583 -0400
Message:

upgrade to the RabbitMQ 2.4.1 client

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/java/Makefile.in

    rf6f81cb rb4b7c74  
    8181        lib/log4j-1.2.15.jar lib/spring-beans-2.5.5.jar lib/spring-context-2.5.5.jar \ 
    8282        lib/cglib-nodep-2.2.jar lib/commons-pool-1.4.jar lib/commons-dbcp-1.2.2.jar \ 
    83         lib/postgresql-8.3-604.jdbc3.jar lib/rabbitmq-client.jar lib/commons-io-1.2.jar \ 
     83        lib/postgresql-8.3-604.jdbc3.jar lib/rabbitmq-client-2.4.1.jar lib/commons-io-1.2.jar \ 
    8484        lib/commons-cli-1.1.jar lib/commons-logging-1.1.1.jar 
    8585 
  • src/java/com/omniti/reconnoiter/broker/RabbitBroker.java

    ra03ab64 rb4b7c74  
    1212import java.lang.reflect.Constructor; 
    1313 
     14import org.apache.log4j.Logger; 
    1415import com.espertech.esper.client.EPServiceProvider; 
    1516import com.espertech.esper.client.UpdateListener; 
     
    2021import com.rabbitmq.client.Channel; 
    2122import com.rabbitmq.client.ConnectionFactory; 
    22 import com.rabbitmq.client.ConnectionParameters; 
    2323import com.rabbitmq.client.QueueingConsumer; 
    2424 
    2525 
    2626public class RabbitBroker implements IMQBroker  { 
     27  static Logger logger = Logger.getLogger(RabbitBroker.class.getName()); 
     28  private int cidx; 
    2729  private Connection conn; 
    2830  private Channel channel; 
     
    3133  private String password; 
    3234  private String virtualHost; 
    33   private String hostName; 
     35  private String hostName[]; 
     36  private ConnectionFactory factory[]; 
    3437  private int portNumber; 
    3538  private String exchangeName; 
     
    4043  private String alertRoutingKey; 
    4144  private String alertExchangeName; 
     45  private Integer heartBeat; 
     46  private Integer connectTimeout; 
    4247  private Class listenerClass; 
    4348  private Constructor<UpdateListener> con; 
     
    4550  @SuppressWarnings("unchecked")  
    4651  public RabbitBroker(StratconConfig config) { 
     52    this.cidx = 0; 
    4753    this.userName = config.getBrokerParameter("username", "guest"); 
    4854    this.password = config.getBrokerParameter("password", "guest"); 
    4955    this.virtualHost = config.getBrokerParameter("virtualhost", "/"); 
    50     this.hostName = config.getBrokerParameter("hostname", "127.0.0.1")
     56    this.hostName = config.getBrokerParameter("hostname", "127.0.0.1").split(",")
    5157    this.portNumber = Integer.parseInt(config.getBrokerParameter("port", "5672")); 
     58    this.heartBeat = Integer.parseInt(config.getBrokerParameter("heartbeat", "5000")); 
     59    this.heartBeat = (this.heartBeat + 999) / 1000; // (ms -> seconds, rounding up) 
     60    this.connectTimeout = Integer.parseInt(config.getBrokerParameter("connect_timeout", "5000")); 
    5261     
    5362    String className = config.getBrokerParameter("listenerClass", "com.omniti.reconnoiter.broker.RabbitListener"); 
     
    7382    this.alertRoutingKey = config.getBrokerParameter("routingkey", "noit.alerts."); 
    7483    this.alertExchangeName = config.getBrokerParameter("exchange", "noit.alerts"); 
     84 
     85    this.factory = new ConnectionFactory[this.hostName.length]; 
     86    for(int i = 0; i<hostName.length; i++) { 
     87      this.factory[i] = new ConnectionFactory(); 
     88      this.factory[i].setUsername(this.userName); 
     89      this.factory[i].setPassword(this.password); 
     90      this.factory[i].setVirtualHost(this.virtualHost); 
     91      this.factory[i].setRequestedHeartbeat(this.heartBeat); 
     92      this.factory[i].setConnectionTimeout(this.connectTimeout); 
     93      this.factory[i].setPort(this.portNumber); 
     94      this.factory[i].setHost(this.hostName[i]); 
     95    } 
    7596  } 
    7697   
    7798  //  
    7899  public void disconnect() { 
     100    logger.info("AMQP disconnect."); 
    79101    try { 
    80102      channel.abort(); 
     
    89111  } 
    90112  public void connect() throws Exception { 
    91     ConnectionParameters params = new ConnectionParameters(); 
    92     params.setUsername(userName); 
    93     params.setPassword(password); 
    94     params.setVirtualHost(virtualHost); 
    95     params.setRequestedHeartbeat(0); 
    96     ConnectionFactory factory = new ConnectionFactory(params); 
    97     conn = factory.newConnection(hostName, portNumber); 
     113    int offset = ++cidx % factory.length; 
     114    logger.info("AMQP connect to " + this.hostName[offset]); 
     115    conn = factory[offset].newConnection(); 
    98116 
    99117    if(conn == null) throw new Exception("connection failed"); 
     
    103121    channel.exchangeDeclare(exchangeName, exchangeType, passive, durable, autoDelete, null); 
    104122    exclusive = true; autoDelete = true; 
    105     queueName = channel.queueDeclare(declaredQueueName, passive, durable, exclusive, autoDelete, null).getQueue(); 
     123    queueName = channel.queueDeclare(declaredQueueName, durable, exclusive, autoDelete, null).getQueue(); 
    106124    channel.queueBind(queueName, exchangeName, routingKey); 
    107125    if(!routingKey.equals("")) 
  • src/java/com/omniti/reconnoiter/broker/RabbitListener.java

    r6279a55 rb4b7c74  
    99package com.omniti.reconnoiter.broker; 
    1010 
     11import org.apache.log4j.Logger; 
    1112import com.espertech.esper.client.EPServiceProvider; 
    1213import com.espertech.esper.client.EPStatement; 
     
    1920 
    2021public class RabbitListener implements UpdateListener { 
     22  static Logger logger = Logger.getLogger(RabbitListener.class.getName()); 
    2123  protected EPServiceProvider epService; 
    2224  protected StratconQuery sq; 
     
    2628 
    2729 
    28   public RabbitListener(EPServiceProvider epService, StratconQuery sq, RabbitBroker broker, String exchangeName, String routingKey) { 
     30  public RabbitListener(EPServiceProvider epService, StratconQuery sq, 
     31                        RabbitBroker broker, String exchangeName, String routingKey) { 
    2932    try { 
    3033      this.epService = epService; 
     
    3639       
    3740      // Create the connection and add an exchange 
    38       boolean passive = false, durable = true, autoDelete = false; 
    39       System.err.println("channel.exchangeDeclare -> " + exchangeName); 
    40       broker.getChannel().exchangeDeclare(exchangeName, "topic", passive, durable, autoDelete, null);   
     41      boolean internal = false, durable = true, autoDelete = false; 
     42      logger.debug("channel.exchangeDeclare -> " + exchangeName); 
     43      broker.getChannel().exchangeDeclare(exchangeName, "topic", durable, autoDelete, internal, null);   
    4144    } catch(Exception e) { 
    4245      e.printStackTrace(); 
  • src/java/run-iep.sh.in

    rf6f81cb rb4b7c74  
    1515        spring-beans-2.5.5.jar spring-context-2.5.5.jar \ 
    1616        cglib-nodep-2.2.jar commons-pool-1.4.jar commons-dbcp-1.2.2.jar \ 
    17         postgresql-8.3-604.jdbc3.jar rabbitmq-client.jar commons-io-1.2.jar \ 
    18         commons-cli-1.1.jar commons-logging-1.1.1.jar" 
     17        postgresql-8.3-604.jdbc3.jar rabbitmq-client-2.4.1.jar \ 
     18        commons-io-1.2.jar commons-cli-1.1.jar commons-logging-1.1.1.jar" 
    1919 
    2020DIRS=". lib @datarootdir@/java/lib @datarootdir@/java"