[Reconnoiter-devel] Patching Esper to be more efficient

Dan Di Spaltro dan.dispaltro at gmail.com
Wed Sep 23 12:33:45 EDT 2009


I've got another one.  Fixed some bugs in the parsing.  I've got a
pretty dirty tree right now, so hopefully I didn't forget anything.

-Dan

On Wed, Sep 16, 2009 at 12:58 AM, Dan Di Spaltro
<dan.dispaltro at gmail.com> wrote:
> Well folks,
>
> I created a patch to replace the XML parsing with basically straight
> jlog parsing in an effort to control memory usage.  Esper was/is
> growing out of control on our system and to alleviate that I figured a
> quick place to start would be to rip out all that xml and make the
> protocol simpler and the objects more efficient.
>
> To be honest tracking the memory usage is pretty decent in java; but
> its still hard to get a feel if the program is leaking or the VM is
> just warming up, right now it expects something like 5k checks every
> 2.5 mins which could end up being about 16-20k messages every 2.5
> mins.  So at this rate I figured it was worth spending some hours
> tackling this easy problem and get a bit more performance without a
> lot of code.
>
> I didn't do a very good job isolating the changes, you'll notice the
> upgrade to Esper 3.2.  I figured this is all a moving target and saw
> the word "performance improvements" in their release notes.  So I am
> trying it out.
>
> Thanks,
>
> --
> Dan Di Spaltro
>



-- 
Dan Di Spaltro
-------------- next part --------------
Index: Makefile.in
===================================================================
--- Makefile.in	(revision 866)
+++ Makefile.in	(working copy)
@@ -29,7 +29,9 @@
 	com/omniti/reconnoiter/broker/RabbitListener.java \
 	com/omniti/reconnoiter/broker/AMQListener.java \
 	com/omniti/reconnoiter/broker/BrokerFactory.java \
-	com/omniti/reconnoiter/event/NoitEvent.java \
+	com/omniti/reconnoiter/event/NoitCheck.java \
+	com/omniti/reconnoiter/event/NoitStatus.java \
+	com/omniti/reconnoiter/event/NoitMetricText.java \
 	com/omniti/reconnoiter/event/NoitMetricNumeric.java \
 	com/omniti/reconnoiter/event/StratconStatement.java \
 	com/omniti/reconnoiter/event/StratconQueryBase.java \
Index: run-iep.sh.in
===================================================================
--- run-iep.sh.in	(revision 866)
+++ run-iep.sh.in	(working copy)
@@ -1,9 +1,9 @@
 #!/bin/sh
 
 JAVA=@JAVA@
-JPARAMS="-Xms32m -Xmx512m -XX:+UseConcMarkSweepGC"
+JPARAMS="-Xms32m -Xmx512m -server -XX:+UseConcMarkSweepGC -XX:+UseParNewGC"
 JARS="reconnoiter.jar activemq-all-5.2.0.jar antlr-runtime-3.1.1.jar \
-	esper-3.1.0.jar log4j-1.2.15.jar \
+	esper-3.2.0.jar log4j-1.2.15.jar \
 	spring-beans-2.5.5.jar spring-context-2.5.5.jar \
 	cglib-nodep-2.2.jar commons-pool-1.4.jar commons-dbcp-1.2.2.jar \
 	postgresql-8.3-604.jdbc3.jar rabbitmq-client.jar commons-io-1.2.jar \
Index: com/omniti/reconnoiter/EventHandler.java
===================================================================
--- com/omniti/reconnoiter/EventHandler.java	(revision 866)
+++ com/omniti/reconnoiter/EventHandler.java	(working copy)
@@ -7,12 +7,7 @@
 import com.espertech.esper.client.EPStatement;
 import com.espertech.esper.client.UpdateListener;
 import com.omniti.reconnoiter.broker.IMQBroker;
