Changeset 699c97c25d44d177d2979ba0008d46658927a912
- Timestamp:
- 08/06/09 13:21:06 (4 years ago)
- git-parent:
- Files:
-
- src/java/Makefile.in (modified) (1 diff)
- src/java/com/omniti/reconnoiter/EventHandler.java (modified) (3 diffs)
- src/java/com/omniti/reconnoiter/IEPEngine.java (modified) (1 diff)
- src/java/com/omniti/reconnoiter/MQListener.java (modified) (1 diff)
- src/java/com/omniti/reconnoiter/StratconMessage.java (modified) (1 diff)
- src/java/com/omniti/reconnoiter/event/StratconQuery.java (modified) (3 diffs)
- src/java/com/omniti/reconnoiter/event/StratconQueryBase.java (added)
- src/java/com/omniti/reconnoiter/event/StratconStatement.java (added)
- src/stratcon.conf.in (modified) (1 diff)
- src/stratcon_iep.c (modified) (5 diffs)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
src/java/Makefile.in
r2326243 r699c97c 32 32 com/omniti/reconnoiter/event/NoitEvent.java \ 33 33 com/omniti/reconnoiter/event/NoitMetricNumeric.java \ 34 com/omniti/reconnoiter/event/StratconStatement.java \ 35 com/omniti/reconnoiter/event/StratconQueryBase.java \ 34 36 com/omniti/reconnoiter/event/StratconQuery.java \ 35 37 com/omniti/reconnoiter/event/StratconQueryStop.java \ src/java/com/omniti/reconnoiter/EventHandler.java
r4c3fb9b r699c97c 10 10 import com.omniti.reconnoiter.event.NoitEvent; 11 11 import com.omniti.reconnoiter.event.NoitMetricNumeric; 12 import com.omniti.reconnoiter.event.StratconQueryBase; 13 import com.omniti.reconnoiter.event.StratconStatement; 12 14 import com.omniti.reconnoiter.event.StratconQuery; 13 15 import com.omniti.reconnoiter.event.StratconQueryStop; … … 15 17 public class EventHandler { 16 18 17 private EPServiceProvider epService;18 private ConcurrentHashMap<UUID, StratconQuery> queries;19 private IMQBroker broker;19 private EPServiceProvider epService; 20 private ConcurrentHashMap<UUID, StratconQueryBase> queries; 21 private IMQBroker broker; 20 22 21 public EventHandler(ConcurrentHashMap<UUID,StratconQuery> queries, EPServiceProvider epService, IMQBroker broker) {22 this.epService = epService;23 this.queries = queries;24 this.broker = broker;25 }23 public EventHandler(ConcurrentHashMap<UUID,StratconQueryBase> queries, EPServiceProvider epService, IMQBroker broker) { 24 this.epService = epService; 25 this.queries = queries; 26 this.broker = broker; 27 } 26 28 27 public void processMessage(String xml) throws Exception { 28 29 StratconMessage m = StratconMessage.makeMessage(xml); 29 public void processMessage(String xml) throws Exception { 30 StratconMessage m = StratconMessage.makeMessage(xml); 30 31 if(m == null) { 31 32 System.err.println("Can't grok:\n" + xml); 32 33 } 33 if(m instanceof StratconQuery) { 34 if(m instanceof StratconStatement) { 35 StratconStatement sq = (StratconStatement) m; 36 37 if(queries.containsKey(sq.getUUID())) throw (new Exception("Duplicate Query")); 38 39 EPStatement statement = epService.getEPAdministrator().createEPL(sq.getExpression()); 40 sq.setStatement(statement); 41 queries.put(sq.getUUID(), sq); 42 System.err.println("Creating Statement: " + sq.getUUID()); 43 } 44 else if(m instanceof StratconQuery) { 34 45 StratconQuery sq = (StratconQuery) m; 35 46 … … 46 57 } 47 58 else if(m instanceof StratconQueryStop) { 48 StratconQuery sq = queries.get(((StratconQueryStop) m).getUUID()); 59 /* QueryStop stops both queries and statements */ 60 StratconQueryBase sq = queries.get(((StratconQueryStop) m).getUUID()); 49 61 if(sq != null) { 50 62 queries.remove(sq.getUUID()); src/java/com/omniti/reconnoiter/IEPEngine.java
r4c3fb9b r699c97c 34 34 NoitEvent.registerTypes(epService); 35 35 36 epService.getEPAdministrator().createEPL("create window CheckDetails.std:unique(uuid).win:keepall() as NoitCheck");37 epService.getEPAdministrator().createEPL("insert into CheckDetails select * from NoitCheck");38 39 36 MQListener l = new MQListener(epService, BrokerFactory.getBroker(sconf)); 40 37 src/java/com/omniti/reconnoiter/MQListener.java
r4c3fb9b r699c97c 19 19 public class MQListener implements Runnable { 20 20 private EPServiceProvider epService; 21 private ConcurrentHashMap<UUID,StratconQuery > queries;21 private ConcurrentHashMap<UUID,StratconQueryBase> queries; 22 22 private IMQBroker broker; 23 23 24 24 public MQListener(EPServiceProvider epService, IMQBroker broker) { 25 this.queries = new ConcurrentHashMap<UUID,StratconQuery >();25 this.queries = new ConcurrentHashMap<UUID,StratconQueryBase>(); 26 26 this.epService = epService; 27 27 this.broker = broker; src/java/com/omniti/reconnoiter/StratconMessage.java
r2220278 r699c97c 59 59 } 60 60 // and requests 61 else if(tag.equals("StratconStatement")) 62 return new StratconStatement(document); 61 63 else if(tag.equals("StratconQuery")) 62 64 return new StratconQuery(document); src/java/com/omniti/reconnoiter/event/StratconQuery.java
r2220278 r699c97c 11 11 import java.lang.System; 12 12 13 import com.omniti.reconnoiter.event.StratconQueryBase; 13 14 import com.omniti.reconnoiter.StratconMessage; 14 15 import com.espertech.esper.client.EPStatement; … … 24 25 import org.w3c.dom.NodeList; 25 26 26 public class StratconQuery extends StratconMessage { 27 private EPStatement statement; 28 private UpdateListener listener; 29 private UUID uuid; 30 private String name; 31 private String expression; 27 public class StratconQuery extends StratconQueryBase { 28 protected UpdateListener listener; 29 protected String name; 32 30 33 31 public StratconQuery(Document d) { … … 45 43 if(uuid == null) uuid = UUID.randomUUID(); 46 44 } 47 public UUID getUUID() {48 return uuid;49 }50 public EPStatement getStatement() {51 return statement;52 }53 public String getExpression() {54 return expression;55 }56 45 public String getName() { 57 46 return name; 58 }59 public void setStatement(EPStatement s) {60 this.statement = s;61 47 } 62 48 public void setListener(UpdateListener l) { src/stratcon.conf.in
r9d91262 r699c97c 35 35 command="%iepbindir%/run-iep.sh" /> 36 36 <queries> 37 <statement id="6cc613a4-7f9c-11de-973f-db7e8ccb2e5c" provides="CheckDetails-ddl"> 38 <epl>create window CheckDetails.std:unique(uuid).win:keepall() as NoitCheck</epl> 39 </statement> 40 <statement id="76598f5e-7f9c-11de-9f5b-ebb4dcb2494e" provides="CheckDetails"> 41 <requires>CheckDetails-ddl</requires> 42 <epl>insert into CheckDetails select * from NoitCheck</epl> 43 </statement> 44 <statement id="ba189f08-7f99-11de-9013-733772d37479" provides="UnavailableStream"> 45 <requires>CheckDetails</requires> 46 <epl>insert into UnavailableStream 47 select p.* as delta, cds.target as target, cds.module as module, 48 cds.name as name, p.s.uuid as uuid 49 from pattern [ every 50 s=NoitStatus(availability='A') -> 51 ( n0 = NoitStatus(uuid=s.uuid, availability='U') 52 and not NoitStatus(uuid=s.uuid, availability='A')) 53 ].std:lastevent() as p 54 inner join CheckDetails as cds on cds.uuid = p.s.uuid 55 </epl> 56 </statement> 37 57 <query id="ce6bf8d2-3dd7-11de-a45c-a7df160cba9e" topic="status"> 38 select * from NoitStatus58 <epl>select * from NoitStatus</epl> 39 59 </query> 40 60 </queries> src/stratcon_iep.c
r6f4687b r699c97c 169 169 170 170 static xmlDocPtr 171 stratcon_iep_doc_from_statement(char *data, char *remote) { 172 xmlDocPtr doc; 173 char *parts[3]; 174 if(bust_to_parts(data, parts, 3) != 3) return NULL; 175 /* 'D' ID QUERY */ 176 177 NEWDOC(doc, "StratconStatement", 178 { 179 ADDCHILD("id", parts[1]); 180 ADDCHILD("expression", parts[2]); 181 }); 182 return doc; 183 } 184 185 static xmlDocPtr 171 186 stratcon_iep_doc_from_query(char *data, char *remote) { 172 187 xmlDocPtr doc; … … 205 220 case 'S': return stratcon_iep_doc_from_status(data, remote); 206 221 case 'M': return stratcon_iep_doc_from_metric(data, remote); 222 case 'D': return stratcon_iep_doc_from_statement(data, remote); 207 223 case 'Q': return stratcon_iep_doc_from_query(data, remote); 208 224 case 'q': return stratcon_iep_doc_from_querystop(data, remote); … … 222 238 } 223 239 return 0; 240 } 241 242 struct statement_node { 243 char *id; 244 char *statement; 245 char *provides; 246 int marked; /* helps with identifying cycles */ 247 int nrequires; 248 struct statement_node **requires; 249 }; 250 static void 251 statement_node_free(void *vstmt) { 252 struct statement_node *stmt = vstmt; 253 if(stmt->id) free(stmt->id); 254 if(stmt->statement) free(stmt->statement); 255 if(stmt->provides) free(stmt->provides); 256 if(stmt->requires) free(stmt->requires); 257 } 258 static int 259 stmt_mark_dag(struct statement_node *stmt, int mgen) { 260 int i; 261 assert(stmt->marked <= mgen); 262 if(stmt->marked == mgen) return -1; 263 if(stmt->marked > 0) return 0; /* validated in a previous sweep */ 264 stmt->marked = mgen; 265 for(i=0; i<stmt->nrequires; i++) 266 if(stmt_mark_dag(stmt->requires[i], mgen) < 0) return -1; 267 return 0; 268 } 269 static void 270 submit_statement_node(struct statement_node *stmt) { 271 int line_len, i; 272 char *line, *cp; 273 274 if(stmt->marked) return; 275 for(i=0; i<stmt->nrequires; i++) 276 submit_statement_node(stmt->requires[i]); 277 278 line_len = 3 /* 2 tabs + \0 */ + 279 1 /* 'D' */ + 1 /* '\n' */ + 280 strlen(stmt->id) + strlen(stmt->statement); 281 line = malloc(line_len); 282 snprintf(line, line_len, "D\t%s\t%s\n", stmt->id, stmt->statement); 283 cp = line; 284 while(cp[0] && cp[1]) { 285 if(*cp == '\n') *cp = ' '; 286 cp++; 287 } 288 noitL(noit_error, "submitting statement: %s\n", line); 289 stratcon_iep_line_processor(DS_OP_INSERT, NULL, line); 290 stmt->marked = 1; 291 } 292 void stratcon_iep_submit_statements() { 293 int i, cnt = 0; 294 noit_conf_section_t *statement_configs; 295 char path[256]; 296 struct statement_node *stmt; 297 void *vstmt; 298 noit_hash_table stmt_by_id = NOIT_HASH_EMPTY; 299 noit_hash_table stmt_by_provider = NOIT_HASH_EMPTY; 300 noit_hash_iter iter = NOIT_HASH_ITER_ZERO; 301 const char *key; 302 int klen, mgen = 0; 303 304 snprintf(path, sizeof(path), "/stratcon/iep/queries//statement"); 305 statement_configs = noit_conf_get_sections(NULL, path, &cnt); 306 noitL(noit_debug, "Found %d %s stanzas\n", cnt, path); 307 308 /* Phase 1: sweep in all the statements */ 309 for(i=0; i<cnt; i++) { 310 char id[UUID_STR_LEN]; 311 char provides[256]; 312 char *statement; 313 314 if(!noit_conf_get_stringbuf(statement_configs[i], 315 "self::node()/@id", 316 id, sizeof(id))) { 317 noitL(noit_iep, "No uuid specified in query\n"); 318 continue; 319 } 320 if(!noit_conf_get_stringbuf(statement_configs[i], 321 "ancestor-or-self::node()/@provides", 322 provides, sizeof(provides))) { 323 provides[0] = '\0'; 324 } 325 if(!noit_conf_get_string(statement_configs[i], "self::node()/epl", 326 &statement)) { 327 noitL(noit_iep, "No contents specified in statement\n"); 328 continue; 329 } 330 stmt = calloc(1, sizeof(*stmt)); 331 stmt->id = strdup(id); 332 stmt->statement = statement; 333 stmt->provides = provides[0] ? strdup(provides) : NULL; 334 if(!noit_hash_store(&stmt_by_id, stmt->id, strlen(stmt->id), stmt)) { 335 noitL(noit_error, "Duplicate statement id: %s\n", stmt->id); 336 exit(-1); 337 } 338 if(stmt->provides) { 339 if(!noit_hash_store(&stmt_by_provider, stmt->provides, 340 strlen(stmt->provides), stmt)) { 341 noitL(noit_error, "Two statements provide: '%s'\n", stmt->provides); 342 exit(-1); 343 } 344 } 345 } 346 347 /* Phase 2: load the requires graph */ 348 for(i=0; i<cnt; i++) { 349 char id[UUID_STR_LEN]; 350 int rcnt, j; 351 char *requires; 352 noit_conf_section_t *reqs; 353 354 if(!noit_conf_get_stringbuf(statement_configs[i], 355 "self::node()/@id", 356 id, sizeof(id))) { 357 noitL(noit_iep, "No uuid specified in query\n"); 358 continue; 359 } 360 if(!noit_hash_retrieve(&stmt_by_id, id, strlen(id), &vstmt)) { 361 noitL(noit_error, "Cannot find statement: %s\n", id); 362 exit(-1); 363 } 364 stmt = vstmt; 365 reqs = noit_conf_get_sections(statement_configs[i], 366 "self::node()/requires", &rcnt); 367 if(rcnt > 0) { 368 stmt->requires = malloc(rcnt * sizeof(*(stmt->requires))); 369 for(j=0; j<rcnt; j++) { 370 void *vrstmt; 371 if(!noit_conf_get_string(reqs[j], "self::node()", 372 &requires) || requires[0] == '\0') { 373 continue; 374 } 375 if(!noit_hash_retrieve(&stmt_by_provider, requires, strlen(requires), 376 &vrstmt)) { 377 noitL(noit_error, 378 "Statement %s requires %s which no one provides.\n", 379 stmt->id, requires); 380 exit(-1); 381 } 382 stmt->requires[stmt->nrequires++] = vrstmt; 383 } 384 } 385 } 386 387 /* Phase 3: Recursive sweep and mark to detect cycles. 388 We're walking the graph backwards here from dependent to provider, 389 but a cycle is a cycle, so this validates the graph. */ 390 while(noit_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) { 391 stmt = vstmt; 392 if(stmt_mark_dag(stmt, ++mgen) < 0) { 393 noitL(noit_error, "Statement %s has a cyclic requirement\n", stmt->id); 394 exit(-1); 395 } 396 } 397 398 /* Phase 4: clean the markings */ 399 mgen = 0; 400 memset(&iter, 0, sizeof(iter)); 401 while(noit_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) { 402 stmt = vstmt; 403 stmt->marked = 0; 404 } 405 406 /* Phase 5: do the load */ 407 memset(&iter, 0, sizeof(iter)); 408 while(noit_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) { 409 stmt = vstmt; 410 submit_statement_node(stmt); 411 } 412 413 noit_hash_destroy(&stmt_by_provider, NULL, NULL); 414 noit_hash_destroy(&stmt_by_id, NULL, statement_node_free); 415 free(statement_configs); 224 416 } 225 417 … … 251 443 continue; 252 444 } 253 if(!noit_conf_get_string(query_configs[i], "self::node() ",445 if(!noit_conf_get_string(query_configs[i], "self::node()/epl", 254 446 &query)) { 255 447 noitL(noit_iep, "No contents specified in query\n"); … … 368 560 } 369 561 #endif 562 stratcon_iep_submit_statements(); 370 563 stratcon_datastore_iep_check_preload(); 371 564 stratcon_iep_submit_queries();
