Show
Ignore:
Timestamp:
09/24/09 23:50:27 (9 years ago)
Author:
Dan Di Spaltro <dan.dispaltro@gmail.com>
git-committer:
Dan Di Spaltro <dan.dispaltro@gmail.com> 1253836227 +0000
git-parent:

[7666f798ce5023fb2d514f6ea370156ce06a1102]

git-author:
Dan Di Spaltro <dan.dispaltro@gmail.com> 1253836227 +0000
Message:

Updated the the connector bits to parse jlog instead of xml.
closes #185

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/stratcon_iep.c

    rb9a4230 r0335d9d  
    4141#include "noit_conf.h" 
    4242#include "noit_check.h" 
    43 #include "noit_xml.h" 
    4443 
    4544#include <sys/types.h> 
     
    5251#endif 
    5352#include <assert.h> 
    54 #include <libxml/parser.h> 
    55 #include <libxml/tree.h> 
    56 #include <libxml/xmlsave.h> 
    5753#ifdef OPENWIRE 
    5854#include "amqcs.h" 
     
    7975  char *line;       /* This is a copy and gets trashed during processing */ 
    8076  char *remote; 
    81   xmlDocPtr doc; 
    8277  char *doc_str; 
    8378  apr_pool_t *pool; 
     
    8782start_iep_daemon(); 
    8883 
    89 static int 
    90 bust_to_parts(char *in, char **p, int len) { 
    91   int cnt = 0; 
    92   char *s = in; 
    93   while(cnt < len) { 
    94     p[cnt++] = s; 
    95     while(*s && *s != '\t') s++; 
    96     if(!*s) break; 
    97     *s++ = '\0'; 
    98   } 
    99   while(*s) s++; /* Move to end */ 
    100   if(s > in && *(s-1) == '\n') *(s-1) = '\0'; /* chomp */ 
    101   return cnt; 
    102 } 
    103  
    104 #define ADDCHILD(a,b) \ 
    105   xmlNewTextChild(root, NULL, (xmlChar *)(a), (xmlChar *)(b)) 
    106 #define NEWDOC(xmldoc,n,stanza) do { \ 
    107   xmlNodePtr root; \ 
    108   xmldoc = xmlNewDoc((xmlChar *)"1.0"); \ 
    109   root = xmlNewDocNode(xmldoc, NULL, (xmlChar *)(n), NULL); \ 
    110   xmlDocSetRootElement(xmldoc, root); \ 
    111   stanza \ 
    112 } while(0) 
    113  
    114  
    115 static xmlDocPtr 
    116 stratcon_iep_doc_from_status(char *data, char *remote) { 
    117   xmlDocPtr doc; 
    118   char *parts[7]; 
    119   if(bust_to_parts(data, parts, 7) != 7) return NULL; 
    120   /* 'S' TIMESTAMP UUID STATE AVAILABILITY DURATION STATUS_MESSAGE */ 
    121   NEWDOC(doc, "NoitStatus", 
    122          { 
    123            ADDCHILD("remote", remote); 
    124            ADDCHILD("id", parts[2]); 
    125            ADDCHILD("state", parts[3]); 
    126            ADDCHILD("availability", parts[4]); 
    127            ADDCHILD("duration", parts[5]); 
    128            ADDCHILD("status", parts[6]); 
    129          }); 
    130   return doc; 
    131 } 
    132  
    133 static xmlDocPtr 
    134 stratcon_iep_doc_from_check(char *data, char *remote) { 
    135   xmlDocPtr doc; 
    136   char *parts[6]; 
    137   if(bust_to_parts(data, parts, 6) != 6) return NULL; 
    138   /* 'C' TIMESTAMP UUID TARGET MODULE NAME */ 
    139   NEWDOC(doc, "NoitCheck", 
    140          { 
    141            ADDCHILD("remote", remote); 
    142            ADDCHILD("id", parts[2]); 
    143            ADDCHILD("target", parts[3]); 
    144            ADDCHILD("module", parts[4]); 
    145            ADDCHILD("name", parts[5]); 
    146          }); 
    147   return doc; 
    148 } 
    149  
    150 static xmlDocPtr 
    151 stratcon_iep_doc_from_metric(char *data, char *remote) { 
    152   xmlDocPtr doc; 
    153   char *parts[6]; 
    154   const char *rootname = "NoitMetricNumeric"; 
    155   const char *valuename = "value"; 
    156   if(bust_to_parts(data, parts, 6) != 6) return NULL; 
    157   /*  'M' TIMESTAMP UUID NAME TYPE VALUE */ 
    158  
    159   if(*parts[4] == METRIC_STRING) { 
    160     rootname = "NoitMetricText"; 
    161     valuename = "message"; 
    162   } 
    163   NEWDOC(doc, rootname, 
    164          { 
    165            ADDCHILD("remote", remote); 
    166            ADDCHILD("id", parts[2]); 
    167            ADDCHILD("name", parts[3]); 
    168            ADDCHILD(valuename, parts[5]); 
    169          }); 
    170   return doc; 
    171 } 
    172  
    173 static xmlDocPtr 
    174 stratcon_iep_doc_from_statement(char *data, char *remote) { 
    175   xmlDocPtr doc; 
    176   char *parts[3]; 
    177   if(bust_to_parts(data, parts, 3) != 3) return NULL; 
    178   /*  'D' ID QUERY  */ 
    179  
    180   NEWDOC(doc, "StratconStatement", 
    181          { 
    182            ADDCHILD("id", parts[1]); 
    183            ADDCHILD("expression", parts[2]); 
    184          }); 
    185   return doc; 
    186 } 
    187  
    188 static xmlDocPtr 
    189 stratcon_iep_doc_from_query(char *data, char *remote) { 
    190   xmlDocPtr doc; 
    191   char *parts[4]; 
    192   if(bust_to_parts(data, parts, 4) != 4) return NULL; 
    193   /*  'Q' ID NAME QUERY  */ 
    194  
    195   NEWDOC(doc, "StratconQuery", 
    196          { 
    197            ADDCHILD("id", parts[1]); 
    198            ADDCHILD("name", parts[2]); 
    199            ADDCHILD("expression", parts[3]); 
    200          }); 
    201   return doc; 
    202 } 
    203  
    204 static xmlDocPtr 
    205 stratcon_iep_doc_from_querystop(char *data, char *remote) { 
    206   xmlDocPtr doc; 
    207   char *parts[2]; 
    208   if(bust_to_parts(data, parts, 2) != 2) return NULL; 
    209   /*  'Q' ID */ 
    210  
    211   NEWDOC(doc, "StratconQueryStop", 
    212          { 
    213            xmlNodeSetContent(root, (xmlChar *)parts[1]); 
    214          }); 
    215   return doc; 
    216 } 
    217  
    218 static xmlDocPtr 
    219 stratcon_iep_doc_from_line(char *data, char *remote) { 
    220   if(data) { 
    221     switch(*data) { 
    222       case 'C': return stratcon_iep_doc_from_check(data, remote); 
    223       case 'S': return stratcon_iep_doc_from_status(data, remote); 
    224       case 'M': return stratcon_iep_doc_from_metric(data, remote); 
    225       case 'D': return stratcon_iep_doc_from_statement(data, remote); 
    226       case 'Q': return stratcon_iep_doc_from_query(data, remote); 
    227       case 'q': return stratcon_iep_doc_from_querystop(data, remote); 
    228     } 
    229   } 
    230   return NULL; 
    231 } 
    23284 
    23385static float 
     
    537389      } 
    538390 
    539  
    540  
    541 /* 
    542       We don't use login/pass 
    543       apr_hash_set(frame.headers, "login", APR_HASH_KEY_STRING, ""); 
    544       apr_hash_set(frame.headers, "passcode", APR_HASH_KEY_STRING, ""); 
    545 */ 
    546391      frame.body = NULL; 
    547392      frame.body_length = -1; 
     
    604449      if(job->remote) free(job->remote); 
    605450      if(job->doc_str) free(job->doc_str); 
    606       if(job->doc) xmlFreeDoc(job->doc); 
    607451      if(job->pool) apr_pool_destroy(job->pool); 
    608452      free(job); 
     
    619463    return 0; 
    620464  } 
    621   job->doc = stratcon_iep_doc_from_line(job->line, job->remote); 
    622   if(job->doc) { 
    623     job->doc_str = noit_xmlSaveToBuffer(job->doc); 
    624     if(job->doc_str) { 
    625       /* Submit */ 
    626       if(driver && driver->pool && driver->connection) { 
    627         apr_status_t rc; 
     465  /* Submit */ 
     466  if(driver && driver->pool && driver->connection) { 
     467    apr_status_t rc; 
     468    int line_len = strlen(job->line); 
     469    int remote_len = strlen(job->remote); 
    628470#ifdef OPENWIRE 
    629         ow_ActiveMQQueue *dest; 
    630         ow_ActiveMQTextMessage *message; 
    631  
    632         apr_pool_create(&job->pool, driver->pool); 
    633         message = ow_ActiveMQTextMessage_create(job->pool); 
    634         message->content = 
    635           ow_byte_array_create_with_data(job->pool,strlen(job->doc_str), 
    636                                          job->doc_str); 
    637         dest = ow_ActiveMQQueue_create(job->pool); 
    638         dest->physicalName = ow_string_create_from_cstring(job->pool,"TEST.QUEUE");          
    639         rc = amqcs_send(driver->connection, 
    640                         (ow_ActiveMQDestination*)dest, 
    641                         (ow_ActiveMQMessage*)message, 
    642                         1,4,0,job->pool); 
    643         if(rc != APR_SUCCESS) { 
    644           noitL(noit_error, "MQ send failed, disconnecting\n"); 
    645           if(driver->connection) amqcs_disconnect(&driver->connection); 
    646           driver->connection = NULL; 
    647        
     471    ow_ActiveMQQueue *dest; 
     472    ow_ActiveMQTextMessage *message; 
     473 
     474    apr_pool_create(&job->pool, driver->pool); 
     475    message = ow_ActiveMQTextMessage_create(job->pool); 
     476    message->content = 
     477      ow_byte_array_create_with_data(job->pool,strlen(job->doc_str), 
     478                                     job->doc_str); 
     479    dest = ow_ActiveMQQueue_create(job->pool); 
     480    dest->physicalName = ow_string_create_from_cstring(job->pool,"TEST.QUEUE");          
     481    rc = amqcs_send(driver->connection, 
     482                    (ow_ActiveMQDestination*)dest, 
     483                    (ow_ActiveMQMessage*)message, 
     484                    1,4,0,job->pool); 
     485    if(rc != APR_SUCCESS) { 
     486      noitL(noit_error, "MQ send failed, disconnecting\n"); 
     487      if(driver->connection) amqcs_disconnect(&driver->connection); 
     488      driver->connection = NULL; 
     489   
    648490#else 
    649         stomp_frame out; 
    650  
    651         apr_pool_create(&job->pool, driver->pool); 
    652  
    653         out.command = "SEND"; 
    654         out.headers = apr_hash_make(job->pool); 
    655         if (driver->exchange) 
    656           apr_hash_set(out.headers, "exchange", APR_HASH_KEY_STRING, driver->exchange); 
    657  
    658         apr_hash_set(out.headers, "destination", APR_HASH_KEY_STRING, "/queue/noit.firehose"); 
    659         apr_hash_set(out.headers, "ack", APR_HASH_KEY_STRING, "auto"); 
    660        
    661         out.body_length = -1; 
    662         out.body = job->doc_str; 
    663         rc = stomp_write(driver->connection, &out, job->pool); 
    664         if(rc != APR_SUCCESS) { 
    665           noitL(noit_error, "STOMP send failed, disconnecting\n"); 
    666           if(driver->connection) stomp_disconnect(&driver->connection); 
    667           driver->connection = NULL; 
    668         } 
     491    stomp_frame out; 
     492 
     493    job->doc_str = (char*)calloc(line_len + 1 /* \t */ + 
     494        remote_len + 2, 1); 
     495    strncpy(job->doc_str, job->line, 2); 
     496    strncat(job->doc_str, job->remote, remote_len); 
     497    strncat(job->doc_str, "\t", 1); 
     498    strncat(job->doc_str, job->line + 2, line_len - 2); 
     499 
     500    apr_pool_create(&job->pool, driver->pool); 
     501 
     502    out.command = "SEND"; 
     503    out.headers = apr_hash_make(job->pool); 
     504    if (driver->exchange) 
     505      apr_hash_set(out.headers, "exchange", APR_HASH_KEY_STRING, driver->exchange); 
     506 
     507    apr_hash_set(out.headers, "destination", APR_HASH_KEY_STRING, "/queue/noit.firehose"); 
     508    apr_hash_set(out.headers, "ack", APR_HASH_KEY_STRING, "auto"); 
     509   
     510    out.body_length = -1; 
     511    out.body = job->doc_str; 
     512    rc = stomp_write(driver->connection, &out, job->pool); 
     513    if(rc != APR_SUCCESS) { 
     514      noitL(noit_error, "STOMP send failed, disconnecting\n"); 
     515      if(driver->connection) stomp_disconnect(&driver->connection); 
     516      driver->connection = NULL; 
     517    } 
    669518#endif 
    670       } 
    671       else { 
    672         noitL(noit_error, "Not submitting event, no MQ\n"); 
    673       } 
    674     } 
    675519  } 
    676520  else { 
    677     noitL(noit_iep, "no iep handler for: '%s'\n", job->line); 
     521    noitL(noit_iep, "no iep handler for: '%s'\n", job->line);  
    678522  } 
    679523  return 0; 
     
    818662                           &info->command)) { 
    819663    noitL(noit_error, "No IEP start command provided.  You're on your own.\n"); 
    820     goto bail; 
     664    // goto bail; 
     665    // If you want to start it as a seperate process 
     666    return; 
    821667  } 
    822668  if(pipe(info->stdin_pipe) != 0 ||