-import com.omniti.reconnoiter.event.NoitEvent;
-import com.omniti.reconnoiter.event.NoitMetricNumeric;
-import com.omniti.reconnoiter.event.StratconQueryBase;
-import com.omniti.reconnoiter.event.StratconStatement;
-import com.omniti.reconnoiter.event.StratconQuery;
-import com.omniti.reconnoiter.event.StratconQueryStop;
+import com.omniti.reconnoiter.event.*;
 
 public class EventHandler {
 	
@@ -27,7 +22,7 @@
   }
 	
   public void processMessage(String xml) throws Exception {
-    StratconMessage m = StratconMessage.makeMessage(xml);
+    Object m = StratconMessage.makeMessage(xml);
     if(m == null) {
       System.err.println("Can't grok:\n" + xml);
     }
@@ -63,11 +58,17 @@
         sq.destroy();
       }
     }
-    else if(m instanceof NoitEvent) {
-      epService.getEPRuntime().sendEvent(((NoitEvent) m).getDocument());
+     else if(m instanceof NoitMetricText) {
+      epService.getEPRuntime().sendEvent((NoitMetricText) m);
     }
     else if(m instanceof NoitMetricNumeric) {
       epService.getEPRuntime().sendEvent((NoitMetricNumeric) m);
     }
+    else if(m instanceof NoitCheck) {
+      epService.getEPRuntime().sendEvent((NoitCheck) m);
+    }
+    else if(m instanceof NoitStatus) {
+      epService.getEPRuntime().sendEvent((NoitStatus) m);
+    }
 	}
 }
Index: com/omniti/reconnoiter/event/NoitMetricNumeric.java
===================================================================
--- com/omniti/reconnoiter/event/NoitMetricNumeric.java	(revision 866)
+++ com/omniti/reconnoiter/event/NoitMetricNumeric.java	(working copy)
@@ -10,59 +10,33 @@
 
 import com.omniti.reconnoiter.StratconMessage;
 
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-import org.w3c.dom.Text;
-
 public class NoitMetricNumeric extends StratconMessage {
     private String uuid;
     private String name;
     private Double value;
     private String noit;
 
-    protected String get_string(Element e, String tag) throws NoitMetricNumericException {
-      NodeList vals = e.getElementsByTagName(tag);
-      if(vals.getLength() != 1)
-        throw new NoitMetricNumericException("Bad XML: tag " + tag + " (" + vals.getLength() + ")");
-      Node n = vals.item(0);
-      Node tn = n.getFirstChild();
-      if(tn.getNextSibling() != null)
-        throw new NoitMetricNumericException("Bad XML: " + tag + " has siblings");
-      if(tn.getNodeType() != Node.TEXT_NODE)
-        throw new NoitMetricNumericException("Bad XML: " + tag + " not text");
-      Text text_node = (Text)tn;
-      return text_node.getNodeValue();
-    }
-    protected Double get_double(Element e, String tag) throws NoitMetricNumericException {
-      Double d = null;
+  protected String getPrefix() {
+     return "M";
+   }
+   /*
+   'M' REMOTE TIMESTAMP UUID NAME TYPE VALUE
+    */
+   public NoitMetricNumeric(String[] parts) throws Exception {
+      super(parts);
+      noit = parts[1];
+      uuid = parts[3];
+      name = parts[4];
       try {
-        d = new Double(get_string(e, tag));
-      } catch(NumberFormatException nfe) {
+        value = Double.valueOf(parts[6]);
+      } catch (NumberFormatException nfe) {
+        value = null;
       }
-      return d;
-    }
 
-    public NoitMetricNumeric(Document document) throws NoitMetricNumericException {
-      Element e = document.getDocumentElement();
-      String tag = e.getTagName();
-      if(!tag.equals("NoitMetricNumeric"))
-        throw new NoitMetricNumericException("Bad XML");
-      uuid = get_string(e,"id");
-      name = get_string(e,"name");
-      value = get_double(e, "value");
-      noit = get_string(e,"remote");
-    }
+   }
 
-    public String getuuid() { return uuid; }
-    public String getname() { return name; }
-    public Double getvalue() { return value; }
-    public String getnoit() { return noit; }
-
-    private class NoitMetricNumericException extends Exception {
-      public NoitMetricNumericException(String s) {
-        super(s);
-      }
-    }
+    public String getUuid() { return uuid; }
+    public String getName() { return name; }
+    public Double getValue() { return value; }
+    public String getNoit() { return noit; }
 }
