Show
Ignore:
Timestamp:
08/06/09 13:21:06 (9 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1249564866 +0000
git-parent:

[e754a7ed9a2306bb7b477de6a099909fc3234c53]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1249564866 +0000
Message:

works for me. Ensures there is a (perhaps non-connected) DAG of statement dependencies and runs them, then loads data, then runs queries. refs #162

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/stratcon_iep.c

    r6f4687b r699c97c  
    169169 
    170170static xmlDocPtr 
     171stratcon_iep_doc_from_statement(char *data, char *remote) { 
     172  xmlDocPtr doc; 
     173  char *parts[3]; 
     174  if(bust_to_parts(data, parts, 3) != 3) return NULL; 
     175  /*  'D' ID QUERY  */ 
     176 
     177  NEWDOC(doc, "StratconStatement", 
     178         { 
     179           ADDCHILD("id", parts[1]); 
     180           ADDCHILD("expression", parts[2]); 
     181         }); 
     182  return doc; 
     183} 
     184 
     185static xmlDocPtr 
    171186stratcon_iep_doc_from_query(char *data, char *remote) { 
    172187  xmlDocPtr doc; 
     
    205220      case 'S': return stratcon_iep_doc_from_status(data, remote); 
    206221      case 'M': return stratcon_iep_doc_from_metric(data, remote); 
     222      case 'D': return stratcon_iep_doc_from_statement(data, remote); 
    207223      case 'Q': return stratcon_iep_doc_from_query(data, remote); 
    208224      case 'q': return stratcon_iep_doc_from_querystop(data, remote); 
     
    222238  } 
    223239  return 0; 
     240} 
     241 
     242struct statement_node { 
     243  char *id; 
     244  char *statement; 
     245  char *provides; 
     246  int marked; /* helps with identifying cycles */ 
     247  int nrequires; 
     248  struct statement_node **requires; 
     249}; 
     250static void 
     251statement_node_free(void *vstmt) { 
     252  struct statement_node *stmt = vstmt; 
     253  if(stmt->id) free(stmt->id); 
     254  if(stmt->statement) free(stmt->statement); 
     255  if(stmt->provides) free(stmt->provides); 
     256  if(stmt->requires) free(stmt->requires); 
     257} 
     258static int 
     259stmt_mark_dag(struct statement_node *stmt, int mgen) { 
     260  int i; 
     261  assert(stmt->marked <= mgen); 
     262  if(stmt->marked == mgen) return -1; 
     263  if(stmt->marked > 0) return 0; /* validated in a previous sweep */ 
     264  stmt->marked = mgen; 
     265  for(i=0; i<stmt->nrequires; i++) 
     266    if(stmt_mark_dag(stmt->requires[i], mgen) < 0) return -1; 
     267  return 0; 
     268} 
     269static void 
     270submit_statement_node(struct statement_node *stmt) { 
     271  int line_len, i; 
     272  char *line, *cp; 
     273 
     274  if(stmt->marked) return; 
     275  for(i=0; i<stmt->nrequires; i++) 
     276    submit_statement_node(stmt->requires[i]); 
     277 
     278  line_len = 3 /* 2 tabs + \0 */ + 
     279             1 /* 'D' */ + 1 /* '\n' */ + 
     280             strlen(stmt->id) + strlen(stmt->statement); 
     281  line = malloc(line_len); 
     282  snprintf(line, line_len, "D\t%s\t%s\n", stmt->id, stmt->statement); 
     283  cp = line; 
     284  while(cp[0] && cp[1]) { 
     285    if(*cp == '\n') *cp = ' '; 
     286    cp++; 
     287  } 
     288  noitL(noit_error, "submitting statement: %s\n", line); 
     289  stratcon_iep_line_processor(DS_OP_INSERT, NULL, line); 
     290  stmt->marked = 1; 
     291} 
     292void stratcon_iep_submit_statements() { 
     293  int i, cnt = 0; 
     294  noit_conf_section_t *statement_configs; 
     295  char path[256]; 
     296  struct statement_node *stmt; 
     297  void *vstmt; 
     298  noit_hash_table stmt_by_id = NOIT_HASH_EMPTY; 
     299  noit_hash_table stmt_by_provider = NOIT_HASH_EMPTY; 
     300  noit_hash_iter iter = NOIT_HASH_ITER_ZERO; 
     301  const char *key; 
     302  int klen, mgen = 0; 
     303 
     304  snprintf(path, sizeof(path), "/stratcon/iep/queries//statement"); 
     305  statement_configs = noit_conf_get_sections(NULL, path, &cnt); 
     306  noitL(noit_debug, "Found %d %s stanzas\n", cnt, path); 
     307 
     308  /* Phase 1: sweep in all the statements */ 
     309  for(i=0; i<cnt; i++) { 
     310    char id[UUID_STR_LEN]; 
     311    char provides[256]; 
     312    char *statement; 
     313 
     314    if(!noit_conf_get_stringbuf(statement_configs[i], 
     315                                "self::node()/@id", 
     316                                id, sizeof(id))) { 
     317      noitL(noit_iep, "No uuid specified in query\n"); 
     318      continue; 
     319    } 
     320    if(!noit_conf_get_stringbuf(statement_configs[i], 
     321                                "ancestor-or-self::node()/@provides", 
     322                                provides, sizeof(provides))) { 
     323      provides[0] = '\0'; 
     324    } 
     325    if(!noit_conf_get_string(statement_configs[i], "self::node()/epl", 
     326                             &statement)) { 
     327      noitL(noit_iep, "No contents specified in statement\n"); 
     328      continue; 
     329    } 
     330    stmt = calloc(1, sizeof(*stmt)); 
     331    stmt->id = strdup(id); 
     332    stmt->statement = statement; 
     333    stmt->provides = provides[0] ? strdup(provides) : NULL; 
     334    if(!noit_hash_store(&stmt_by_id, stmt->id, strlen(stmt->id), stmt)) { 
     335      noitL(noit_error, "Duplicate statement id: %s\n", stmt->id); 
     336      exit(-1); 
     337    } 
     338    if(stmt->provides) { 
     339      if(!noit_hash_store(&stmt_by_provider, stmt->provides, 
     340                          strlen(stmt->provides), stmt)) { 
     341        noitL(noit_error, "Two statements provide: '%s'\n", stmt->provides); 
     342        exit(-1); 
     343      } 
     344    } 
     345  } 
     346 
     347  /* Phase 2: load the requires graph */ 
     348  for(i=0; i<cnt; i++) { 
     349    char id[UUID_STR_LEN]; 
     350    int rcnt, j; 
     351    char *requires; 
     352    noit_conf_section_t *reqs; 
     353 
     354    if(!noit_conf_get_stringbuf(statement_configs[i], 
     355                                "self::node()/@id", 
     356                                id, sizeof(id))) { 
     357      noitL(noit_iep, "No uuid specified in query\n"); 
     358      continue; 
     359    } 
     360    if(!noit_hash_retrieve(&stmt_by_id, id, strlen(id), &vstmt)) { 
     361      noitL(noit_error, "Cannot find statement: %s\n", id); 
     362      exit(-1); 
     363    } 
     364    stmt = vstmt; 
     365    reqs = noit_conf_get_sections(statement_configs[i], 
     366                                  "self::node()/requires", &rcnt); 
     367    if(rcnt > 0) { 
     368      stmt->requires = malloc(rcnt * sizeof(*(stmt->requires))); 
     369      for(j=0; j<rcnt; j++) { 
     370        void *vrstmt; 
     371        if(!noit_conf_get_string(reqs[j], "self::node()", 
     372                                 &requires) || requires[0] == '\0') { 
     373          continue; 
     374        } 
     375        if(!noit_hash_retrieve(&stmt_by_provider, requires, strlen(requires), 
     376                               &vrstmt)) { 
     377          noitL(noit_error, 
     378                "Statement %s requires %s which no one provides.\n", 
     379                stmt->id, requires); 
     380          exit(-1); 
     381        } 
     382        stmt->requires[stmt->nrequires++] = vrstmt; 
     383      } 
     384    } 
     385  } 
     386 
     387  /* Phase 3: Recursive sweep and mark to detect cycles. 
     388     We're walking the graph backwards here from dependent to provider, 
     389     but a cycle is a cycle, so this validates the graph. */ 
     390  while(noit_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) { 
     391    stmt = vstmt; 
     392    if(stmt_mark_dag(stmt, ++mgen) < 0) { 
     393      noitL(noit_error, "Statement %s has a cyclic requirement\n", stmt->id); 
     394      exit(-1); 
     395    } 
     396  } 
     397 
     398  /* Phase 4: clean the markings */ 
     399  mgen = 0; 
     400  memset(&iter, 0, sizeof(iter)); 
     401  while(noit_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) { 
     402    stmt = vstmt; 
     403    stmt->marked = 0; 
     404  } 
     405 
     406  /* Phase 5: do the load */ 
     407  memset(&iter, 0, sizeof(iter)); 
     408  while(noit_hash_next(&stmt_by_id, &iter, &key, &klen, &vstmt)) { 
     409    stmt = vstmt; 
     410    submit_statement_node(stmt); 
     411  } 
     412 
     413  noit_hash_destroy(&stmt_by_provider, NULL, NULL); 
     414  noit_hash_destroy(&stmt_by_id, NULL, statement_node_free); 
     415  free(statement_configs); 
    224416} 
    225417 
     
    251443      continue; 
    252444    } 
    253     if(!noit_conf_get_string(query_configs[i], "self::node()", 
     445    if(!noit_conf_get_string(query_configs[i], "self::node()/epl", 
    254446                             &query)) { 
    255447      noitL(noit_iep, "No contents specified in query\n"); 
     
    368560     } 
    369561#endif 
     562     stratcon_iep_submit_statements(); 
    370563     stratcon_datastore_iep_check_preload(); 
    371564     stratcon_iep_submit_queries();