Changeset 4d679699dd35d6592904cae5bf13486f9394d779

Show
Ignore:
Timestamp:
01/26/10 05:09:54 (4 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1264482594 +0000
git-parent:

[edbe4308093106ba65240b5ef68fb0223b3a005b]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1264482594 +0000
Message:

queries need to know about the broker, so we need to reload these as we reconnect to the MQ, refs #246

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/java/com/omniti/reconnoiter/MQListener.java

    r31fab75 r4d67969  
    1616import java.util.concurrent.ConcurrentHashMap; 
    1717import java.util.LinkedList; 
     18import java.util.List; 
    1819import java.util.UUID; 
    1920 
     
    2324    private IMQBroker broker; 
    2425    private LinkedList<StratconMessage> preproc; 
     26    private LinkedList<StratconMessage> queries_toload; 
    2527    private boolean booted = false; 
    2628 
     
    3032      this.broker = broker; 
    3133      preproc = new LinkedList<StratconMessage>(); 
     34      queries_toload = new LinkedList<StratconMessage>(); 
    3235    } 
    3336 
    3437    public void preprocess(StratconMessage m) throws Exception { 
    3538      if(booted) throw new Exception("Already booted"); 
    36       preproc.add(m); 
     39      if(m instanceof StratconQuery) 
     40        queries_toload.add(m); 
     41      else 
     42        preproc.add(m); 
     43    } 
     44 
     45    protected void process(EventHandler eh, List<StratconMessage> l) { 
     46      for (StratconMessage m : l) { 
     47        try { eh.processMessage(m); } 
     48        catch (Exception e) { 
     49          System.err.println("Something went wrong preprocessing events:"); 
     50          e.printStackTrace(); 
     51        } 
     52      } 
    3753    } 
    3854     
    3955    public void run() { 
    4056      EventHandler eh = new EventHandler(queries, this.epService, broker); 
    41       for (StratconMessage m : preproc) { 
    42         try { eh.processMessage(m); } 
    43         catch (Exception e) { 
    44           System.err.println("Something went wrong preprocessing events:"); 
    45           e.printStackTrace(); 
    46           System.exit(-2); 
    47         } 
    48       } 
     57      process(eh, preproc); 
    4958      booted = true; 
    5059      while(true) { 
    5160        broker.connect(); 
     61        process(eh, queries_toload); 
    5262        try { broker.consume(eh); } catch (Exception anything) {} 
    5363        broker.disconnect(); 
  • src/java/com/omniti/reconnoiter/broker/RabbitListener.java

    rd9bf4bf r4d67969  
    3939      // Create the connection and add an exchange 
    4040      boolean passive = false, durable = true, autoDelete = false; 
    41 System.err.println("channel.exchangeDeclare -> " + exchangeName); 
     41      System.err.println("channel.exchangeDeclare -> " + exchangeName); 
    4242      channel.exchangeDeclare(exchangeName, "topic", passive, durable, autoDelete, null);   
    4343    } catch(Exception e) {