Index: com/omniti/reconnoiter/event/StratconStatement.java
===================================================================
--- com/omniti/reconnoiter/event/StratconStatement.java	(revision 866)
+++ com/omniti/reconnoiter/event/StratconStatement.java	(working copy)
@@ -9,6 +9,8 @@
 package com.omniti.reconnoiter.event;
 
 import com.omniti.reconnoiter.event.StratconQueryBase;
+import com.espertech.esper.client.UpdateListener;
+
 import java.util.UUID;
 import javax.xml.xpath.XPath;
 import javax.xml.xpath.XPathFactory;
@@ -17,16 +19,19 @@
 import org.w3c.dom.Document;
 
 public class StratconStatement extends StratconQueryBase {
-  public StratconStatement(Document d) {
-    XPath xpath = XPathFactory.newInstance().newXPath();
-    try {
-      String id = (String) xpath.evaluate("/StratconStatement/id", d, XPathConstants.STRING);
-      if(id == null) uuid = UUID.randomUUID();
-      else uuid = UUID.fromString(id);
-      expression = (String) xpath.evaluate("/StratconStatement/expression", d, XPathConstants.STRING);
-    }
-    catch(XPathExpressionException e) {
-    }
-    if(uuid == null) uuid = UUID.randomUUID();
-  }
+
+   protected String getPrefix() {
+     return "D";
+   }
+
+   /*  'D' REMOTE ID QUERY  */
+   public StratconStatement(String[] parts) throws Exception {
+      super(parts);
+      String id = parts[2];
+      expression = parts[3];
+      if(id == null)
+        uuid = UUID.randomUUID();
+      else
+        uuid = UUID.fromString(id);
+   }
 }
Index: com/omniti/reconnoiter/event/NoitMetricText.java
===================================================================
--- com/omniti/reconnoiter/event/NoitMetricText.java	(revision 0)
+++ com/omniti/reconnoiter/event/NoitMetricText.java	(revision 0)
@@ -0,0 +1,31 @@
+package com.omniti.reconnoiter.event;
+
+import com.omniti.reconnoiter.StratconMessage;
+
+public class NoitMetricText extends StratconMessage {
+   String uuid;
+   String name;
+   String message;
+   String noit;
+
+   protected String getPrefix() {
+     return "M";
+   }
+   /*
+   'M' REMOTE TIMESTAMP UUID NAME TYPE VALUE
+    */
+   public NoitMetricText(String[] parts) throws Exception {
+      super(parts);
+      noit = parts[1];
+      uuid = parts[3];
+      name = parts[4];
+      message = parts[6];
+
+   }
+    public String getUuid() { return uuid; }
+    public String getName() { return name; }
+    public String getMessage() { return message; }
+    public String getNoit() { return noit; }
+
+
+}
Index: com/omniti/reconnoiter/event/StratconQuery.java
===================================================================
--- com/omniti/reconnoiter/event/StratconQuery.java	(revision 866)
+++ com/omniti/reconnoiter/event/StratconQuery.java	(working copy)
@@ -12,27 +12,26 @@
 import com.omniti.reconnoiter.event.StratconQueryBase;
 import com.espertech.esper.client.UpdateListener;
 import java.util.UUID;
