Changeset 5e2ef1b77ed96b19cce1cfe2406c122478a53dd6

Show
Ignore:
Timestamp:
01/22/10 16:24:20 (4 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1264177460 +0000
git-parent:

[d3854117f2d3202c93f2df208dbeee0d2f82e529]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1264177460 +0000
Message:

fixes #242

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/java/com/omniti/reconnoiter/MQListener.java

    r699c97c r5e2ef1b  
    2626      this.epService = epService; 
    2727      this.broker = broker; 
    28       broker.connect(); 
    2928    } 
    3029     
    3130    public void run() { 
    3231      EventHandler eh = new EventHandler(queries, this.epService, broker); 
    33       broker.consume(eh); 
     32      while(true) { 
     33        broker.connect(); 
     34        try { broker.consume(eh); } catch (Exception anything) {} 
     35        broker.disconnect(); 
     36        try { Thread.sleep(1000); } catch (InterruptedException ignore) {} 
     37      } 
    3438    } 
    3539} 
  • src/java/com/omniti/reconnoiter/broker/AMQBroker.java

    r54eb3fe r5e2ef1b  
    3434  private MessageConsumer consumer; 
    3535 
     36  public void disconnect() { 
     37  } 
    3638  public void connect() { 
    3739    BrokerFactory.getAMQBrokerService("stomp://" + hostName + ":" + portNumber); 
  • src/java/com/omniti/reconnoiter/broker/IMQBroker.java

    r54eb3fe r5e2ef1b  
    1717   
    1818  public void connect(); 
     19  public void disconnect(); 
    1920  public void consume(EventHandler eh); 
    2021  public UpdateListener getListener(EPServiceProvider epService, StratconQuery sq); 
  • src/java/com/omniti/reconnoiter/broker/RabbitBroker.java

    r54eb3fe r5e2ef1b  
    5757   
    5858  //  
     59  public void disconnect() { 
     60    try { 
     61      channel.getConnection().abort(); 
     62      channel.abort(); 
     63    } 
     64    catch (Exception e) { } 
     65  } 
    5966  public void connect() { 
    6067    ConnectionParameters params = new ConnectionParameters(); 
     
    6774      Connection conn = factory.newConnection(hostName, portNumber); 
    6875      channel = conn.createChannel(); 
    69        
    70       channel.exchangeDeclare(exchangeName, "fanout"); 
    71       channel.queueDeclare(queueName); 
     76 
     77      boolean passive = false, exclusive = true, durable = true, autoDelete = false; 
     78      channel.exchangeDeclare(exchangeName, "fanout", passive, durable, autoDelete, null); 
     79      channel.queueDeclare(queueName, passive, durable, exclusive, autoDelete, null); 
    7280      channel.queueBind(queueName, exchangeName, routingKey); 
    7381