Changeset 5cffd17c6e77c2634aa900dbfa5a0385fa9cf9e8

Show
Ignore:
Timestamp:
11/11/09 20:37:20 (4 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1257971840 +0000
git-parent:

[8f8ea5729bc7adce9e392ef0eb4ab5c6755f9dcd]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1257971840 +0000
Message:

pretty hefty refactoring using reflection stuff to allow registering StratconMessage? types on-the-fly. Supports batching. All else should function as it did (no API/ABI/payload BC breakages that I'm aware of). refs #217

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/java/Makefile.in

    rb4d0df9 r5cffd17  
    3131        com/omniti/reconnoiter/broker/AMQListener.java \ 
    3232        com/omniti/reconnoiter/broker/BrokerFactory.java \ 
     33        com/omniti/reconnoiter/event/NoitEvent.java \ 
    3334        com/omniti/reconnoiter/event/NoitCheck.java \ 
    3435        com/omniti/reconnoiter/event/NoitStatus.java \ 
     36        com/omniti/reconnoiter/event/NoitMetric.java \ 
    3537        com/omniti/reconnoiter/event/NoitMetricText.java \ 
    3638        com/omniti/reconnoiter/event/NoitMetricNumeric.java \ 
     
    5153        com/omniti/reconnoiter/IEPEngine.java \ 
    5254        com/omniti/reconnoiter/StratconConfig.java \ 
    53         com/omniti/reconnoiter/StratconMessage.java 
     55        com/omniti/reconnoiter/StratconMessage.java \ 
     56        com/omniti/reconnoiter/StratconMessageFactory.java 
    5457 
    5558SUPPORT=lib/activemq-all-5.2.0.jar lib/antlr-runtime-3.1.1.jar lib/esper-3.2.0.jar \ 
  • src/java/com/omniti/reconnoiter/EventHandler.java

    r7ebe017 r5cffd17  
    2121    this.broker = broker; 
    2222  } 
     23  public EPServiceProvider getService() { return epService; } 
     24  public IMQBroker getBroker() { return broker; } 
     25  public boolean registerQuery(StratconQueryBase sq) { 
     26    if(queries.containsKey(sq.getUUID())) return false; 
     27    queries.put(sq.getUUID(), sq); 
     28    return true; 
     29  } 
     30  public boolean deregisterQuery(UUID id) { 
     31    StratconQueryBase sq = queries.get(id); 
     32    if(sq != null) { 
     33      queries.remove(sq.getUUID()); 
     34      sq.destroy(); 
     35      System.err.println("Stopping Query/Statement: " + id); 
     36      return true; 
     37    } 
     38    return false; 
     39  } 
     40  public boolean isQueryRegistered(UUID id) { return queries.containsKey(id); } 
    2341         
    24   public void processMessage(String xml) throws Exception { 
    25     StratconMessage m = StratconMessage.makeMessage(xml); 
    26     if(m == null) { 
    27       System.err.println("Can't grok:\n" + xml); 
     42  public void processMessage(String payload) throws Exception { 
     43    Exception last = null; 
     44    StratconMessage[] messages = StratconMessage.makeMessages(payload); 
     45    if(messages == null) { 
     46      System.err.println("Can't grok:\n" + payload); 
    2847    } 
    29     if(m instanceof StratconStatement) { 
    30       StratconStatement sq = (StratconStatement) m; 
    31  
    32       if(queries.containsKey(sq.getUUID())) throw (new Exception("Duplicate Query")); 
    33  
    34       EPStatement statement = epService.getEPAdministrator().createEPL(sq.getExpression()); 
    35       sq.setStatement(statement); 
    36       queries.put(sq.getUUID(), sq); 
    37       System.err.println("Creating Statement: " + sq.getUUID()); 
     48    for ( StratconMessage m : messages ) { 
     49      if(m != null) try { m.handle(this); } catch (Exception e) { last = e; } 
    3850    } 
    39     else if(m instanceof StratconQuery) { 
    40       StratconQuery sq = (StratconQuery) m; 
    41  
    42       if(queries.containsKey(sq.getUUID())) throw (new Exception("Duplicate Query")); 
    43  
    44       EPStatement statement = epService.getEPAdministrator().createEPL(sq.getExpression()); 
    45       UpdateListener o = broker.getListener(this.epService, sq); 
    46  
    47       statement.addListener(o); 
    48       sq.setStatement(statement); 
    49       sq.setListener(o); 
    50       queries.put(sq.getUUID(), sq); 
    51       System.err.println("Creating Query: " + sq.getUUID()); 
    52     } 
    53     else if(m instanceof StratconQueryStop) { 
    54       /* QueryStop stops both queries and statements */ 
    55       StratconQueryBase sq = queries.get(((StratconQueryStop) m).getUUID()); 
    56       if(sq != null) { 
    57         queries.remove(sq.getUUID()); 
    58         sq.destroy(); 
    59       } 
    60     } 
    61      else if(m instanceof NoitMetricText) { 
    62       epService.getEPRuntime().sendEvent(m); 
    63     } 
    64     else if(m instanceof NoitMetricNumeric) { 
    65       epService.getEPRuntime().sendEvent(m); 
    66     } 
    67     else if(m instanceof NoitCheck) { 
    68       epService.getEPRuntime().sendEvent(m); 
    69     } 
    70     else if(m instanceof NoitStatus) { 
    71       epService.getEPRuntime().sendEvent(m); 
    72     } 
     51    if(last != null) throw(last); 
    7352  } 
    7453} 
  • src/java/com/omniti/reconnoiter/StratconMessage.java

    r5640498 r5cffd17  
    1010 
    1111import java.io.StringReader; 
    12 import javax.xml.parsers.DocumentBuilder; 
    13 import javax.xml.parsers.DocumentBuilderFactory; 
    14 import org.xml.sax.InputSource; 
    15  
    16 import org.w3c.dom.Document; 
    17 import org.w3c.dom.Element; 
    18  
     12import java.util.HashMap; 
     13import java.lang.reflect.Constructor; 
     14import java.lang.reflect.Method; 
     15import java.lang.reflect.InvocationTargetException; 
    1916import com.omniti.reconnoiter.event.*; 
    2017 
    21 public class StratconMessage { 
    22  
     18public abstract class StratconMessage { 
    2319  // This is the text type in the noit_log.h 
    2420  public final static String METRIC_STRING = "s"; 
    25    
     21  public final static HashMap<String,StratconMessageFactory> quicklookup = new HashMap<String,StratconMessageFactory>(); 
     22 
     23  public static boolean registerType(Class clazz) { 
     24    boolean success = false; 
     25    try { 
     26      Method meth = clazz.getMethod("getPrefix"); 
     27      String prefix = (String)meth.invoke(clazz.newInstance()); 
     28      StratconMessageFactory smf = new StratconMessageFactory(clazz); 
     29      quicklookup.put(prefix, smf); 
     30      success = true; 
     31    } catch (NoSuchMethodException e) { 
     32      e.printStackTrace(); 
     33    } catch (IllegalAccessException e) { 
     34      e.printStackTrace(); 
     35    } catch (InvocationTargetException e) { 
     36      e.printStackTrace(); 
     37    } catch (InstantiationException e) { 
     38      e.printStackTrace(); 
     39    } 
     40    return success; 
     41  }   
    2642  public static String[] parseToArray(String jlog, int num) { 
    2743    // Get rid of the null parameter 
    28     return jlog.substring(0, jlog.length()-1).split("[\t]", num); 
     44    String operand = jlog.endsWith("\n") ? jlog.substring(0, jlog.length()-1) 
     45                                         : jlog; 
     46    return operand.split("[\t]", num); 
    2947  } 
    3048 
    31   protected String getPrefix() { 
    32      return null; 
    33   } 
     49  public String getPrefix() { return null; } 
    3450 
    3551  protected long timeToLong(String time) { 
     
    4763  } 
    4864 
    49   protected int getLength() { 
    50      return -1; 
    51   } 
     65  public int getLength() { return 0; } 
    5266 
     67  public abstract void handle(EventHandler eh); 
     68 
     69  public StratconMessage() {} 
    5370  // Check and make sure 
    5471  public StratconMessage(String[] parts) throws Exception { 
     
    6380  public static StratconMessage makeMessage(String jlog) { 
    6481    String[] parts; 
     82    if(jlog == null || jlog.length() == 0) return null; 
    6583    // The numbers of the parse are pulled from stratcon and 
    6684    // +1 for the extra remote 
    6785    try { 
    68       switch (jlog.charAt(0)) { 
    69         case 'C': 
    70           parts = parseToArray(jlog, 7); 
    71           return new NoitCheck(parts); 
    72         case 'S': 
    73           parts = parseToArray(jlog, 8); 
    74           return new NoitStatus(parts); 
    75         case 'M': 
    76           parts = parseToArray(jlog, 7); 
    77           if (parts[5].equals(METRIC_STRING)) { 
    78             return new NoitMetricText(parts); 
    79           } else { 
    80             return new NoitMetricNumeric(parts); 
    81           } 
    82         case 'D': 
    83           parts = parseToArray(jlog, 4); 
    84           return new StratconStatement(parts); 
    85         case 'Q': 
    86           parts = parseToArray(jlog, 5); 
    87           return new StratconQuery(parts); 
    88         case 'q': 
    89           parts = parseToArray(jlog, 3); 
    90           return new StratconQueryStop(parts); 
     86      String prefix = jlog.substring(0, jlog.indexOf('\t')); 
     87 
     88      StratconMessageFactory smf = quicklookup.get(prefix); 
     89      if(smf == null) { 
     90        throw new Exception("no handler for " + jlog); 
    9191      } 
     92      parts = parseToArray(jlog, smf.getLength()); 
     93      return smf.make(parts); 
    9294    } 
    9395    catch(Exception e) { 
     
    9799    return null; 
    98100  } 
     101  public static StratconMessage[] makeMessages(String big) { 
     102    String[] lines = big.split("[\n]"); 
     103    StratconMessage[] messages = new StratconMessage[lines.length]; 
     104    for (int i = 0; i < messages.length; i++) { 
     105      messages[i] = makeMessage(lines[i]); 
     106    } 
     107    return messages; 
     108  } 
     109  static { 
     110    StratconMessage.registerType(NoitCheck.class); 
     111    StratconMessage.registerType(NoitStatus.class); 
     112    StratconMessage.registerType(NoitMetric.class); 
     113    StratconMessage.registerType(StratconStatement.class); 
     114    StratconMessage.registerType(StratconQuery.class); 
     115    StratconMessage.registerType(StratconQueryStop.class); 
     116  } 
    99117} 
  • src/java/com/omniti/reconnoiter/event/NoitCheck.java

    r0335d9d r5cffd17  
    11package com.omniti.reconnoiter.event; 
    22 
    3 import com.omniti.reconnoiter.StratconMessage
     3import com.omniti.reconnoiter.event.NoitEvent
    44 
    55 
    6 public class NoitCheck extends StratconMessage
    7   String uuid;  
    8   String target;  
    9   String module; 
    10   String name;  
    11   String noit; 
     6public class NoitCheck extends NoitEvent
     7  String uuid;  
     8  String target;  
     9  String module; 
     10  String name;  
     11  String noit; 
    1212 
    13   protected String getPrefix() { 
    14     return "C"; 
    15  
     13  public String getPrefix() { 
     14    return "C"; 
     15 
    1616 
     17  /* 
     18   'C' REMOTE TIMESTAMP UUID TARGET MODULE NAME 
     19   */ 
     20  public NoitCheck() {} 
     21  public NoitCheck(String[] parts) throws Exception { 
     22    super(parts); 
     23    noit = parts[1]; 
     24    uuid = parts[3]; 
     25    target = parts[4]; 
     26    module = parts[5]; 
     27    name = parts[6]; 
     28  } 
    1729 
     30  public String getUuid() { return uuid; } 
     31  public String getName() { return name; } 
     32  public String getTarget() { return target; } 
     33  public String getModule() { return module; } 
     34  public String getNoit() { return noit; } 
    1835 
    19    /* 
    20     'C' REMOTE TIMESTAMP UUID TARGET MODULE NAME 
    21     */ 
    22    public NoitCheck(String[] parts) throws Exception { 
    23       super(parts); 
    24       noit = parts[1]; 
    25       uuid = parts[3]; 
    26       target = parts[4]; 
    27       module = parts[5]; 
    28       name = parts[6]; 
    29  
    30    } 
    31  
    32     public String getUuid() { return uuid; } 
    33     public String getName() { return name; } 
    34     public String getTarget() { return target; } 
    35     public String getModule() { return module; } 
    36     public String getNoit() { return noit; } 
    37  
    38   protected int getLength() { 
    39      return 7; 
    40    } 
     36  public int getLength() { 
     37    return 7; 
     38  } 
    4139} 
  • src/java/com/omniti/reconnoiter/event/NoitMetricNumeric.java

    rd855ea0 r5cffd17  
    99package com.omniti.reconnoiter.event; 
    1010 
    11 import com.omniti.reconnoiter.StratconMessage
     11import com.omniti.reconnoiter.event.NoitEvent
    1212 
    13 public class NoitMetricNumeric extends StratconMessage
     13public class NoitMetricNumeric extends NoitEvent
    1414    private String uuid; 
    1515    private String name; 
     
    1818    private String noit; 
    1919 
    20   protected String getPrefix() { 
     20  public String getPrefix() { 
    2121     return "M"; 
    2222   } 
     
    2424   'M' REMOTE TIMESTAMP UUID NAME TYPE VALUE 
    2525    */ 
     26   public NoitMetricNumeric() {} 
    2627   public NoitMetricNumeric(String[] parts) throws Exception { 
    2728      super(parts); 
     
    4647    public String getNoit() { return noit; } 
    4748 
    48     protected int getLength() { 
     49    public int getLength() { 
    4950        return 7; 
    5051    } 
  • src/java/com/omniti/reconnoiter/event/NoitMetricText.java

    rd855ea0 r5cffd17  
    11package com.omniti.reconnoiter.event; 
    22 
    3 import com.omniti.reconnoiter.StratconMessage
     3import com.omniti.reconnoiter.event.NoitEvent
    44 
    5 public class NoitMetricText extends StratconMessage
     5public class NoitMetricText extends NoitEvent
    66   String uuid; 
    77   Long time; 
     
    1010   String noit; 
    1111 
    12    protected String getPrefix() { 
     12   public String getPrefix() { 
    1313     return "M"; 
    1414   } 
     
    1616   'M' REMOTE TIMESTAMP UUID NAME TYPE VALUE 
    1717    */ 
     18    public NoitMetricText() {} 
    1819    public NoitMetricText(String[] parts) throws Exception { 
    1920      super(parts); 
     
    3132 
    3233 
    33     protected int getLength() { 
     34    public int getLength() { 
    3435        return 7; 
    3536    } 
  • src/java/com/omniti/reconnoiter/event/NoitStatus.java

    r5640498 r5cffd17  
    11package com.omniti.reconnoiter.event; 
    22 
    3 import com.omniti.reconnoiter.StratconMessage
     3import com.omniti.reconnoiter.event.NoitEvent
    44 
     5public class NoitStatus extends NoitEvent { 
     6  String uuid; 
     7  Long time; 
     8  String status; 
     9  String state; 
     10  String availability; 
     11  Double duration; 
     12  String noit; 
    513 
    6 public class NoitStatus  extends StratconMessage { 
    7    String uuid; 
    8    Long time; 
    9    String status; 
    10    String state; 
    11    String availability; 
    12    Double duration; 
    13    String noit; 
     14  public String getPrefix() { 
     15    return "S"; 
     16  } 
    1417 
    15   @Override 
    16   protected String getPrefix() { 
    17      return "S"; 
    18    } 
     18  /* 
     19   'S' REMOTE TIMESTAMP UUID STATE AVAILABILITY DURATION STATUS_MESSAGE 
     20   */ 
     21  public NoitStatus() {} 
     22  public NoitStatus(String[] parts) throws Exception { 
     23    super(parts); 
     24    noit = parts[1]; 
     25    uuid = parts[3]; 
     26    state = parts[4]; 
     27    availability = parts[5]; 
     28    duration = Double.parseDouble(parts[6]); 
     29    status = parts[7]; 
     30    time = timeToLong(parts[2]); 
     31  } 
    1932 
    20    /* 
    21     'S' REMOTE TIMESTAMP UUID STATE AVAILABILITY DURATION STATUS_MESSAGE 
    22     */ 
    23    public NoitStatus(String[] parts) throws Exception { 
    24       super(parts); 
    25       noit = parts[1]; 
    26       uuid = parts[3]; 
    27       state = parts[4]; 
    28       availability = parts[5]; 
    29       duration = Double.parseDouble(parts[6]); 
    30       status = parts[7]; 
    31       time = timeToLong(parts[2]); 
    32    } 
     33  public String getUuid() { return uuid; } 
     34  public Long getTime() { return time; } 
     35  public String getStatus() { return status; } 
     36  public String getState() { return state; } 
     37  public String getAvailability() { return availability; } 
     38  public Double getDuration() { return duration; } 
     39  public String getNoit() { return noit; } 
    3340 
    34     public String getUuid() { return uuid; } 
    35     public Long getTime() { return time; } 
    36     public String getStatus() { return status; } 
    37     public String getState() { return state; } 
    38     public String getAvailability() { return availability; } 
    39     public Double getDuration() { return duration; } 
    40     public String getNoit() { return noit; } 
    41  
    42   protected int getLength() { 
    43      return 8; 
    44    } 
     41  public int getLength() { 
     42    return 8; 
     43  } 
    4544} 
  • src/java/com/omniti/reconnoiter/event/StratconQuery.java

    rb4d0df9 r5cffd17  
    1111 
    1212import com.omniti.reconnoiter.event.StratconQueryBase; 
     13import com.omniti.reconnoiter.EventHandler; 
     14import com.espertech.esper.client.EPStatement; 
    1315import com.espertech.esper.client.UpdateListener; 
    1416import java.util.UUID; 
     
    2022  protected Thread thr; 
    2123 
    22   protected String getPrefix() { 
     24  public String getPrefix() { 
    2325     return "Q"; 
    2426   } 
    2527 
    2628  /*  'Q' REMOTE ID NAME QUERY  */ 
     29  public StratconQuery() {} 
    2730  public StratconQuery(String[] parts) throws Exception { 
    2831    super(parts); 
     
    5861  } 
    5962 
    60   protected int getLength() { 
     63  public int getLength() { 
    6164    return 5; 
    6265  } 
     66 
     67  public void handle(EventHandler eh) { 
     68    eh.deregisterQuery(getUUID()); 
     69 
     70    EPStatement statement = eh.getService().getEPAdministrator().createEPL(getExpression()); 
     71    UpdateListener o = eh.getBroker().getListener(eh.getService(), this); 
     72 
     73    statement.addListener(o); 
     74    setStatement(statement); 
     75    setListener(o); 
     76    eh.registerQuery(this); 
     77    System.err.println("Creating Query: " + getUUID()); 
     78  } 
    6379} 
  • src/java/com/omniti/reconnoiter/event/StratconQueryBase.java

    r0335d9d r5cffd17  
    1414import java.util.UUID; 
    1515 
    16 public class StratconQueryBase extends StratconMessage { 
     16public abstract class StratconQueryBase extends StratconMessage { 
    1717  protected EPStatement statement; 
    1818  protected UUID uuid; 
    1919  protected String expression; 
    2020 
     21  public StratconQueryBase() {} 
    2122  public StratconQueryBase(String parts[]) throws Exception { 
    2223    super(parts); 
  • src/java/com/omniti/reconnoiter/event/StratconQueryStop.java

    r0335d9d r5cffd17  
    99package com.omniti.reconnoiter.event; 
    1010 
     11import java.util.UUID; 
    1112import com.omniti.reconnoiter.StratconMessage; 
    12 import java.util.UUID; 
    13 import org.w3c.dom.Document; 
    14 import org.w3c.dom.Element; 
     13import com.omniti.reconnoiter.EventHandler; 
    1514 
    1615public class StratconQueryStop extends StratconQueryBase { 
    1716 
    18   protected String getPrefix() { 
    19     return "q"; 
    20  
     17  public String getPrefix() { 
     18    return "q"; 
     19 
    2120 
    22 /*  'q' REMOTE ID */ 
     21  /*  'q' REMOTE ID */ 
     22  public StratconQueryStop() {} 
    2323  public StratconQueryStop(String[] parts) throws Exception { 
    2424    super(parts); 
     
    2929  } 
    3030 
    31   protected int getLength() { 
     31  public int getLength() { 
    3232    return 3; 
    3333  } 
    34    
     34 
     35  public void handle(EventHandler eh) { 
     36    eh.deregisterQuery(getUUID()); 
     37  } 
    3538} 
    3639 
  • src/java/com/omniti/reconnoiter/event/StratconStatement.java

    r0335d9d r5cffd17  
    1010 
    1111import com.omniti.reconnoiter.event.StratconQueryBase; 
     12import com.omniti.reconnoiter.EventHandler; 
     13import com.espertech.esper.client.EPStatement; 
    1214import com.espertech.esper.client.UpdateListener; 
    1315 
    1416import java.util.UUID; 
    15 import javax.xml.xpath.XPath; 
    16 import javax.xml.xpath.XPathFactory; 
    17 import javax.xml.xpath.XPathConstants; 
    18 import javax.xml.xpath.XPathExpressionException; 
    19 import org.w3c.dom.Document; 
    2017 
    2118public class StratconStatement extends StratconQueryBase { 
    2219 
    23   protected String getPrefix() { 
    24     return "D"; 
    25  
     20  public String getPrefix() { 
     21    return "D"; 
     22 
    2623 
    27    /*  'D' REMOTE ID QUERY  */ 
    28    public StratconStatement(String[] parts) throws Exception { 
    29       super(parts); 
    30       String id = parts[2]; 
    31       expression = parts[3]; 
    32       if(id == null) 
    33         uuid = UUID.randomUUID(); 
    34       else 
    35         uuid = UUID.fromString(id); 
    36    } 
     24  /*  'D' REMOTE ID QUERY  */ 
     25  public StratconStatement() {} 
     26  public StratconStatement(String[] parts) throws Exception { 
     27    super(parts); 
     28    String id = parts[2]; 
     29    expression = parts[3]; 
     30    if(id == null) 
     31      uuid = UUID.randomUUID(); 
     32    else 
     33      uuid = UUID.fromString(id); 
     34  } 
    3735 
    38   protected int getLength() { 
     36  public int getLength() { 
    3937    return 4; 
    4038  } 
     39 
     40  public void handle(EventHandler eh) { 
     41    eh.deregisterQuery(getUUID()); 
     42    EPStatement statement = eh.getService().getEPAdministrator().createEPL(getExpression()); 
     43    setStatement(statement);   
     44    if(eh.registerQuery(this)) { 
     45      System.err.println("Creating Statement: " + getUUID()); 
     46    } 
     47  } 
    4148}