-import javax.xml.xpath.XPath;
-import javax.xml.xpath.XPathFactory;
-import javax.xml.xpath.XPathConstants;
-import javax.xml.xpath.XPathExpressionException;
-import org.w3c.dom.Document;
 
 public class StratconQuery extends StratconQueryBase {
   protected UpdateListener listener;
   protected String name;
 
-  public StratconQuery(Document d) {
-    XPath xpath = XPathFactory.newInstance().newXPath();
-    try {
-      String id = (String) xpath.evaluate("/StratconQuery/id", d, XPathConstants.STRING);
-      if(id == null) uuid = UUID.randomUUID();
-      else uuid = UUID.fromString(id);
-      name = (String) xpath.evaluate("/StratconQuery/name", d, XPathConstants.STRING);
-      expression = (String) xpath.evaluate("/StratconQuery/expression", d, XPathConstants.STRING);
-    }
-    catch(XPathExpressionException e) {
-    }
+  protected String getPrefix() {
+     return "Q";
+   }
+
+  /*  'Q' REMOTE ID NAME QUERY  */
+  public StratconQuery(String[] parts) throws Exception {
+    super(parts);
+    String id = parts[2];
+    name = parts[3];
+    expression = parts[4];
+    if(id == null)
+      uuid = UUID.randomUUID();
+    else
+      uuid = UUID.fromString(id);
+
     if(name == null) name = "default";
     if(uuid == null) uuid = UUID.randomUUID();
   }
Index: com/omniti/reconnoiter/event/NoitCheck.java
===================================================================
--- com/omniti/reconnoiter/event/NoitCheck.java	(revision 0)
+++ com/omniti/reconnoiter/event/NoitCheck.java	(revision 0)
@@ -0,0 +1,36 @@
+package com.omniti.reconnoiter.event;
+
+import com.omniti.reconnoiter.StratconMessage;
+
+
+public class NoitCheck extends StratconMessage {
+   String uuid; 
+   String target; 
+   String module;
+   String name; 
+   String noit;
+
+   protected String getPrefix() {
+     return "C";
+   }
+
+   /*
+    'C' REMOTE TIMESTAMP UUID TARGET MODULE NAME
+    */
+   public NoitCheck(String[] parts) throws Exception {
+      super(parts);
+      noit = parts[1];
+      uuid = parts[3];
+      target = parts[4];
+      module = parts[5];
+      name = parts[6];
+
+   }
+
+    public String getUuid() { return uuid; }
+    public String getName() { return name; }
+    public String getTarget() { return target; }
+    public String getModule() { return module; }
+    public String getNoit() { return noit; }
+
+}
\ No newline at end of file
Index: com/omniti/reconnoiter/event/StratconQueryStop.java
===================================================================
--- com/omniti/reconnoiter/event/StratconQueryStop.java	(revision 866)
+++ com/omniti/reconnoiter/event/StratconQueryStop.java	(working copy)
@@ -13,12 +13,16 @@
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 
-public class StratconQueryStop extends StratconMessage {
-  private UUID uuid;
+public class StratconQueryStop extends StratconQueryBase {
 
-  public StratconQueryStop(Document d) {
-    Element e = d.getDocumentElement();
-    uuid = UUID.fromString(e.getTextContent());
+   protected String getPrefix() {
+     return "q";
+   }
+
+/*  'q' REMOTE ID */
+  public StratconQueryStop(String[] parts) throws Exception {
+    super(parts);
+    uuid = UUID.fromString(parts[2]);
   }
   public UUID getUUID() {
     return uuid;
Index: com/omniti/reconnoiter/event/NoitEvent.java
===================================================================
--- com/omniti/reconnoiter/event/NoitEvent.java	(revision 866)
+++ com/omniti/reconnoiter/event/NoitEvent.java	(working copy)
@@ -1,63 +0,0 @@
-/*
- * Copyright (c) 2009, OmniTI Computer Consulting, Inc.
- * All rights reserved.
- * The software in this package is published under the terms of the GPL license
- * a copy of which can be found at:
- * https://labs.omniti.com/reconnoiter/trunk/src/java/LICENSE
- */
-
-package com.omniti.reconnoiter.event;
-
-import com.omniti.reconnoiter.StratconMessage;
-
-import com.espertech.esper.client.EPServiceProvider;
-import com.espertech.esper.client.ConfigurationEventTypeXMLDOM;
-
-import javax.xml.xpath.XPathConstants;
-
-import org.w3c.dom.Document;
-
-public class NoitEvent extends StratconMessage {
-  protected Document document;
-
-  public static void registerTypes(EPServiceProvider epService) {
-    ConfigurationEventTypeXMLDOM cfg;
-
-    cfg = new ConfigurationEventTypeXMLDOM();
-    cfg.addXPathProperty("uuid", "/NoitCheck/id", XPathConstants.STRING);
-    cfg.addXPathProperty("target", "/NoitCheck/target", XPathConstants.STRING);
-    cfg.addXPathProperty("module", "/NoitCheck/module", XPathConstants.STRING);
-    cfg.addXPathProperty("name", "/NoitCheck/name", XPathConstants.STRING);
-    cfg.addXPathProperty("noit", "/NoitCheck/remote", XPathConstants.STRING);
-    cfg.setRootElementName("NoitCheck");
-    epService.getEPAdministrator().getConfiguration()
-             .addEventType("NoitCheck", cfg);
-
-    cfg = new ConfigurationEventTypeXMLDOM();
-    cfg.addXPathProperty("uuid", "/NoitStatus/id", XPathConstants.STRING);
-    cfg.addXPathProperty("status", "/NoitStatus/status", XPathConstants.STRING);
-    cfg.addXPathProperty("state", "/NoitStatus/state", XPathConstants.STRING);
-    cfg.addXPathProperty("availability", "/NoitStatus/availability", XPathConstants.STRING);
-    cfg.addXPathProperty("duration", "/NoitStatus/duration", XPathConstants.NUMBER);
-    cfg.addXPathProperty("noit", "/NoitStatus/remote", XPathConstants.STRING);
-    cfg.setRootElementName("NoitStatus");
-    epService.getEPAdministrator().getConfiguration()
-             .addEventType("NoitStatus", cfg);
-    cfg = new ConfigurationEventTypeXMLDOM();
-    cfg.addXPathProperty("uuid", "/NoitMetricText/id", XPathConstants.STRING);
-    cfg.addXPathProperty("name", "/NoitMetricText/name", XPathConstants.STRING);
-    cfg.addXPathProperty("message", "/NoitMetricText/message", XPathConstants.STRING);
-    cfg.addXPathProperty("noit", "/NoitMetricText/remote", XPathConstants.STRING);
-    cfg.setRootElementName("NoitMetricText");
-    epService.getEPAdministrator().getConfiguration()
-             .addEventType("NoitMetricText", cfg);
-  }
-
-  public NoitEvent(Document d) {
-    document = d;
-  }
-  public Document getDocument() {
-    return document;
-  }
-}
-
Index: com/omniti/reconnoiter/event/NoitStatus.java
===================================================================
--- com/omniti/reconnoiter/event/NoitStatus.java	(revision 0)
+++ com/omniti/reconnoiter/event/NoitStatus.java	(revision 0)
@@ -0,0 +1,40 @@
+package com.omniti.reconnoiter.event;
+
+import com.omniti.reconnoiter.StratconMessage;
+
+
+public class NoitStatus  extends StratconMessage {
+   String uuid;
+   String status;
+   String state;
+   String availability;
+   Double duration;
+   String noit;
+
+  @Override
+  protected String getPrefix() {
+     return "S";
+   }
+
+   /*
+    'S' REMOTE TIMESTAMP UUID STATE AVAILABILITY DURATION STATUS_MESSAGE
+    */
+   public NoitStatus(String[] parts) throws Exception {
+      super(parts);
+      noit = parts[1];
+      uuid = parts[3];
+      state = parts[4];
+      availability = parts[5];
+      duration = Double.parseDouble(parts[6]);
+      status = parts[7];
+
+   }
+
+    public String getUuid() { return uuid; }
+    public String getStatus() { return status; }
+    public String getState() { return state; }
+    public String getAvailability() { return availability; }
+    public Double getDuration() { return duration; }
+    public String getNoit() { return noit; }
+
+}
\ No newline at end of file
Index: com/omniti/reconnoiter/event/StratconQueryBase.java
===================================================================
--- com/omniti/reconnoiter/event/StratconQueryBase.java	(revision 866)
+++ com/omniti/reconnoiter/event/StratconQueryBase.java	(working copy)
@@ -18,6 +18,10 @@
   protected UUID uuid;
   protected String expression;
 
+  public StratconQueryBase(String parts[]) throws Exception {
+    super(parts);
+  }
+
   public UUID getUUID() {
     return uuid;
   }
Index: com/omniti/reconnoiter/StratconMessage.java
===================================================================
--- com/omniti/reconnoiter/StratconMessage.java	(revision 866)
+++ com/omniti/reconnoiter/StratconMessage.java	(working copy)
@@ -19,49 +19,55 @@
 import com.omniti.reconnoiter.event.*;
 
 public class StratconMessage {
-  private static DocumentBuilderFactory factory = null;
-  private static DocumentBuilder parser = null;
 
-  static {
-    if(factory == null) {
-      factory = DocumentBuilderFactory.newInstance();
-      factory.setIgnoringComments(true);
-      factory.setCoalescing(true); // Convert CDATA to Text nodes
-      factory.setNamespaceAware(false); // No namespaces: this is default
-      factory.setValidating(false); // Don't validate DTD: also default
+  // This is the text type in the noit_log.h
+  public final static String METRIC_STRING = "s";
+  
+  public static String[] parseToArray(String jlog, int num) {
+    // Get rid of the null parameter
+    return jlog.substring(0, jlog.length()-1).split("[\t]", num);
+  }
+
+  protected String getPrefix() {
+     return null;
+   }
+
+  // Check and make sure 
+  public StratconMessage(String[] parts) throws Exception {
+    if (!parts[0].equals(this.getPrefix())) {
+      throw new Exception("Incorrect state prefix:" + getPrefix() + " not applicable for " + getClass());
     }
-    if(parser == null) {
-      try {
-        parser = factory.newDocumentBuilder();
-      } catch(Exception e) { }
-    }
   }
 
-  public static StratconMessage makeMessage(String xml) {
-    InputSource source = new InputSource(new StringReader(xml));
-
+  public static Object makeMessage(String jlog) {
+    String[] parts;
+    // The numbers of the parse are pulled from stratcon and
+    // +1 for the extra remote
     try {
-      Document document = parser.parse(source);
-      Element e = document.getDocumentElement();
-      String tag = e.getTagName();
-      // We have events
-      if(tag.equals("NoitStatus") ||
-         tag.equals("NoitMetricText") ||
-         tag.equals("NoitCheck"))
-        return new NoitEvent(document);
-      else if(tag.equals("NoitMetricNumeric")) {
-        // Numerics have a value that can be in scientific notation.
-        // This document gets passed places that do Xpath 1.0 queries
-        // which don't understand scientific notation... we have to hack it.
-        return new NoitMetricNumeric(document);
+      switch (jlog.charAt(0)) {
+        case 'C':
+          parts = parseToArray(jlog, 7);
+          return new NoitCheck(parts);
+        case 'S':
+          parts = parseToArray(jlog, 8);
+          return new NoitStatus(parts);
+        case 'M':
+          parts = parseToArray(jlog, 7);
+          if (parts[5].equals(METRIC_STRING)) {
+            return new NoitMetricText(parts);
+          } else {
+            return new NoitMetricNumeric(parts);
+          }
+        case 'D':
+          parts = parseToArray(jlog, 4);
+          return new StratconStatement(parts);
+        case 'Q':
+          parts = parseToArray(jlog, 5);
+          return new StratconQuery(parts);
+        case 'q':
+          parts = parseToArray(jlog, 3);
+          return new StratconQueryStop(parts);
       }
-      // and requests
-      else if(tag.equals("StratconStatement"))
-        return new StratconStatement(document);
-      else if(tag.equals("StratconQuery"))
-        return new StratconQuery(document);
-      else if(tag.equals("StratconQueryStop"))
-        return new StratconQueryStop(document);
     }
     catch(Exception e) {
       System.err.println("makeMessage: " + e);
Index: com/omniti/reconnoiter/IEPEngine.java
===================================================================
--- com/omniti/reconnoiter/IEPEngine.java	(revision 866)
+++ com/omniti/reconnoiter/IEPEngine.java	(working copy)
@@ -13,7 +13,6 @@
 import java.io.InputStreamReader;
 import com.omniti.reconnoiter.MQListener;
 import com.omniti.reconnoiter.broker.BrokerFactory;
-import com.omniti.reconnoiter.event.NoitEvent;
 import com.omniti.reconnoiter.StratconConfig;
 import com.espertech.esper.client.*;
 import org.apache.log4j.BasicConfigurator;
@@ -31,7 +30,6 @@
     config.addDatabaseReference("recondb", sconf.getDBConfig());
     config.addEventTypeAutoName("com.omniti.reconnoiter.event");
     EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);
-    NoitEvent.registerTypes(epService);
 
     MQListener l = new MQListener(epService, BrokerFactory.getBroker(sconf));
 
Index: com/omniti/reconnoiter/broker/AMQBroker.java
===================================================================
--- com/omniti/reconnoiter/broker/AMQBroker.java	(revision 866)
+++ com/omniti/reconnoiter/broker/AMQBroker.java	(working copy)
@@ -68,5 +68,4 @@
   public UpdateListener getListener(EPServiceProvider epService, StratconQuery sq) {
     return new AMQListener(epService, sq);
   }
-
 }
Index: com/omniti/reconnoiter/broker/RabbitBroker.java
===================================================================
--- com/omniti/reconnoiter/broker/RabbitBroker.java	(revision 866)
+++ com/omniti/reconnoiter/broker/RabbitBroker.java	(working copy)
@@ -14,7 +14,7 @@
 import com.rabbitmq.client.QueueingConsumer;
 
 
-public class RabbitBroker implements IMQBroker, Runnable  {
+public class RabbitBroker implements IMQBroker  {
 
   private Channel channel;
   private boolean noAck = false;
@@ -28,10 +28,8 @@
   private String routingKey;
   private String alertQueue;
   private String alertExchangeName;
-  private StratconConfig config ;
 
   public RabbitBroker(StratconConfig config) {
-    this.config = config;
     this.userName = config.getBrokerParameter("username", "guest");
     this.password = config.getBrokerParameter("password", "guest");
     this.virtualHost = config.getBrokerParameter("virtualhost", "/");
@@ -109,8 +107,4 @@
   public UpdateListener getListener(EPServiceProvider epService, StratconQuery sq) {
     return new RabbitListener(epService, sq, channel, alertExchangeName, alertQueue);
   }
-
-  public void run() {
-  }
-
 }
Index: src/stratcon_iep.c
===================================================================
--- src/stratcon_iep.c	(revision 866)
+++ src/stratcon_iep.c	(working copy)
@@ -536,13 +536,6 @@
         apr_hash_set(frame.headers, "exchange", APR_HASH_KEY_STRING, driver->exchange);
       }
 
-
-
-/*
-      We don't use login/pass
-      apr_hash_set(frame.headers, "login", APR_HASH_KEY_STRING, "");
-      apr_hash_set(frame.headers, "passcode", APR_HASH_KEY_STRING, "");
-*/
       frame.body = NULL;
       frame.body_length = -1;
       rc = stomp_write(driver->connection, &frame, driver->pool);
@@ -618,63 +611,67 @@
     noitL(noit_debug, "Skipping old event %f second old.\n", age);
     return 0;
   }
-  job->doc = stratcon_iep_doc_from_line(job->line, job->remote);
-  if(job->doc) {
-    job->doc_str = noit_xmlSaveToBuffer(job->doc);
-    if(job->doc_str) {
+  //job->doc = stratcon_iep_doc_from_line(job->line, job->remote);
+  //if(job->doc) {
+  //  job->doc_str = noit_xmlSaveToBuffer(job->doc);
+  //  if(job->doc_str) {
       /* Submit */
-      if(driver && driver->pool && driver->connection) {
-        apr_status_t rc;
+  if(driver && driver->pool && driver->connection) {
+    apr_status_t rc;
+    int line_len = strlen(job->line);
+    int remote_len = strlen(job->remote);
 #ifdef OPENWIRE
-        ow_ActiveMQQueue *dest;
-        ow_ActiveMQTextMessage *message;
+    ow_ActiveMQQueue *dest;
+    ow_ActiveMQTextMessage *message;
 
-        apr_pool_create(&job->pool, driver->pool);
-        message = ow_ActiveMQTextMessage_create(job->pool);
-        message->content =
-          ow_byte_array_create_with_data(job->pool,strlen(job->doc_str),
-                                         job->doc_str);
-        dest = ow_ActiveMQQueue_create(job->pool);
-        dest->physicalName = ow_string_create_from_cstring(job->pool,"TEST.QUEUE");         
-        rc = amqcs_send(driver->connection,
-                        (ow_ActiveMQDestination*)dest,
-                        (ow_ActiveMQMessage*)message,
-                        1,4,0,job->pool);
-        if(rc != APR_SUCCESS) {
-          noitL(noit_error, "MQ send failed, disconnecting\n");
-          if(driver->connection) amqcs_disconnect(&driver->connection);
-          driver->connection = NULL;
-        }
+    apr_pool_create(&job->pool, driver->pool);
+    message = ow_ActiveMQTextMessage_create(job->pool);
+    message->content =
+      ow_byte_array_create_with_data(job->pool,strlen(job->doc_str),
+                                     job->doc_str);
+    dest = ow_ActiveMQQueue_create(job->pool);
+    dest->physicalName = ow_string_create_from_cstring(job->pool,"TEST.QUEUE");         
+    rc = amqcs_send(driver->connection,
+                    (ow_ActiveMQDestination*)dest,
+                    (ow_ActiveMQMessage*)message,
+                    1,4,0,job->pool);
+    if(rc != APR_SUCCESS) {
+      noitL(noit_error, "MQ send failed, disconnecting\n");
+      if(driver->connection) amqcs_disconnect(&driver->connection);
+      driver->connection = NULL;
+    }
 #else
-        stomp_frame out;
+    stomp_frame out;
 
-        apr_pool_create(&job->pool, driver->pool);
+    job->doc_str = (char*)calloc(line_len + 1 /* \t */ +
+        remote_len + 2, 1);
+    strncpy(job->doc_str, job->line, 2);
+    strncat(job->doc_str, job->remote, remote_len);
+    strncat(job->doc_str, "\t", 1);
+    strncat(job->doc_str, job->line + 2, line_len - 2);
 
-        out.command = "SEND";
-        out.headers = apr_hash_make(job->pool);
-        if (driver->exchange)
-          apr_hash_set(out.headers, "exchange", APR_HASH_KEY_STRING, driver->exchange);
+    apr_pool_create(&job->pool, driver->pool);
 
-        apr_hash_set(out.headers, "destination", APR_HASH_KEY_STRING, "/queue/noit.firehose");
-        apr_hash_set(out.headers, "ack", APR_HASH_KEY_STRING, "auto");
-      
-        out.body_length = -1;
-        out.body = job->doc_str;
-        rc = stomp_write(driver->connection, &out, job->pool);
-        if(rc != APR_SUCCESS) {
-          noitL(noit_error, "STOMP send failed, disconnecting\n");
-          if(driver->connection) stomp_disconnect(&driver->connection);
-          driver->connection = NULL;
-        }
-#endif
-      }
-      else {
-        noitL(noit_error, "Not submitting event, no MQ\n");
-      }
+    out.command = "SEND";
+    out.headers = apr_hash_make(job->pool);
+    if (driver->exchange)
+      apr_hash_set(out.headers, "exchange", APR_HASH_KEY_STRING, driver->exchange);
+
+    apr_hash_set(out.headers, "destination", APR_HASH_KEY_STRING, "/queue/noit.firehose");
+    apr_hash_set(out.headers, "ack", APR_HASH_KEY_STRING, "auto");
+  
+    out.body_length = -1;
+    out.body = job->doc_str;
+    rc = stomp_write(driver->connection, &out, job->pool);
+    if(rc != APR_SUCCESS) {
+      noitL(noit_error, "STOMP send failed, disconnecting\n");
+      if(driver->connection) stomp_disconnect(&driver->connection);
+      driver->connection = NULL;
     }
+#endif
   }
   else {
-    noitL(noit_iep, "no iep handler for: '%s'\n", job->line);
+    noitL(noit_error, "Not submitting event, no MQ\n");
   }
   return 0;
 }


More information about the Reconnoiter-devel mailing list