Changeset 24eac7c456bb2802cf4a695e79f812cc3a0b4b0c

Show
Ignore:
Timestamp:
05/13/09 03:30:23 (5 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1242185423 +0000
git-parent:

[a9077178423e39a94a9b624e44cd4b37899d6fd3]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1242185423 +0000
Message:

variety of stuff. fixing some bugs and adding checks into the data model, refs #122

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/java/Makefile.in

    ra907717 r24eac7c  
    2626        com/omniti/reconnoiter/AMQOutput.java \ 
    2727        com/omniti/reconnoiter/event/NoitEvent.java \ 
    28         com/omniti/reconnoiter/event/StatusEvent.java \ 
    2928        com/omniti/reconnoiter/event/StratconQuery.java \ 
    3029        com/omniti/reconnoiter/event/StratconQueryStop.java \ 
  • src/java/com/omniti/reconnoiter/AMQListener.java

    ra907717 r24eac7c  
    3636      this.queries = new ConcurrentHashMap<UUID,StratconQuery>(); 
    3737      this.epService = epService; 
    38       NoitEvent.registerTypes(epService); 
    3938      try { 
    4039        // we just need it started up 
  • src/java/com/omniti/reconnoiter/AMQOutput.java

    ra907717 r24eac7c  
    4646    } 
    4747    public void update(EventBean[] newEvents, EventBean[] oldEvents) { 
    48       EventBean event = newEvents[0]; 
    49  
    50       JSONEventRenderer jsonRenderer = epService.getEPRuntime(). 
    51                                                  getEventRenderer(). 
    52                                                  getJSONRenderer(statement.getEventType()); 
    53       String output = jsonRenderer.render("MyEvent", event); 
    54       try { 
    55         TextMessage message = session.createTextMessage(output); 
    56         producer.send(message); 
    57       }  catch(JMSException e) { 
    58         System.err.println(e); 
     48      for(int i = 0; i < newEvents.length; i++) { 
     49        EventBean event = newEvents[i]; 
     50   
     51        JSONEventRenderer jsonRenderer = epService.getEPRuntime(). 
     52                                                   getEventRenderer(). 
     53                                                   getJSONRenderer(statement.getEventType()); 
     54        String output = jsonRenderer.render("MyEvent", event); 
     55        try { 
     56          TextMessage message = session.createTextMessage(output); 
     57          producer.send(message); 
     58        }  catch(JMSException e) { 
     59          System.err.println(e); 
     60        } 
     61        System.err.println(output); 
    5962      } 
    60       System.err.println(output); 
    6163    } 
    6264} 
  • src/java/com/omniti/reconnoiter/IEPEngine.java

    ra907717 r24eac7c  
    55import java.io.InputStreamReader; 
    66import com.omniti.reconnoiter.AMQListener; 
     7import com.omniti.reconnoiter.event.NoitEvent; 
    78import com.espertech.esper.client.*; 
     9import com.espertech.esper.client.soda.*; 
    810 
    911import org.apache.log4j.BasicConfigurator; 
     
    1416 
    1517    Configuration config = new Configuration(); 
     18    EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config); 
    1619    config.addEventTypeAutoName("com.omniti.reconnoiter.event"); 
    17     EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config); 
     20    NoitEvent.registerTypes(epService); 
     21 
     22    epService.getEPAdministrator().createEPL("create window CheckDetails.std:unique(uuid).win:keepall() as NoitCheck"); 
     23    epService.getEPAdministrator().createEPL("insert into CheckDetails select * from NoitCheck"); 
    1824 
    1925    AMQListener l = new AMQListener(epService); 
  • src/java/com/omniti/reconnoiter/StratconMessage.java

    ra907717 r24eac7c  
    4040      if(tag.equals("NoitStatus") || 
    4141         tag.equals("NoitMetricNumeric") || 
    42          tag.equals("NoitMetricText")) 
     42         tag.equals("NoitMetricText") || 
     43         tag.equals("NoitCheck")) 
    4344        return new NoitEvent(document); 
    4445      // and requests 
  • src/java/com/omniti/reconnoiter/event/NoitEvent.java

    ra907717 r24eac7c  
    2020 
    2121    cfg = new ConfigurationEventTypeXMLDOM(); 
     22    cfg.addXPathProperty("uuid", "/NoitCheck/id", XPathConstants.STRING); 
     23    cfg.addXPathProperty("target", "/NoitCheck/target", XPathConstants.STRING); 
     24    cfg.addXPathProperty("module", "/NoitCheck/module", XPathConstants.STRING); 
     25    cfg.addXPathProperty("name", "/NoitCheck/name", XPathConstants.STRING); 
     26    cfg.addXPathProperty("noit", "/NoitCheck/remote", XPathConstants.STRING); 
     27    cfg.setRootElementName("NoitCheck"); 
     28    epService.getEPAdministrator().getConfiguration() 
     29             .addEventType("NoitCheck", cfg); 
     30 
     31    cfg = new ConfigurationEventTypeXMLDOM(); 
    2232    cfg.addXPathProperty("uuid", "/NoitStatus/id", XPathConstants.STRING); 
    2333    cfg.addXPathProperty("status", "/NoitStatus/status", XPathConstants.STRING); 
     
    2535    cfg.addXPathProperty("availability", "/NoitStatus/availability", XPathConstants.STRING); 
    2636    cfg.addXPathProperty("duration", "/NoitStatus/duration", XPathConstants.NUMBER); 
     37    cfg.addXPathProperty("noit", "/NoitStatus/remote", XPathConstants.STRING); 
    2738    cfg.setRootElementName("NoitStatus"); 
    2839    epService.getEPAdministrator().getConfiguration() 
     
    3041 
    3142    cfg = new ConfigurationEventTypeXMLDOM(); 
    32     cfg.addXPathProperty("uuid", "/NoitStatus/id", XPathConstants.STRING); 
    33     cfg.addXPathProperty("name", "/NoitStatus/name", XPathConstants.STRING); 
    34     cfg.addXPathProperty("value", "/NoitStatus/value", XPathConstants.NUMBER); 
     43    cfg.addXPathProperty("uuid", "/NoitMetricNumeric/id", XPathConstants.STRING); 
     44    cfg.addXPathProperty("name", "/NoitMetricNumeric/name", XPathConstants.STRING); 
     45    cfg.addXPathProperty("value", "/NoitMetricNumeric/value", XPathConstants.NUMBER); 
     46    cfg.addXPathProperty("noit", "/NoitMetricNumeric/remote", XPathConstants.STRING); 
    3547    cfg.setRootElementName("NoitMetricNumeric"); 
    3648    epService.getEPAdministrator().getConfiguration() 
     
    3850 
    3951    cfg = new ConfigurationEventTypeXMLDOM(); 
    40     cfg.addXPathProperty("uuid", "/NoitStatus/id", XPathConstants.STRING); 
    41     cfg.addXPathProperty("name", "/NoitStatus/name", XPathConstants.STRING); 
    42     cfg.addXPathProperty("message", "/NoitStatus/message", XPathConstants.STRING); 
     52    cfg.addXPathProperty("uuid", "/NoitMetricText/id", XPathConstants.STRING); 
     53    cfg.addXPathProperty("name", "/NoitMetricText/name", XPathConstants.STRING); 
     54    cfg.addXPathProperty("message", "/NoitMetricText/message", XPathConstants.STRING); 
     55    cfg.addXPathProperty("noit", "/NoitMetricText/remote", XPathConstants.STRING); 
    4356    cfg.setRootElementName("NoitMetricText"); 
    4457    epService.getEPAdministrator().getConfiguration() 
  • src/java/com/omniti/reconnoiter/event/StratconQuery.java

    ra907717 r24eac7c  
    3636    if(name == null) name = "default"; 
    3737    if(uuid == null) uuid = UUID.randomUUID(); 
    38 System.err.println("In StratconQuery Constructor Name: " + name); 
    39 System.err.println("In StratconQuery Constructor id: " + uuid); 
    40 System.err.println("In StratconQuery Constructor expression: " + expression); 
    4138  } 
    4239  public UUID getUUID() { 
  • src/stratcon.conf.in

    ra907717 r24eac7c  
    4848    </dbconfig> 
    4949    <statements> 
     50      <allchecks><![CDATA[ 
     51        SELECT remote_address, id, target, module, name 
     52          FROM stratcon.mv_loading_dock_check_s 
     53      ]]></allchecks> 
    5054      <findcheck><![CDATA[ 
    5155        SELECT remote_address, id 
  • src/stratcon_datastore.c

    ra907717 r24eac7c  
    1010#include "stratcon_datastore.h" 
    1111#include "stratcon_realtime_http.h" 
     12#include "stratcon_iep.h" 
    1213#include "noit_conf.h" 
    1314#include "noit_check.h" 
     
    1920#include <zlib.h> 
    2021 
     22static char *check_loadall = NULL; 
     23static const char *check_loadall_conf = "/stratcon/database/statements/allchecks"; 
    2124static char *check_find = NULL; 
    2225static const char *check_find_conf = "/stratcon/database/statements/findcheck"; 
     
    7780  ds_job_detail   *tail; 
    7881} conn_q; 
     82 
     83static int stratcon_database_connect(conn_q *cq); 
    7984 
    8085static void 
     
    216221} while(0) 
    217222 
     223static int 
     224stratcon_datastore_asynch_drive_iep(eventer_t e, int mask, void *closure, 
     225                                    struct timeval *now) { 
     226  conn_q *cq = closure; 
     227  ds_job_detail *d; 
     228  int i, row_count = 0, good = 0; 
     229  char buff[1024]; 
     230 
     231  if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 
     232  if(mask & EVENTER_ASYNCH_CLEANUP) return 0; 
     233 
     234  stratcon_database_connect(cq); 
     235  d = calloc(1, sizeof(*d)); 
     236  GET_QUERY(check_loadall); 
     237  PG_EXEC(check_loadall); 
     238  row_count = PQntuples(d->res); 
     239   
     240  for(i=0; i<row_count; i++) { 
     241    char *remote, *id, *target, *module, *name; 
     242    PG_GET_STR_COL(remote, i, "remote_address"); 
     243    PG_GET_STR_COL(id, i, "id"); 
     244    PG_GET_STR_COL(target, i, "target"); 
     245    PG_GET_STR_COL(module, i, "module"); 
     246    PG_GET_STR_COL(name, i, "name"); 
     247    snprintf(buff, sizeof(buff), "C\t0.000\t%s\t%s\t%s\t%s\n", id, target, module, name); 
     248    stratcon_iep_line_processor(DS_OP_INSERT, NULL, buff); 
     249    good++; 
     250  } 
     251  noitL(noit_error, "Staged %d/%d remembered checks into IEP\n", good, row_count); 
     252 bad_row: 
     253  PQclear(d->res); 
     254  return 0; 
     255} 
     256void 
     257stratcon_datastore_iep_check_preload() { 
     258  eventer_t e; 
     259  conn_q *cq; 
     260  cq = __get_conn_q_for_remote(NULL); 
     261 
     262  e = eventer_alloc(); 
     263  e->mask = EVENTER_ASYNCH; 
     264  e->callback = stratcon_datastore_asynch_drive_iep; 
     265  e->closure = cq; 
     266  eventer_add_asynch(cq->jobq, e); 
     267} 
    218268execute_outcome_t 
    219269stratcon_datastore_find(conn_q *cq, ds_job_detail *d) { 
     
    500550                                 struct timeval *now) { 
    501551  conn_q *cq = closure; 
    502   ds_job_detail *current
     552  ds_job_detail *current, *next
    503553  if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 
    504554 
     
    510560  while(current) { 
    511561    if(current->rt) { 
     562      next = current->next; 
    512563      stratcon_datastore_find(cq, current); 
    513       current = current->next; 
     564      current = next; 
    514565    } 
    515566    else if(current->completion_event) { 
     567      next = current->next; 
    516568      eventer_add(current->completion_event); 
    517       current = current->next; 
     569      current = next; 
    518570      __remove_until(cq, current); 
    519571    } 
     
    658710  nnode->dispatch = f; 
    659711  nnode->next = onlookers; 
    660   while(noit_atomic_casptr((void **)&onlookers, nnode, nnode->next) != (vpsized_int)nnode->next) 
     712  while(noit_atomic_casptr((void **)&onlookers, nnode, nnode->next) != (void *)nnode->next) 
    661713    nnode->next = onlookers; 
    662714} 
  • src/stratcon_datastore.h

    ra7ed2df r24eac7c  
    3232  stratcon_datastore_saveconfig(void *unused); 
    3333 
     34/* Private'ish... called from IEP to populate IEP */ 
     35API_EXPORT(void) 
     36  stratcon_datastore_iep_check_preload(); 
     37 
    3438#endif 
  • src/stratcon_iep.c

    ra907717 r24eac7c  
    1111#include "stratcon_jlog_streamer.h" 
    1212#include "stratcon_datastore.h" 
     13#include "stratcon_iep.h" 
    1314#include "noit_conf.h" 
    1415#include "noit_check.h" 
     
    4142struct iep_job_closure { 
    4243  char *line;       /* This is a copy and gets trashed during processing */ 
     44  char *remote; 
    4345  xmlDocPtr doc; 
    4446  char *doc_str; 
     
    4648}; 
    4749 
    48 static void 
    49 stratcon_iep_datastore_onlooker(stratcon_datastore_op_t op, 
    50                                 struct sockaddr *remote, void *operand); 
    5150static void 
    5251start_iep_daemon(); 
     
    7978 
    8079static xmlDocPtr 
    81 stratcon_iep_doc_from_status(char *data) { 
     80stratcon_iep_doc_from_status(char *data, char *remote) { 
    8281  xmlDocPtr doc; 
    8382  char *parts[7]; 
     
    8685  NEWDOC(doc, "NoitStatus", 
    8786         { 
     87           ADDCHILD("remote", remote); 
    8888           ADDCHILD("id", parts[2]); 
    8989           ADDCHILD("state", parts[3]); 
     
    9696 
    9797static xmlDocPtr 
    98 stratcon_iep_doc_from_metric(char *data) { 
     98stratcon_iep_doc_from_check(char *data, char *remote) { 
     99  xmlDocPtr doc; 
     100  char *parts[6]; 
     101  if(bust_to_parts(data, parts, 6) != 6) return NULL; 
     102  /* 'C' TIMESTAMP UUID TARGET MODULE NAME */ 
     103  NEWDOC(doc, "NoitCheck", 
     104         { 
     105           ADDCHILD("remote", remote); 
     106           ADDCHILD("id", parts[2]); 
     107           ADDCHILD("target", parts[3]); 
     108           ADDCHILD("module", parts[4]); 
     109           ADDCHILD("name", parts[5]); 
     110         }); 
     111noitL(noit_error,"Submitting check %s\n", parts[2]); 
     112  return doc; 
     113
     114 
     115static xmlDocPtr 
     116stratcon_iep_doc_from_metric(char *data, char *remote) { 
    99117  xmlDocPtr doc; 
    100118  char *parts[6]; 
     
    110128  NEWDOC(doc, rootname, 
    111129         { 
     130           ADDCHILD("remote", remote); 
    112131           ADDCHILD("id", parts[2]); 
    113132           ADDCHILD("name", parts[3]); 
     
    118137 
    119138static xmlDocPtr 
    120 stratcon_iep_doc_from_query(char *data) { 
     139stratcon_iep_doc_from_query(char *data, char *remote) { 
    121140  xmlDocPtr doc; 
    122141  char *parts[4]; 
     
    134153 
    135154static xmlDocPtr 
    136 stratcon_iep_doc_from_querystop(char *data) { 
     155stratcon_iep_doc_from_querystop(char *data, char *remote) { 
    137156  xmlDocPtr doc; 
    138157  char *parts[2]; 
     
    148167 
    149168static xmlDocPtr 
    150 stratcon_iep_doc_from_line(char *data) { 
     169stratcon_iep_doc_from_line(char *data, char *remote) { 
    151170  if(data) { 
    152171    switch(*data) { 
    153       case 'S': return stratcon_iep_doc_from_status(data); 
    154       case 'M': return stratcon_iep_doc_from_metric(data); 
    155       case 'Q': return stratcon_iep_doc_from_query(data); 
    156       case 'q': return stratcon_iep_doc_from_querystop(data); 
     172      case 'C': return stratcon_iep_doc_from_check(data, remote); 
     173      case 'S': return stratcon_iep_doc_from_status(data, remote); 
     174      case 'M': return stratcon_iep_doc_from_metric(data, remote); 
     175      case 'Q': return stratcon_iep_doc_from_query(data, remote); 
     176      case 'q': return stratcon_iep_doc_from_querystop(data, remote); 
    157177    } 
    158178  } 
     
    215235      query++; 
    216236    } 
    217     stratcon_iep_datastore_onlooker(DS_OP_INSERT, NULL, line); 
     237    stratcon_iep_line_processor(DS_OP_INSERT, NULL, line); 
    218238    free(line); 
    219239  } 
     
    302322     } 
    303323#endif 
     324     stratcon_datastore_iep_check_preload(); 
    304325     stratcon_iep_submit_queries(); 
    305326  } 
     
    320341    if(job) { 
    321342      if(job->line) free(job->line); 
     343      if(job->remote) free(job->remote); 
    322344      if(job->doc_str) free(job->doc_str); 
    323345      if(job->doc) xmlFreeDoc(job->doc); 
     
    332354    return 0; 
    333355  } 
    334   job->doc = stratcon_iep_doc_from_line(job->line); 
     356  job->doc = stratcon_iep_doc_from_line(job->line, job->remote); 
    335357  if(job->doc) { 
    336358    job->doc_str = stratcon__xml_doc_to_str(job->doc); 
     
    392414} 
    393415 
    394 static void 
    395 stratcon_iep_datastore_onlooker(stratcon_datastore_op_t op, 
    396                                 struct sockaddr *remote, void *operand) { 
     416void 
     417stratcon_iep_line_processor(stratcon_datastore_op_t op, 
     418                            struct sockaddr *remote, void *operand) { 
     419  int len; 
     420  char remote_str[128]; 
    397421  struct iep_job_closure *jc; 
    398422  eventer_t newe; 
     
    405429  } 
    406430  if(op != DS_OP_INSERT) return; 
     431 
     432  snprintf(remote_str, sizeof(remote_str), "%s", "0.0.0.0"); 
     433  if(remote) { 
     434    switch(remote->sa_family) { 
     435      case AF_INET: 
     436        len = sizeof(struct sockaddr_in); 
     437        inet_ntop(remote->sa_family, &((struct sockaddr_in *)remote)->sin_addr, 
     438                  remote_str, len); 
     439        break; 
     440      case AF_INET6: 
     441       len = sizeof(struct sockaddr_in6); 
     442        inet_ntop(remote->sa_family, &((struct sockaddr_in6 *)remote)->sin6_addr, 
     443                  remote_str, len); 
     444       break; 
     445      case AF_UNIX: 
     446        len = SUN_LEN(((struct sockaddr_un *)remote)); 
     447        snprintf(remote_str, sizeof(remote_str), "%s", ((struct sockaddr_un *)remote)->sun_path); 
     448        break; 
     449    } 
     450  } 
    407451 
    408452  /* process operand and push onto queue */ 
     
    414458  jc = calloc(1, sizeof(*jc)); 
    415459  jc->line = strdup(operand); 
     460  jc->remote = strdup(remote_str); 
    416461  newe->closure = jc; 
    417462 
     
    435480  ctx = stratcon_jlog_streamer_ctx_alloc(); 
    436481  ctx->jlog_feed_cmd = htonl(NOIT_JLOG_DATA_TEMP_FEED); 
    437   ctx->push = stratcon_iep_datastore_onlooker; 
     482  ctx->push = stratcon_iep_line_processor; 
    438483  return ctx; 
    439484} 
  • src/stratcon_iep.h

    r6453a67 r24eac7c  
    1010#include "eventer/eventer.h" 
    1111#include "utils/noit_hash.h" 
     12#include "stratcon_datastore.h" 
    1213 
    1314#include <sys/types.h> 
     
    1718  stratcon_iep_init(); 
    1819 
     20API_EXPORT(void) 
     21  stratcon_iep_line_processor(stratcon_datastore_op_t op, 
     22                              struct sockaddr *remote, void *operand); 
     23 
    1924#endif