Changeset 699c97c25d44d177d2979ba0008d46658927a912

Show
Ignore:
Timestamp:
08/06/09 13:21:06 (5 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1249564866 +0000
git-parent:

[e754a7ed9a2306bb7b477de6a099909fc3234c53]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1249564866 +0000
Message:

works for me. Ensures there is a (perhaps non-connected) DAG of statement dependencies and runs them, then loads data, then runs queries. refs #162

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/java/Makefile.in

    r2326243 r699c97c  
    3232        com/omniti/reconnoiter/event/NoitEvent.java \ 
    3333        com/omniti/reconnoiter/event/NoitMetricNumeric.java \ 
     34        com/omniti/reconnoiter/event/StratconStatement.java \ 
     35        com/omniti/reconnoiter/event/StratconQueryBase.java \ 
    3436        com/omniti/reconnoiter/event/StratconQuery.java \ 
    3537        com/omniti/reconnoiter/event/StratconQueryStop.java \ 
  • src/java/com/omniti/reconnoiter/EventHandler.java

    r4c3fb9b r699c97c  
    1010import com.omniti.reconnoiter.event.NoitEvent; 
    1111import com.omniti.reconnoiter.event.NoitMetricNumeric; 
     12import com.omniti.reconnoiter.event.StratconQueryBase; 
     13import com.omniti.reconnoiter.event.StratconStatement; 
    1214import com.omniti.reconnoiter.event.StratconQuery; 
    1315import com.omniti.reconnoiter.event.StratconQueryStop; 
     
    1517public class EventHandler { 
    1618         
    17        private EPServiceProvider epService; 
    18        private ConcurrentHashMap<UUID, StratconQuery> queries; 
    19        private IMQBroker broker; 
     19  private EPServiceProvider epService; 
     20  private ConcurrentHashMap<UUID, StratconQueryBase> queries; 
     21  private IMQBroker broker; 
    2022 
    21        public EventHandler(ConcurrentHashMap<UUID,StratconQuery> queries, EPServiceProvider epService, IMQBroker broker) { 
    22                this.epService = epService; 
    23                this.queries = queries; 
    24                this.broker = broker; 
    25        
     23  public EventHandler(ConcurrentHashMap<UUID,StratconQueryBase> queries, EPServiceProvider epService, IMQBroker broker) { 
     24    this.epService = epService; 
     25    this.queries = queries; 
     26    this.broker = broker; 
     27 
    2628         
    27         public void processMessage(String xml) throws Exception { 
    28                  
    29                 StratconMessage m = StratconMessage.makeMessage(xml); 
     29  public void processMessage(String xml) throws Exception { 
     30    StratconMessage m = StratconMessage.makeMessage(xml); 
    3031    if(m == null) { 
    3132      System.err.println("Can't grok:\n" + xml); 
    3233    } 
    33     if(m instanceof StratconQuery) { 
     34    if(m instanceof StratconStatement) { 
     35      StratconStatement sq = (StratconStatement) m; 
     36 
     37      if(queries.containsKey(sq.getUUID())) throw (new Exception("Duplicate Query")); 
     38 
     39      EPStatement statement = epService.getEPAdministrator().createEPL(sq.getExpression()); 
     40      sq.setStatement(statement); 
     41      queries.put(sq.getUUID(), sq); 
     42      System.err.println("Creating Statement: " + sq.getUUID()); 
     43    } 
     44    else if(m instanceof StratconQuery) { 
    3445      StratconQuery sq = (StratconQuery) m; 
    3546 
     
    4657    } 
    4758    else if(m instanceof StratconQueryStop) { 
    48       StratconQuery sq = queries.get(((StratconQueryStop) m).getUUID()); 
     59      /* QueryStop stops both queries and statements */ 
     60      StratconQueryBase sq = queries.get(((StratconQueryStop) m).getUUID()); 
    4961      if(sq != null) { 
    5062        queries.remove(sq.getUUID()); 
  • src/java/com/omniti/reconnoiter/IEPEngine.java

    r4c3fb9b r699c97c  
    3434    NoitEvent.registerTypes(epService); 
    3535 
    36     epService.getEPAdministrator().createEPL("create window CheckDetails.std:unique(uuid).win:keepall() as NoitCheck"); 
    37     epService.getEPAdministrator().createEPL("insert into CheckDetails select * from NoitCheck"); 
    38  
    3936    MQListener l = new MQListener(epService, BrokerFactory.getBroker(sconf)); 
    4037 
  • src/java/com/omniti/reconnoiter/MQListener.java

    r4c3fb9b r699c97c  
    1919public class MQListener implements Runnable { 
    2020    private EPServiceProvider epService; 
    21     private ConcurrentHashMap<UUID,StratconQuery> queries; 
     21    private ConcurrentHashMap<UUID,StratconQueryBase> queries; 
    2222    private IMQBroker broker; 
    2323 
    2424    public MQListener(EPServiceProvider epService, IMQBroker broker) { 
    25       this.queries = new ConcurrentHashMap<UUID,StratconQuery>(); 
     25      this.queries = new ConcurrentHashMap<UUID,StratconQueryBase>(); 
    2626      this.epService = epService; 
    2727      this.broker = broker; 
  • src/java/com/omniti/reconnoiter/StratconMessage.java

    r2220278 r699c97c  
    5959      } 
    6060      // and requests 
     61      else if(tag.equals("StratconStatement")) 
     62        return new StratconStatement(document); 
    6163      else if(tag.equals("StratconQuery")) 
    6264        return new StratconQuery(document); 
  • src/java/com/omniti/reconnoiter/event/StratconQuery.java

    r2220278 r699c97c  
    1111import java.lang.System; 
    1212 
     13import com.omniti.reconnoiter.event.StratconQueryBase; 
    1314import com.omniti.reconnoiter.StratconMessage; 
    1415import com.espertech.esper.client.EPStatement; 
     
    2425import org.w3c.dom.NodeList; 
    2526 
    26 public class StratconQuery extends StratconMessage { 
    27   private EPStatement statement; 
    28   private UpdateListener listener; 
    29   private UUID uuid; 
    30   private String name; 
    31   private String expression; 
     27public class StratconQuery extends StratconQueryBase { 
     28  protected UpdateListener listener; 
     29  protected String name; 
    3230 
    3331  public StratconQuery(Document d) { 
     
    4543    if(uuid == null) uuid = UUID.randomUUID(); 
    4644  } 
    47   public UUID getUUID() { 
    48     return uuid; 
    49   } 
    50   public EPStatement getStatement() { 
    51     return statement; 
    52   } 
    53   public String getExpression() { 
    54     return expression; 
    55   } 
    5645  public String getName() { 
    5746    return name; 
    58   } 
    59   public void setStatement(EPStatement s) { 
    60     this.statement = s; 
    6147  } 
    6248  public void setListener(UpdateListener l) { 
  • src/stratcon.conf.in

    r9d91262 r699c97c  
    3535           command="%iepbindir%/run-iep.sh" /> 
    3636    <queries> 
     37      <statement id="6cc613a4-7f9c-11de-973f-db7e8ccb2e5c" provides="CheckDetails-ddl"> 
     38        <epl>create window CheckDetails.std:unique(uuid).win:keepall() as NoitCheck</epl> 
     39      </statement> 
     40      <statement id="76598f5e-7f9c-11de-9f5b-ebb4dcb2494e" provides="CheckDetails"> 
     41        <requires>CheckDetails-ddl</requires> 
     42        <epl>insert into CheckDetails select * from NoitCheck</epl> 
     43      </statement> 
     44      <statement id="ba189f08-7f99-11de-9013-733772d37479" provides="UnavailableStream"> 
     45        <requires>CheckDetails</requires> 
     46        <epl>insert into UnavailableStream 
     47             select p.* as delta, cds.target as target, cds.module as module, 
     48                    cds.name as name, p.s.uuid as uuid 
     49             from pattern [ every 
     50                            s=NoitStatus(availability='A') -> 
     51                            ( n0 = NoitStatus(uuid=s.uuid, availability='U') 
     52                              and not NoitStatus(uuid=s.uuid, availability='A')) 
     53                          ].std:lastevent() as p 
     54             inner join CheckDetails as cds on cds.uuid = p.s.uuid 
     55        </epl> 
     56      </statement> 
    3757      <query id="ce6bf8d2-3dd7-11de-a45c-a7df160cba9e" topic="status"> 
    38         select * from NoitStatus 
     58        <epl>select * from NoitStatus</epl> 
    3959      </query> 
    4060    </queries> 
  • src/stratcon_iep.c

    r6f4687b r699c97c  
    169169 
    170170static xmlDocPtr 
     171stratcon_iep_doc_from_statement(char *data, char *remote) { 
     172  xmlDocPtr doc; 
     173  char *parts[3]; 
     174  if(bust_to_parts(data, parts, 3) != 3) return NULL; 
     175  /*  'D' ID QUERY  */ 
     176 
     177  NEWDOC(doc, "StratconStatement", 
     178         { 
     179           ADDCHILD("id", parts[1]); 
     180           ADDCHILD("expression", parts[2]); 
     181         }); 
     182  return doc; 
     183} 
     184 
     185static xmlDocPtr 
    171186stratcon_iep_doc_from_query(char *data, char *remote) { 
    172187  xmlDocPtr doc; 
     
    205220      case 'S': return stratcon_iep_doc_from_status(data, remote); 
    206221      case 'M': return stratcon_iep_doc_from_metric(data, remote); 
     222      case 'D': return stratcon_iep_doc_from_statement(data, remote); 
    207223      case 'Q': return stratcon_iep_doc_from_query(data, remote); 
    208224      case 'q': return stratcon_iep_doc_from_querystop(data, remote); 
     
    222238  } 
    223239  return 0; 
     240} 
     241 
     242struct statement_node { 
     243  char *id; 
     244  char *statement; 
     245  char *provides; 
     246  int marked; /* helps with identifying cycles */ 
     247  int nrequires; 
     248  struct statement_node **requires; 
     249}; 
     250static void 
     251statement_node_free(void *vstmt) { 
     252  struct statement_node *stmt = vstmt; 
     253  if(stmt->id) free(stmt->id); 
     254  if(stmt->statement) free(stmt->statement); 
     255  if(stmt->provides) free(stmt->provides); 
     256  if(stmt->requires) free(stmt->requires); 
     257} 
     258static int 
     259stmt_mark_dag(struct statement_node *stmt, int mgen) { 
     260  int i; 
     261  assert(stmt->marked <= mgen); 
     262  if(stmt->marked == mgen) return -1; 
     263  if(stmt->marked > 0) return 0; /* validated in a previous sweep */ 
     264  stmt->marked = mgen; 
     265  for(i=0; i<stmt->nrequires; i++) 
     266    if(stmt_mark_dag(stmt->requires[i], mgen) < 0) return -1; 
     267  return 0; 
     268} 
     269static void 
     270submit_statement_node(struct statement_node *stmt) { 
     271  int line_len, i; 
     272  char *line, *cp; 
     273 
     274  if(stmt->marked) return; 
     275  for(i=0; i<stmt->nrequires; i++) 
     276    submit_statement_node(stmt->requires[i]); 
     277 
     278  line_len = 3 /* 2 tabs + \0 */ + 
     279             1 /* 'D' */ + 1 /* '\n' */ + 
     280             strlen(stmt->id) + strlen(stmt->statement); 
     281  line = malloc(line_len); 
     282  snprintf(line, line_len, "D\t%s\t%s\n", stmt->id, stmt->statement); 
     283  cp = line; 
     284  while(cp[0] && cp[1]) { 
     285    if(*cp == '\n') *cp = ' '; 
     286    cp++; 
     287  } 
     288  noitL(noit_error, "submitting statement: %s\n", line); 
     289  stratcon_iep_line_processor(DS_OP_INSERT, NULL, line); 
     290  stmt->marked = 1; 
     291} 
     292void stratcon_iep_submit_statements() { 
     293  int i, cnt = 0; 
     294  noit_conf_section_t *statement_configs; 
     295  char path[256]; 
     296  struct statement_node *stmt; 
     297  void *vstmt; 
     298  noit_hash_table stmt_by_id = NOIT_HASH_EMPTY; 
     299  noit_hash_table stmt_by_provider = NOIT_HASH_EMPTY; 
     300  noit_hash_iter iter = NOIT_HASH_ITER_ZERO; 
     301  const char *key; 
     302  int klen, mgen = 0; 
     303 
     304  snprintf(path, sizeof(path), "/stratcon/iep/queries//statement"); 
     305  statement_configs = noit_conf_get_sections(NULL, path, &cnt); 
     306  noitL(noit_debug, "Found %d %s stanzas\n", cnt, path); 
     307 
     308  /* Phase 1: sweep in all the statements */ 
     309  for(i=0; i<cnt; i++) { 
     310    char id[UUID_STR_LEN]; 
     311    char provides[256]; 
     312    char *statement; 
     313 
     314    if(!noit_conf_get_stringbuf(statement_configs[i], 
     315                                "self::node()/@id", 
     316                                id, sizeof(id))) { 
     317      noitL(noit_iep, "No uuid specified in query\n"); 
     318      continue; 
     319    } 
     320    if(!noit_conf_get_stringbuf(statement_configs[i], 
     321                                "ancestor-or-self::node()/@provides", 
     322                                provides, sizeof(provides))) { 
     323      provides[0] = '\0'; 
     324    } 
     325    if(!noit_conf_get_string(statement_configs[i], "self::node()/epl", 
     326                             &statement)) { 
     327      noitL(noit_iep, "No contents specified in statement\n"); 
     328      continue; 
     329    } 
     330    stmt = calloc(1, sizeof(*stmt)); 
     331    stmt->id = strdup(id); 
     332    stmt->statement = statement; 
     333    stmt->provides = provides[0] ? strdup(provides) : NULL; 
     334    if(!noit_hash_store(&stmt_by_id, stmt->id, strlen(stmt->id), stmt)) { 
     335      noitL(noit_error, "Duplicate statement id: %s\n", stmt->id); 
     336      exit(-1); 
     337    } 
     338    if(stmt->provides) { 
     339      if(!noit_hash_store(&stmt_by_provider, stmt->provides, 
     340                          strlen(stmt->provides), stmt)) { 
     341        noitL(noit_error, "Two statements provide: '%s'\n", stmt->provides); 
     342        exit(-1); 
     343      } 
     344    } 
     345  } 
     346 
     347  /* Phase 2: load the requires graph */ 
     348  for(i=0; i<cnt; i++) { 
     349    char id[UUID_STR_LEN]; 
     350    int rcnt, j; 
     351    char *requires; 
     352    noit_conf_section_t *reqs; 
     353 
     354    if(!noit_conf_get_stringbuf(statement_configs[i], 
     355                                "self::node()/@id", 
     356                                id, sizeof(id))) { 
     357      noitL(noit_iep, "No uuid specified in query\n"); 
     358      continue; 
     359    } 
     360    if(!noit_hash_retrieve(&stmt_by_id, id, strlen(id), &vstmt)) { 
     361      noitL(noit_error, "Cannot find statement: %s\n", id); 
     362      exit(-1); 
     363    } 
     364    stmt = vstmt; 
     365    reqs = noit_conf_get_sections(statement_configs[i], 
     366                                  "self::node()/requires", &rcnt); 
     367    if(rcnt > 0) { 
     368      stmt->requires = malloc(rcnt * sizeof(*(stmt->requires))); 
     369      for(j=0; j<rcnt; j++) { 
     370        void *vrstmt; 
     371        if(!noit_conf_get_string(reqs[j], "self::node()", 
     372                                 &requires) || requires[0] == '\0') { 
     373          continue; 
     374        } 
     375        if(!noit_hash_retrieve(&stmt_by_provider, requires, strlen(requires), 
     376                               &vrstmt)) { 
     377          noitL(noit_error, 
     378                "Statement %s requires %s which no one provides.\n", 
     379                stmt->id, requires); 
     380          exit(-1); 
     381        } 
     382        stmt->requires[stmt->nrequires++] = vrstmt; 
     383      } 
     384    } 
     385  } 
     386 
     387  /* Phase 3: Recursive sweep and mark to detect cycles. 
     388     We're walking the graph backwards here from dependent to provider, 
     389     but a cycle is a cycle, so this validates the graph. */ 
     390  while(noit_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) { 
     391    stmt = vstmt; 
     392    if(stmt_mark_dag(stmt, ++mgen) < 0) { 
     393      noitL(noit_error, "Statement %s has a cyclic requirement\n", stmt->id); 
     394      exit(-1); 
     395    } 
     396  } 
     397 
     398  /* Phase 4: clean the markings */ 
     399  mgen = 0; 
     400  memset(&iter, 0, sizeof(iter)); 
     401  while(noit_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) { 
     402    stmt = vstmt; 
     403    stmt->marked = 0; 
     404  } 
     405 
     406  /* Phase 5: do the load */ 
     407  memset(&iter, 0, sizeof(iter)); 
     408  while(noit_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) { 
     409    stmt = vstmt; 
     410    submit_statement_node(stmt); 
     411  } 
     412 
     413  noit_hash_destroy(&stmt_by_provider, NULL, NULL); 
     414  noit_hash_destroy(&stmt_by_id, NULL, statement_node_free); 
     415  free(statement_configs); 
    224416} 
    225417 
     
    251443      continue; 
    252444    } 
    253     if(!noit_conf_get_string(query_configs[i], "self::node()", 
     445    if(!noit_conf_get_string(query_configs[i], "self::node()/epl", 
    254446                             &query)) { 
    255447      noitL(noit_iep, "No contents specified in query\n"); 
     
    368560     } 
    369561#endif 
     562     stratcon_iep_submit_statements(); 
    370563     stratcon_datastore_iep_check_preload(); 
    371564     stratcon_iep_submit_queries();