Changeset 4c3fb9bb20f55d05939b6f601cc27622060215e8

Show
Ignore:
Timestamp:
07/14/09 02:29:36 (5 years ago)
Author:
Dan Di Spaltro <dan.dispaltro@gmail.com>
git-committer:
Dan Di Spaltro <dan.dispaltro@gmail.com> 1247538576 +0000
git-parent:

[3db67ae5b26eab5ce769c235b8a2e672325cadb7]

git-author:
Dan Di Spaltro <dan.dispaltro@gmail.com> 1247538576 +0000
Message:

first go at integrating RMQ and AMQ and giving the user a choice on configuration options, refs #157

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/java/Makefile.in

    r4fd2f1f r4c3fb9b  
    2222top_srcdir=@top_srcdir@ 
    2323 
    24 JAVA=com/omniti/reconnoiter/AMQBrokerSingleton.java \ 
    25         com/omniti/reconnoiter/AMQListener.java \ 
    26         com/omniti/reconnoiter/AMQOutput.java \ 
     24JAVA=com/omniti/reconnoiter/MQListener.java \ 
     25        com/omniti/reconnoiter/EventHandler.java \ 
     26        com/omniti/reconnoiter/broker/RabbitBroker.java \ 
     27        com/omniti/reconnoiter/broker/IMQBroker.java \ 
     28        com/omniti/reconnoiter/broker/AMQBroker.java \ 
     29        com/omniti/reconnoiter/broker/RabbitListener.java \ 
     30        com/omniti/reconnoiter/broker/AMQListener.java \ 
     31        com/omniti/reconnoiter/broker/BrokerFactory.java \ 
    2732        com/omniti/reconnoiter/event/NoitEvent.java \ 
    2833        com/omniti/reconnoiter/event/NoitMetricNumeric.java \ 
     
    3641        lib/log4j-1.2.15.jar lib/spring-beans-2.5.5.jar lib/spring-context-2.5.5.jar \ 
    3742        lib/cglib-nodep-2.2.jar lib/commons-pool-1.4.jar lib/commons-dbcp-1.2.2.jar \ 
    38         lib/postgresql-8.3-604.jdbc3.jar 
     43        lib/postgresql-8.3-604.jdbc3.jar lib/rabbitmq-client.jar lib/commons-io-1.2.jar \ 
     44        lib/commons-cli-1.1.jar 
    3945 
    4046all:    reconnoiter.jar 
  • src/java/com/omniti/reconnoiter/AMQListener.java

    r2220278 r4c3fb9b  
    1 /* 
    2  * Copyright (c) 2009, OmniTI Computer Consulting, Inc. 
    3  * All rights reserved. 
    4  * The software in this package is published under the terms of the GPL license 
    5  * a copy of which can be found at: 
    6  * https://labs.omniti.com/reconnoiter/trunk/src/java/LICENSE 
    7  */ 
    8  
    9 package com.omniti.reconnoiter; 
    10  
    11 import com.omniti.reconnoiter.AMQBrokerSingleton; 
    12 import com.omniti.reconnoiter.StratconMessage; 
    13 import com.omniti.reconnoiter.event.*; 
    14 import java.lang.System; 
    15 import java.lang.Runnable; 
    16 import org.apache.activemq.ActiveMQConnectionFactory; 
    17 import javax.jms.Connection; 
    18 import javax.jms.Session; 
    19 import javax.jms.Destination; 
    20 import javax.jms.MessageConsumer; 
    21 import javax.jms.DeliveryMode; 
    22 import javax.jms.Message; 
    23 import javax.jms.TextMessage; 
    24 import javax.jms.JMSException; 
    25 import com.espertech.esper.client.EventBean; 
    26 import com.espertech.esper.client.UpdateListener; 
    27 import com.espertech.esper.client.EPServiceProvider; 
    28 import com.espertech.esper.client.EPStatement; 
    29 import java.util.concurrent.ConcurrentHashMap; 
    30 import java.util.UUID; 
    31  
    32 public class AMQListener implements Runnable { 
    33     private EPServiceProvider epService; 
    34     private ActiveMQConnectionFactory connectionFactory; 
    35     private Connection connection; 
    36     private Session session; 
    37     private Destination destination; 
    38     private MessageConsumer consumer; 
    39     private ConcurrentHashMap<UUID,StratconQuery> queries; 
    40  
    41     public AMQListener(EPServiceProvider epService) { 
    42       this.queries = new ConcurrentHashMap<UUID,StratconQuery>(); 
    43       this.epService = epService; 
    44       try { 
    45         // we just need it started up 
    46         AMQBrokerSingleton.getBroker(); 
    47  
    48         connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); 
    49         connection = connectionFactory.createConnection(); 
    50         connection.start(); 
    51         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
    52         destination = session.createQueue("noit.firehose"); 
    53  
    54         consumer = session.createConsumer(destination); 
    55       } catch(Exception e) { 
    56         System.err.println("Cannot broker messages"); 
    57       } 
    58     } 
    59     public void run() { 
    60       while(true) { 
    61         Message message = null; 
    62         try { 
    63           message = consumer.receive(1000); 
    64         } catch(Exception e) { 
    65            
    66         } 
    67         if (message != null && message instanceof TextMessage) { 
    68           TextMessage textMessage = (TextMessage) message; 
    69           try { 
    70             String xml = textMessage.getText(); 
    71             StratconMessage m = StratconMessage.makeMessage(xml); 
    72             if(m == null) { 
    73               System.err.println("Can't grok:\n" + xml); 
    74             } 
    75             if(m instanceof StratconQuery) { 
    76               StratconQuery sq = (StratconQuery) m; 
    77  
    78               if(queries.containsKey(sq.getUUID())) throw (new Exception("Duplicate Query")); 
    79  
    80               EPStatement statement = epService.getEPAdministrator().createEPL(sq.getExpression()); 
    81               AMQOutput o = new AMQOutput(epService, sq); 
    82  
    83               statement.addListener(o); 
    84               sq.setStatement(statement); 
    85               sq.setListener(o); 
    86               queries.put(sq.getUUID(), sq); 
    87               System.err.println("Creating Query: " + sq.getUUID()); 
    88             } 
    89             else if(m instanceof StratconQueryStop) { 
    90               StratconQuery sq = queries.get(((StratconQueryStop) m).getUUID()); 
    91               if(sq != null) { 
    92                 queries.remove(sq.getUUID()); 
    93                 sq.destroy(); 
    94               } 
    95             } 
    96             else if(m instanceof NoitEvent) { 
    97               epService.getEPRuntime().sendEvent(((NoitEvent) m).getDocument()); 
    98             } 
    99             else if(m instanceof NoitMetricNumeric) { 
    100               epService.getEPRuntime().sendEvent((NoitMetricNumeric) m); 
    101             } 
    102           } catch(Exception ie) { 
    103             System.err.println(ie); 
    104           } 
    105         } 
    106       } 
    107     } 
    108 } 
  • src/java/com/omniti/reconnoiter/IEPEngine.java

    r4fd2f1f r4c3fb9b  
    1212import java.io.BufferedReader; 
    1313import java.io.InputStreamReader; 
    14 import com.omniti.reconnoiter.AMQListener; 
     14import com.omniti.reconnoiter.MQListener; 
     15import com.omniti.reconnoiter.broker.BrokerFactory; 
    1516import com.omniti.reconnoiter.event.NoitEvent; 
    16 import com.omniti.reconnoiter.event.NoitMetricNumeric; 
    1717import com.omniti.reconnoiter.StratconConfig; 
    1818import com.espertech.esper.client.*; 
    19 import com.espertech.esper.client.soda.*; 
    20  
    2119import org.apache.log4j.BasicConfigurator; 
    2220 
     
    3937    epService.getEPAdministrator().createEPL("insert into CheckDetails select * from NoitCheck"); 
    4038 
    41     AMQListener l = new AMQListener(epService); 
     39    MQListener l = new MQListener(epService, BrokerFactory.getBroker(sconf)); 
    4240 
    4341    Thread listener = new Thread(l); 
  • src/java/com/omniti/reconnoiter/StratconConfig.java

    r4fd2f1f r4c3fb9b  
    2020import javax.xml.xpath.XPathFactory; 
    2121import javax.xml.xpath.XPathConstants; 
    22 import org.xml.sax.InputSource; 
    2322import org.xml.sax.SAXException; 
    2423 
     
    2827import org.w3c.dom.NodeList; 
    2928 
     29 
    3030public class StratconConfig { 
    3131  private Document doc; 
     32   
     33   
     34 
    3235 
    3336  public StratconConfig(String filename) { 
     
    7578    return null; 
    7679  } 
     80  public String getBroker() { 
     81    XPathFactory factory = XPathFactory.newInstance(); 
     82    XPath xpath = factory.newXPath(); 
     83    Object result; 
     84    try { 
     85      XPathExpression expr = 
     86        xpath.compile("/stratcon/iep/broker/@adapter"); 
     87      result = expr.evaluate(doc, XPathConstants.NODESET); 
     88    } 
     89    catch(XPathExpressionException e) { 
     90      System.err.println("Bad expression: " + e.getMessage()); 
     91      return null; 
     92    } 
     93    NodeList nodes = (NodeList) result; 
     94    if(nodes.getLength() > 0) { 
     95      return nodes.item(nodes.getLength() -1).getNodeValue(); 
     96    } 
     97    return null; 
     98  } 
     99   
     100  public String getBrokerParameter(String param, String or) { 
     101    String result = getBrokerParameter(param); 
     102    if (result == null) 
     103      return or; 
     104    return result; 
     105  } 
     106   
     107  public String getBrokerParameter(String param) { 
     108    return getIepParameter("broker", param); 
     109  } 
     110   
     111  public String getStompParameter(String param, String or) { 
     112    String result = getStompParameter(param); 
     113    if (result == null) 
     114      return or; 
     115    return result; 
     116  } 
     117   
     118  public String getStompParameter(String param) { 
     119    return getIepParameter("stomp", param); 
     120  } 
     121   
     122  public String getIepParameter(String which, String param) { 
     123    XPathFactory factory = XPathFactory.newInstance(); 
     124    XPath xpath = factory.newXPath(); 
     125    Object result; 
     126    try { 
     127      XPathExpression expr = 
     128        xpath.compile("/stratcon/iep/" + which + "/" + param + "/text()"); 
     129      result = expr.evaluate(doc, XPathConstants.NODESET); 
     130    } 
     131    catch(XPathExpressionException e) { 
     132      System.err.println("Bad expression: " + e.getMessage()); 
     133      return null; 
     134    } 
     135    NodeList nodes = (NodeList) result; 
     136    if(nodes.getLength() > 0) { 
     137      return nodes.item(nodes.getLength() -1).getNodeValue(); 
     138    } 
     139    return null; 
     140  } 
    77141  public ConfigurationDBRef getDBConfig() { 
    78142    Properties props = new Properties(); 
  • src/java/run-iep.sh.in

    r4fd2f1f r4c3fb9b  
    77        spring-beans-2.5.5.jar spring-context-2.5.5.jar \ 
    88        cglib-nodep-2.2.jar commons-pool-1.4.jar commons-dbcp-1.2.2.jar \ 
    9         postgresql-8.3-604.jdbc3.jar" 
     9        postgresql-8.3-604.jdbc3.jar rabbitmq-client.jar commons-io-1.2.jar \ 
     10        commons-cli-1.1.jar" 
    1011 
    11 DIRS=". lib @prefix@/java" 
     12DIRS=". @prefix@/java/lib @prefix@/java" 
    1213CP= 
    1314 
  • src/lua/src/lobject.c

    r22d1941 r4c3fb9b  
    77#include <ctype.h> 
    88#include <stdarg.h> 
     9#undef __USE_XOPEN2K8  
    910#include <stdio.h> 
    1011#include <stdlib.h> 
  • src/stratcon_iep.c

    r4fd2f1f r4c3fb9b  
    6969#endif 
    7070  apr_pool_t *pool; 
     71  char* exchange; 
    7172}; 
    7273pthread_key_t iep_connection; 
     
    288289    int port; 
    289290    char hostname[128]; 
    290     if(!noit_conf_get_int(NULL, "/stratcon/iep/port", &port)) 
     291    if(!noit_conf_get_int(NULL, "/stratcon/iep/stomp/port", &port)) 
    291292      port = 61613; 
    292     if(!noit_conf_get_stringbuf(NULL, "/stratcon/iep/hostname", 
     293    if(!noit_conf_get_stringbuf(NULL, "/stratcon/iep/stomp/hostname", 
    293294                                hostname, sizeof(hostname))) 
    294295      strlcpy(hostname, "127.0.0.1", sizeof(hostname)); 
     
    313314    { 
    314315      stomp_frame frame; 
     316      char username[128]; 
     317      char password[128]; 
     318      char* exchange = malloc(128); 
    315319      frame.command = "CONNECT"; 
    316320      frame.headers = apr_hash_make(driver->pool); 
     321      // This is for RabbitMQ Support 
     322      if((noit_conf_get_stringbuf(NULL, "/stratcon/iep/stomp/username", 
     323                                  username, sizeof(username))) && 
     324         (noit_conf_get_stringbuf(NULL, "/stratcon/iep/stomp/password", 
     325                                  password, sizeof(password)))) 
     326      { 
     327        apr_hash_set(frame.headers, "login", APR_HASH_KEY_STRING, username); 
     328        apr_hash_set(frame.headers, "passcode", APR_HASH_KEY_STRING, password); 
     329      } 
     330 
     331 
     332      // This is for RabbitMQ support 
     333      driver->exchange = NULL; 
     334      if(noit_conf_get_stringbuf(NULL, "/stratcon/iep/stomp/exchange", 
     335                                  exchange, 128)) 
     336      { 
     337        if (!driver->exchange) 
     338          driver->exchange = exchange; 
     339        else 
     340          free(exchange); 
     341        apr_hash_set(frame.headers, "exchange", APR_HASH_KEY_STRING, driver->exchange); 
     342      } 
     343 
     344 
     345 
    317346/* 
    318347      We don't use login/pass 
     
    408437        out.command = "SEND"; 
    409438        out.headers = apr_hash_make(job->pool); 
     439        if (driver->exchange) 
     440          apr_hash_set(out.headers, "exchange", APR_HASH_KEY_STRING, driver->exchange); 
     441 
    410442        apr_hash_set(out.headers, "destination", APR_HASH_KEY_STRING, "/queue/noit.firehose"); 
    411443        apr_hash_set(out.headers, "ack", APR_HASH_KEY_STRING, "auto"); 
     
    488520#else 
    489521  if(driver->connection) stomp_disconnect(&driver->connection); 
     522  if(driver->exchange) free(driver->exchange); 
    490523#endif 
    491524  if(driver->pool) apr_pool_destroy(driver->pool);