root/src/java/reconnoiter-riemann/src/main/java/com/omniti/reconnoiter/EventHandler.java

Revision d0a64b649e4eac288431ac20d986830b57fb044d, 7.1 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

Add support for riemann as the IEP subsystem.
Remove all traces of Esper.
Change the license on all our bits to simply match reconnoiter.
Cleanup copyrights and embelish auditing script.
Updated test 108 to check riemann iep results.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2013, Circonus, Inc. All rights reserved.
3  * Copyright (c) 2010, OmniTI Computer Consulting, Inc.
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions are
8  * met:
9  *
10  *     * Redistributions of source code must retain the above copyright
11  *       notice, this list of conditions and the following disclaimer.
12  *     * Redistributions in binary form must reproduce the above
13  *       copyright notice, this list of conditions and the following
14  *       disclaimer in the documentation and/or other materials provided
15  *       with the distribution.
16  *     * Neither the name OmniTI Computer Consulting, Inc. nor the names
17  *       of its contributors may be used to endorse or promote products
18  *       derived from this software without specific prior written
19  *       permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 package com.omniti.reconnoiter;
34
35 import org.apache.log4j.Logger;
36 import java.util.UUID;
37 import java.util.LinkedList;
38 import java.util.concurrent.ConcurrentHashMap;
39
40 import com.omniti.reconnoiter.broker.IMQBroker;
41 import com.omniti.reconnoiter.event.*;
42 import com.omniti.reconnoiter.MessageHandler;
43 import com.omniti.reconnoiter.IEventHandler;
44 import java.util.Map;
45 import java.util.HashMap;
46 import java.util.concurrent.atomic.AtomicLong;
47
48 import riemann.codec.Event;
49 import java.io.StringReader;
50 import clojure.lang.RT;
51 import clojure.lang.Var;
52 import clojure.lang.Compiler;
53 import clojure.lang.Atom;
54 import clojure.lang.Symbol;
55 import clojure.lang.Namespace;
56 import clojure.lang.PersistentVector;
57
58 public class EventHandler implements IEventHandler {
59   static Logger logger = Logger.getLogger(EventHandler.class.getName());
60   private LinkedList<MessageHandler> alternates;
61   private IMQBroker broker;
62   private AtomicLong events_handled_num;
63   private AtomicLong events_handled_microseconds;
64   private Atom coreAtom;
65   private java.lang.Object core;
66   private Var stream;
67   static private EventHandler _global;
68
69   static public void _sendAlert(String key, String json) {
70     _global.sendAlert(key,json);
71   }
72   static public void _sendAlert(String json) {
73     _global.sendAlert(json);
74   }
75   static public void _coreReload() {
76     _global.coreReload();
77   }
78   public EventHandler(IMQBroker broker) {
79     _global = this;
80     this.broker = broker;
81     alternates = new LinkedList<MessageHandler>();
82     events_handled_num = new AtomicLong(0);
83     events_handled_microseconds = new AtomicLong(0);
84
85     coreAtom = (Atom)RT.var("riemann.config", "core").deref();
86     String watcher =
87       "(clojure.core/add-watch " +
88       "  riemann.config/core 1 " +
89       "    (fn [ref key old new] " +
90       "        (com.omniti.reconnoiter.EventHandler/_coreReload)))";
91     Compiler.load(new StringReader(watcher));
92     stream = RT.var("riemann.core", "stream!");
93     coreReload();
94   }
95   public void coreReload() {
96     core = coreAtom.deref();
97     logger.info("core changed, reloaded: " + core);
98   }
99   public void addObserver(MessageHandler mh) {
100     alternates.add(mh);
101   }
102   public IMQBroker getBroker() { return broker; }
103
104   public void processMessage(StratconMessage m) throws Exception {
105     for ( MessageHandler mh : alternates )
106       if(mh.observe(m, null) == true)
107         return;
108     long start = System.nanoTime();
109     m.handle(this);
110     long us = (System.nanoTime() - start) / 1000;
111     events_handled_num.incrementAndGet();
112     events_handled_microseconds.addAndGet(us);
113   }
114   public long getNumEventsHandled() { return events_handled_num.longValue(); }
115   public long getMicrosecondsHandlingEvents() { return events_handled_microseconds.longValue(); }
116   public void processMessage(StratconMessage[] messages) throws Exception {
117     Exception last = null;
118     for ( StratconMessage m : messages ) {
119       if(m != null) try { processMessage(m); } catch (Exception e) { last = e; }
120     }
121     if(last != null) throw(last);
122   }
123   public void processMessage(String payload) throws Exception {
124     Exception last = null;
125     StratconMessage[] messages = StratconMessage.makeMessages(payload);
126     if(messages == null) {
127       System.err.println("Can't grok:\n" + payload);
128     }
129     processMessage(messages);
130   }
131   public boolean stopProcessing(StratconMessage m, String source) {
132     for ( MessageHandler mh : alternates )
133       if(mh.observe(m, source) == true)
134         return true;
135     return false;
136   }
137   public void sendAlert(String json) {
138     broker.alert(null, json);
139   }
140   public void sendAlert(String name, String json) {
141     broker.alert(name, json);
142   }
143   public void sendEvent(StratconMessage m) {
144     Event e = null;
145     NoitMetricNumeric nmn = null;
146     NoitMetricText nmt = null;
147     if (m instanceof NoitMetric) {
148       NoitMetric nm = (NoitMetric)m;
149       if(nm.isNumeric()) nmn = nm.getNumeric();
150       if(nm.isText()) nmt = nm.getText();
151     }
152     else if (m instanceof NoitMetricNumeric) {
153       nmn = (NoitMetricNumeric)m;
154     }
155     else if (m instanceof NoitMetricText) {
156       nmt = (NoitMetricText)m;
157     }
158     if(nmn != null) {
159       PersistentVector tags = PersistentVector.create(new java.lang.String[] {
160                       "reconnoiter",
161                       "check:" + nmn.getUuid(),
162                       "name:" + nmn.getCheck_name(),
163                       "module:" + nmn.getCheck_module(),
164                       "noit:" + nmn.getNoit() });
165       e = new Event(nmn.getCheck_target(), // host
166                     nmn.getName(), // service
167                     null, // state
168                     null, // description
169                     nmn.getValue(), // value
170                     tags, // tags
171                     nmn.getTime()/1000, //time
172                     10); //ttl
173     }
174     else if(nmt != null) {
175       PersistentVector tags = PersistentVector.create(new java.lang.String[] {
176                       "reconnoiter",
177                       "check:" + nmt.getUuid(),
178                       "name:" + nmt.getCheck_name(),
179                       "module:" + nmt.getCheck_module(),
180                       "noit:" + nmt.getNoit() });
181       e = new Event(nmt.getCheck_target(), // host
182                     nmt.getName(), // service
183                     null, // state
184                     nmt.getMessage(), // description
185                     null, // value
186                     tags, // tags
187                     nmt.getTime()/1000, //time
188                     10); //ttl
189     }
190     if(e != null) {
191       stream.invoke(core, e);
192     }
193   }
194 }
Note: See TracBrowser for help on using the browser.