Show
Ignore:
Timestamp:
01/17/09 20:05:36 (5 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1232222736 +0000
git-parent:

[2f419952f26b44b72a3e237c5aea5d3cf9af86c9]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1232222736 +0000
Message:

first whack at feeding actual data. refs #71

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/stratcon_datastore.c

    raa6712f r21b0c6c  
    99#include "utils/noit_b64.h" 
    1010#include "stratcon_datastore.h" 
     11#include "stratcon_realtime_http.h" 
    1112#include "noit_conf.h" 
    1213#include "noit_check.h" 
     
    1819#include <zlib.h> 
    1920 
     21static char *check_find = NULL; 
     22static const char *check_find_conf = "/stratcon/database/statements/findcheck"; 
    2023static char *check_insert = NULL; 
    2124static const char *check_insert_conf = "/stratcon/database/statements/check"; 
     
    3740#define MAX_PARAMS 8 
    3841#define POSTGRES_PARTS \ 
     42  PGresult *res; \ 
     43  int rv; \ 
    3944  int nparams; \ 
    4045  int metric_type; \ 
     
    5257 
    5358  char *data;  /* The raw string, NULL means the stream is done -- commit. */ 
     59  struct realtime_tracker *rt; 
     60 
    5461  int problematic; 
    5562  eventer_t completion_event; /* This event should be registered if non NULL */ 
     
    101108__get_conn_q_for_remote(struct sockaddr *remote) { 
    102109  conn_q *cq; 
     110  static const char __zeros[4] = { 0 }; 
    103111  int len = 0; 
    104   switch(remote->sa_family) { 
    105     case AF_INET: len = sizeof(struct sockaddr_in); break; 
    106     case AF_INET6: len = sizeof(struct sockaddr_in6); break; 
    107     case AF_UNIX: len = SUN_LEN(((struct sockaddr_un *)remote)); break; 
    108     default: return NULL; 
     112  if(remote) { 
     113    switch(remote->sa_family) { 
     114      case AF_INET: len = sizeof(struct sockaddr_in); break; 
     115      case AF_INET6: len = sizeof(struct sockaddr_in6); break; 
     116      case AF_UNIX: len = SUN_LEN(((struct sockaddr_un *)remote)); break; 
     117      default: return NULL; 
     118    } 
     119  } 
     120  else { 
     121    /* This is a dummy connection */ 
     122    remote = (struct sockaddr *)__zeros; 
     123    len = 4; 
    109124  } 
    110125  if(noit_hash_retrieve(&ds_conns, (const char *)remote, len, (void **)&cq)) 
     
    152167  d->nparams++; \ 
    153168} while(0) 
    154  
     169#define DECLARE_PARAM_INT(i) do { \ 
     170  int buffer__len; \ 
     171  char buffer__[32]; \ 
     172  snprintf(buffer__, sizeof(buffer__), "%d", (i)); \ 
     173  buffer__len = strlen(buffer__); \ 
     174  DECLARE_PARAM_STR(buffer__, buffer__len); \ 
     175} while(0) 
     176 
     177#define PG_GET_STR_COL(dest, row, name) do { \ 
     178  int colnum = PQfnumber(d->res, name); \ 
     179  dest = NULL; \ 
     180  if (colnum >= 0) \ 
     181    dest = PQgetisnull(d->res, row, colnum) \ 
     182         ? NULL : PQgetvalue(d->res, row, colnum); \ 
     183} while(0) 
     184 
     185#define PG_EXEC(cmd) do { \ 
     186  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \ 
     187                        (const char * const *)d->paramValues, \ 
     188                        d->paramLengths, d->paramFormats, 0); \ 
     189  d->rv = PQresultStatus(d->res); \ 
     190  if(d->rv != PGRES_COMMAND_OK && \ 
     191     d->rv != PGRES_TUPLES_OK) { \ 
     192    noitL(noit_error, "stratcon datasource bad (%d): %s\n", \ 
     193          d->rv, PQresultErrorMessage(d->res)); \ 
     194    PQclear(d->res); \ 
     195    goto bad_row; \ 
     196  } \ 
     197} while(0) 
     198 
     199execute_outcome_t 
     200stratcon_datastore_find(conn_q *cq, ds_job_detail *d) { 
     201  char *val; 
     202  int row_count; 
     203 
     204  if(!d->nparams) DECLARE_PARAM_INT(d->rt->sid); 
     205  GET_QUERY(check_find); 
     206  PG_EXEC(check_find); 
     207  row_count = PQntuples(d->res); 
     208  if(row_count != 1) goto bad_row; 
     209 
     210  /* Get the check uuid */ 
     211  PG_GET_STR_COL(val, 0, "id"); 
     212  if(!val) goto bad_row; 
     213  if(uuid_parse(val, d->rt->checkid)) goto bad_row; 
     214 
     215  /* Get the remote_address (which noit owns this) */ 
     216  PG_GET_STR_COL(val, 0, "remote_address"); 
     217  if(!val) goto bad_row; 
     218  d->rt->noit = strdup(val); 
     219 
     220  PQclear(d->res); 
     221  return DS_EXEC_SUCCESS; 
     222 bad_row: 
     223  return DS_EXEC_ROW_FAILED; 
     224
    155225execute_outcome_t 
    156226stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) { 
     
    288358  } 
    289359 
    290 #define PG_EXEC(cmd) do { \ 
    291   PGresult *res; \ 
    292   int rv; \ 
    293   res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \ 
    294                      (const char * const *)d->paramValues, \ 
    295                      d->paramLengths, d->paramFormats, 0); \ 
    296   rv = PQresultStatus(res); \ 
    297   if(rv != PGRES_COMMAND_OK && \ 
    298      rv != PGRES_TUPLES_OK) { \ 
    299     noitL(noit_error, "stratcon datasource bad row (%d): %s\n", \ 
    300           rv, PQresultErrorMessage(res)); \ 
    301     PQclear(res); \ 
    302     goto bad_row; \ 
    303   } \ 
    304   PQclear(res); \ 
    305 } while(0) 
    306  
    307360  /* Now execute the query */ 
    308361  switch(type) { 
     
    310363      GET_QUERY(config_insert); 
    311364      PG_EXEC(config_insert); 
     365      PQclear(d->res); 
    312366      break; 
    313367    case 'C': 
    314368      GET_QUERY(check_insert); 
    315369      PG_EXEC(check_insert); 
     370      PQclear(d->res); 
    316371      break; 
    317372    case 'S': 
    318373      GET_QUERY(status_insert); 
    319374      PG_EXEC(status_insert); 
     375      PQclear(d->res); 
    320376      break; 
    321377    case 'M': 
     
    328384          GET_QUERY(metric_insert_numeric); 
    329385          PG_EXEC(metric_insert_numeric); 
     386          PQclear(d->res); 
    330387          break; 
    331388        case METRIC_STRING: 
    332389          GET_QUERY(metric_insert_text); 
    333390          PG_EXEC(metric_insert_text); 
     391          PQclear(d->res); 
    334392          break; 
    335393        default: 
     
    418476  last_sp = NULL; \ 
    419477} while(0) 
     478int 
     479stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure, 
     480                                 struct timeval *now) { 
     481  conn_q *cq = closure; 
     482  ds_job_detail *current; 
     483  if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 
     484 
     485  if(!cq->head) return 0;  
     486 
     487  stratcon_database_connect(cq); 
     488 
     489  current = cq->head;  
     490  while(current) { 
     491    if(current->rt) { 
     492      stratcon_datastore_find(cq, current); 
     493      current = current->next; 
     494    } 
     495    else if(current->completion_event) { 
     496      eventer_add(current->completion_event); 
     497      current = current->next; 
     498      __remove_until(cq, current); 
     499    } 
     500    else current = current->next; 
     501  } 
     502  return 0; 
     503} 
    420504int 
    421505stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure, 
     
    483567  dsjd = calloc(1, sizeof(*dsjd)); 
    484568  switch(op) { 
     569    case DS_OP_FIND: 
     570      dsjd->rt = operand; 
     571      __append(cq, dsjd); 
     572      break; 
    485573    case DS_OP_INSERT: 
    486574      dsjd->data = operand; 
    487575      __append(cq, dsjd); 
    488576      break; 
     577    case DS_OP_FIND_COMPLETE: 
    489578    case DS_OP_CHKPT: 
    490579      dsjd->completion_event = operand; 
     
    492581      e = eventer_alloc(); 
    493582      e->mask = EVENTER_ASYNCH; 
    494       e->callback = stratcon_datastore_asynch_execute; 
     583      if(op == DS_OP_FIND_COMPLETE) 
     584        e->callback = stratcon_datastore_asynch_lookup; 
     585      else if(op == DS_OP_CHKPT) 
     586        e->callback = stratcon_datastore_asynch_execute; 
    495587      e->closure = cq; 
    496588      eventer_add(e); 
     
    521613    GET_QUERY(config_insert); 
    522614    PG_EXEC(config_insert); 
     615    PQclear(d->res); 
    523616    rv = 0; 
    524617