root/src/java/src/com/omniti/reconnoiter/broker/AMQBroker.java

Revision e8b241b833cdda315cc463b322a89ba888c5e781, 2.6 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

juggle files around for cleaner builds.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * The software in this package is published under the terms of the GPL license
5  * a copy of which can be found at:
6  * https://labs.omniti.com/reconnoiter/trunk/src/java/LICENSE
7  */
8
9 package com.omniti.reconnoiter.broker;
10
11 import java.lang.reflect.Constructor;
12
13 import javax.jms.Connection;
14 import javax.jms.Destination;
15 import javax.jms.MessageConsumer;
16 import javax.jms.Session;
17 import javax.jms.Message;
18 import javax.jms.TextMessage;
19
20 import org.apache.activemq.ActiveMQConnectionFactory;
21
22 import com.espertech.esper.client.EPServiceProvider;
23 import com.omniti.reconnoiter.IEventHandler;
24 import com.omniti.reconnoiter.StratconConfig;
25 import com.omniti.reconnoiter.event.StratconQuery;
26
27 public class AMQBroker implements IMQBroker {
28   private String hostName;
29   private int portNumber;
30   private Class listenerClass;
31
32   @SuppressWarnings("unchecked")
33   public AMQBroker(StratconConfig config) {
34     this.hostName = config.getBrokerParameter("hostname", "127.0.0.1");
35     this.portNumber = Integer.parseInt(config.getBrokerParameter("port", "61613"));
36     String className = config.getBrokerParameter("listenerClass", "com.omniti.reconnoiter.broker.AMQListener");
37     try {
38       this.listenerClass = Class.forName(className);
39     }
40     catch(java.lang.ClassNotFoundException e) {
41     }
42   }
43
44   private MessageConsumer consumer;
45
46   public void disconnect() {
47   }
48   public void connect() throws Exception {
49     BrokerFactory.getAMQBrokerService("stomp://" + hostName + ":" + portNumber);
50     ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory("vm://localhost");
51
52     Connection connection=connectionFactory.createConnection();
53     connection.start();
54     Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
55     Destination destination=session.createQueue("noit.firehose");
56
57     consumer = session.createConsumer(destination);
58   }
59  
60   public void consume(IEventHandler eh) {
61     while (true) {
62       Message message = null;
63       try {
64         message = consumer.receive(1000);
65       } catch(Exception ignored) {
66       }
67       if (message != null && message instanceof TextMessage) {
68         TextMessage textMessage = (TextMessage) message;
69         try {
70           String xml = textMessage.getText();
71           eh.processMessage(xml);
72         } catch(Exception ie) {
73           System.err.println(ie);
74         }
75       }
76     }
77   }
78
79   public Class getListenerClass() { return listenerClass; }
80   public String getAlertRoutingKey() { return ""; }
81   public String getAlertExchangeName() { return "vm://localhost"; }
82 }
Note: See TracBrowser for help on using the browser.