root/src/java/src/com/omniti/reconnoiter/broker/AMQListener.java

Revision e8b241b833cdda315cc463b322a89ba888c5e781, 2.9 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

juggle files around for cleaner builds.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2009, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  * The software in this package is published under the terms of the GPL license
5  * a copy of which can be found at:
6  * https://labs.omniti.com/reconnoiter/trunk/src/java/LICENSE
7  */
8
9 package com.omniti.reconnoiter.broker;
10
11 import com.omniti.reconnoiter.event.StratconQuery;
12 import java.lang.System;
13 import org.apache.activemq.ActiveMQConnectionFactory;
14 import javax.jms.Connection;
15 import javax.jms.Session;
16 import javax.jms.Destination;
17 import javax.jms.MessageProducer;
18 import javax.jms.DeliveryMode;
19 import javax.jms.TextMessage;
20 import javax.jms.JMSException;
21 import com.espertech.esper.client.EPServiceProvider;
22 import com.espertech.esper.client.EPStatement;
23 import com.espertech.esper.client.util.JSONEventRenderer;
24 import com.espertech.esper.client.EventBean;
25 import com.espertech.esper.client.UpdateListener;
26
27 public class AMQListener extends NoitListener implements Runnable {
28     private EPServiceProvider epService;
29     private EPStatement statement;
30     private ActiveMQConnectionFactory connectionFactory;
31     private Connection connection;
32     private Session session;
33     private Destination destination;
34     private MessageProducer producer;
35     private StratconQuery sq;
36
37     public AMQListener(EPServiceProvider epService, StratconQuery sq, AMQBroker broker, String binding, String unused) {
38       super();
39       try {
40         // we just need it started up
41         BrokerFactory.getAMQBrokerService();
42         this.epService = epService;
43         this.sq = sq;
44         this.statement = sq.getStatement();
45
46         connectionFactory = new ActiveMQConnectionFactory(binding);
47         connection = connectionFactory.createConnection();
48         connection.start();
49         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
50         destination = session.createTopic("noit.alerts." + sq.getName());
51
52         producer = session.createProducer(destination);
53         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
54         Thread thr = new Thread(this);
55         sq.setThread(thr);
56         thr.start();
57       } catch(Exception e) {
58         System.err.println("Cannot broker messages:");
59         e.printStackTrace();
60       }
61     }
62     public void processEvent(EventBean event) {
63         JSONEventRenderer jsonRenderer =
64             epService.getEPRuntime().
65                       getEventRenderer().
66                       getJSONRenderer(sq.getStatement().getEventType());
67         String output = jsonRenderer.render("r", event);
68         try {
69             TextMessage message = session.createTextMessage(output);
70             producer.send(message);
71         }
72         catch(JMSException e) {
73             System.err.println(e);
74         }
75         System.err.println(output);
76     }
77     public void run() {
78         while(sq.isActive()) {
79             try  { processEvent(queue.take()); }
80             catch (InterruptedException e) { }
81         }
82     }
83 }
Note: See TracBrowser for help on using the browser.