Changeset 21b0c6c1011f78327925b8a1914d98cdd5cd43d5

Show
Ignore:
Timestamp:
01/17/09 20:05:36 (5 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1232222736 +0000
git-parent:

[2f419952f26b44b72a3e237c5aea5d3cf9af86c9]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1232222736 +0000
Message:

first whack at feeding actual data. refs #71

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/noit_http.c

    r7940b59 r21b0c6c  
    390390  memset(&ctx->res, 0, sizeof(ctx->res)); 
    391391} 
     392void 
     393noit_http_ctx_session_release(noit_http_session_ctx *ctx) { 
     394  if(noit_atomic_dec32(&ctx->ref_cnt) == 0) { 
     395    noit_http_request_release(ctx); 
     396    noit_http_response_release(ctx); 
     397    free(ctx); 
     398  } 
     399} 
    392400int 
    393401noit_http_session_drive(eventer_t e, int origmask, void *closure, 
     
    424432  return 0; 
    425433 release: 
    426   noit_http_request_release(ctx); 
    427   noit_http_response_release(ctx); 
     434  noit_http_ctx_session_release(ctx); 
    428435  return 0; 
    429436} 
     
    433440  noit_http_session_ctx *ctx; 
    434441  ctx = calloc(1, sizeof(*ctx)); 
     442  ctx->ref_cnt = 1; 
    435443  ctx->req.complete = noit_false; 
    436444  ctx->conn.e = e; 
  • src/noit_http.h

    r49e329e r21b0c6c  
    1010#include "eventer/eventer.h" 
    1111#include "utils/noit_hash.h" 
     12#include "utils/noit_atomic.h" 
    1213 
    1314typedef enum { 
     
    8485 
    8586typedef struct noit_http_session_ctx { 
     87  noit_atomic32_t ref_cnt; 
    8688  noit_http_connection conn; 
    8789  noit_http_request req; 
     
    9496API_EXPORT(noit_http_session_ctx *) 
    9597  noit_http_session_ctx_new(noit_http_dispatch_func, void *, eventer_t); 
     98API_EXPORT(void) 
     99  noit_http_session_ctx_release(noit_http_session_ctx *); 
    96100 
    97101API_EXPORT(int) 
  • src/noitd.c

    r839ed4b r21b0c6c  
    2020#include "noit_console.h" 
    2121#include "noit_jlog_listener.h" 
     22#include "noit_livestream_listener.h" 
    2223#include "noit_module.h" 
    2324#include "noit_conf.h" 
     
    205206  noit_console_init(); 
    206207  noit_jlog_listener_init(); 
     208  noit_livestream_listener_init(); 
    207209 
    208210  noit_module_init(); 
  • src/stratcon.conf

    r55168c7 r21b0c6c  
    66      <outlet name="stderr"/> 
    77      <log name="error"/> 
    8       <log name="debug" disabled="true"/> 
     8      <log name="debug"/> 
     9      <log name="error/eventer" disabled="true"/> 
     10      <log name="debug/eventer" disabled="true"/> 
    911    </console_output> 
    1012  </logs> 
     
    3032  <database> 
    3133    <dbconfig> 
    32       <host>postgres83dev.office.omniti.com</host> 
     34      <host>noit.office.omniti.com</host> 
    3335      <dbname>reconnoiter</dbname> 
    3436      <user>stratcon</user> 
     
    3638    </dbconfig> 
    3739    <statements> 
     40      <findcheck><![CDATA[ 
     41        SELECT remote_address, id 
     42          FROM stratcon.mv_loading_dock_check_s 
     43         WHERE sid = $1 
     44      ]]></findcheck> 
    3845      <check><![CDATA[ 
    3946        INSERT INTO stratcon.loading_dock_check_s 
     
    7784  </consoles> 
    7885  <realtime type="stratcon_realtime_http"> 
    79     <listener address="*" port="8080"> 
     86    <listener address="*" port="80"> 
    8087    </listener> 
    8188  </realtime> 
  • src/stratcon_datastore.c

    raa6712f r21b0c6c  
    99#include "utils/noit_b64.h" 
    1010#include "stratcon_datastore.h" 
     11#include "stratcon_realtime_http.h" 
    1112#include "noit_conf.h" 
    1213#include "noit_check.h" 
     
    1819#include <zlib.h> 
    1920 
     21static char *check_find = NULL; 
     22static const char *check_find_conf = "/stratcon/database/statements/findcheck"; 
    2023static char *check_insert = NULL; 
    2124static const char *check_insert_conf = "/stratcon/database/statements/check"; 
     
    3740#define MAX_PARAMS 8 
    3841#define POSTGRES_PARTS \ 
     42  PGresult *res; \ 
     43  int rv; \ 
    3944  int nparams; \ 
    4045  int metric_type; \ 
     
    5257 
    5358  char *data;  /* The raw string, NULL means the stream is done -- commit. */ 
     59  struct realtime_tracker *rt; 
     60 
    5461  int problematic; 
    5562  eventer_t completion_event; /* This event should be registered if non NULL */ 
     
    101108__get_conn_q_for_remote(struct sockaddr *remote) { 
    102109  conn_q *cq; 
     110  static const char __zeros[4] = { 0 }; 
    103111  int len = 0; 
    104   switch(remote->sa_family) { 
    105     case AF_INET: len = sizeof(struct sockaddr_in); break; 
    106     case AF_INET6: len = sizeof(struct sockaddr_in6); break; 
    107     case AF_UNIX: len = SUN_LEN(((struct sockaddr_un *)remote)); break; 
    108     default: return NULL; 
     112  if(remote) { 
     113    switch(remote->sa_family) { 
     114      case AF_INET: len = sizeof(struct sockaddr_in); break; 
     115      case AF_INET6: len = sizeof(struct sockaddr_in6); break; 
     116      case AF_UNIX: len = SUN_LEN(((struct sockaddr_un *)remote)); break; 
     117      default: return NULL; 
     118    } 
     119  } 
     120  else { 
     121    /* This is a dummy connection */ 
     122    remote = (struct sockaddr *)__zeros; 
     123    len = 4; 
    109124  } 
    110125  if(noit_hash_retrieve(&ds_conns, (const char *)remote, len, (void **)&cq)) 
     
    152167  d->nparams++; \ 
    153168} while(0) 
    154  
     169#define DECLARE_PARAM_INT(i) do { \ 
     170  int buffer__len; \ 
     171  char buffer__[32]; \ 
     172  snprintf(buffer__, sizeof(buffer__), "%d", (i)); \ 
     173  buffer__len = strlen(buffer__); \ 
     174  DECLARE_PARAM_STR(buffer__, buffer__len); \ 
     175} while(0) 
     176 
     177#define PG_GET_STR_COL(dest, row, name) do { \ 
     178  int colnum = PQfnumber(d->res, name); \ 
     179  dest = NULL; \ 
     180  if (colnum >= 0) \ 
     181    dest = PQgetisnull(d->res, row, colnum) \ 
     182         ? NULL : PQgetvalue(d->res, row, colnum); \ 
     183} while(0) 
     184 
     185#define PG_EXEC(cmd) do { \ 
     186  d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \ 
     187                        (const char * const *)d->paramValues, \ 
     188                        d->paramLengths, d->paramFormats, 0); \ 
     189  d->rv = PQresultStatus(d->res); \ 
     190  if(d->rv != PGRES_COMMAND_OK && \ 
     191     d->rv != PGRES_TUPLES_OK) { \ 
     192    noitL(noit_error, "stratcon datasource bad (%d): %s\n", \ 
     193          d->rv, PQresultErrorMessage(d->res)); \ 
     194    PQclear(d->res); \ 
     195    goto bad_row; \ 
     196  } \ 
     197} while(0) 
     198 
     199execute_outcome_t 
     200stratcon_datastore_find(conn_q *cq, ds_job_detail *d) { 
     201  char *val; 
     202  int row_count; 
     203 
     204  if(!d->nparams) DECLARE_PARAM_INT(d->rt->sid); 
     205  GET_QUERY(check_find); 
     206  PG_EXEC(check_find); 
     207  row_count = PQntuples(d->res); 
     208  if(row_count != 1) goto bad_row; 
     209 
     210  /* Get the check uuid */ 
     211  PG_GET_STR_COL(val, 0, "id"); 
     212  if(!val) goto bad_row; 
     213  if(uuid_parse(val, d->rt->checkid)) goto bad_row; 
     214 
     215  /* Get the remote_address (which noit owns this) */ 
     216  PG_GET_STR_COL(val, 0, "remote_address"); 
     217  if(!val) goto bad_row; 
     218  d->rt->noit = strdup(val); 
     219 
     220  PQclear(d->res); 
     221  return DS_EXEC_SUCCESS; 
     222 bad_row: 
     223  return DS_EXEC_ROW_FAILED; 
     224
    155225execute_outcome_t 
    156226stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) { 
     
    288358  } 
    289359 
    290 #define PG_EXEC(cmd) do { \ 
    291   PGresult *res; \ 
    292   int rv; \ 
    293   res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \ 
    294                      (const char * const *)d->paramValues, \ 
    295                      d->paramLengths, d->paramFormats, 0); \ 
    296   rv = PQresultStatus(res); \ 
    297   if(rv != PGRES_COMMAND_OK && \ 
    298      rv != PGRES_TUPLES_OK) { \ 
    299     noitL(noit_error, "stratcon datasource bad row (%d): %s\n", \ 
    300           rv, PQresultErrorMessage(res)); \ 
    301     PQclear(res); \ 
    302     goto bad_row; \ 
    303   } \ 
    304   PQclear(res); \ 
    305 } while(0) 
    306  
    307360  /* Now execute the query */ 
    308361  switch(type) { 
     
    310363      GET_QUERY(config_insert); 
    311364      PG_EXEC(config_insert); 
     365      PQclear(d->res); 
    312366      break; 
    313367    case 'C': 
    314368      GET_QUERY(check_insert); 
    315369      PG_EXEC(check_insert); 
     370      PQclear(d->res); 
    316371      break; 
    317372    case 'S': 
    318373      GET_QUERY(status_insert); 
    319374      PG_EXEC(status_insert); 
     375      PQclear(d->res); 
    320376      break; 
    321377    case 'M': 
     
    328384          GET_QUERY(metric_insert_numeric); 
    329385          PG_EXEC(metric_insert_numeric); 
     386          PQclear(d->res); 
    330387          break; 
    331388        case METRIC_STRING: 
    332389          GET_QUERY(metric_insert_text); 
    333390          PG_EXEC(metric_insert_text); 
     391          PQclear(d->res); 
    334392          break; 
    335393        default: 
     
    418476  last_sp = NULL; \ 
    419477} while(0) 
     478int 
     479stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure, 
     480                                 struct timeval *now) { 
     481  conn_q *cq = closure; 
     482  ds_job_detail *current; 
     483  if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 
     484 
     485  if(!cq->head) return 0;  
     486 
     487  stratcon_database_connect(cq); 
     488 
     489  current = cq->head;  
     490  while(current) { 
     491    if(current->rt) { 
     492      stratcon_datastore_find(cq, current); 
     493      current = current->next; 
     494    } 
     495    else if(current->completion_event) { 
     496      eventer_add(current->completion_event); 
     497      current = current->next; 
     498      __remove_until(cq, current); 
     499    } 
     500    else current = current->next; 
     501  } 
     502  return 0; 
     503} 
    420504int 
    421505stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure, 
     
    483567  dsjd = calloc(1, sizeof(*dsjd)); 
    484568  switch(op) { 
     569    case DS_OP_FIND: 
     570      dsjd->rt = operand; 
     571      __append(cq, dsjd); 
     572      break; 
    485573    case DS_OP_INSERT: 
    486574      dsjd->data = operand; 
    487575      __append(cq, dsjd); 
    488576      break; 
     577    case DS_OP_FIND_COMPLETE: 
    489578    case DS_OP_CHKPT: 
    490579      dsjd->completion_event = operand; 
     
    492581      e = eventer_alloc(); 
    493582      e->mask = EVENTER_ASYNCH; 
    494       e->callback = stratcon_datastore_asynch_execute; 
     583      if(op == DS_OP_FIND_COMPLETE) 
     584        e->callback = stratcon_datastore_asynch_lookup; 
     585      else if(op == DS_OP_CHKPT) 
     586        e->callback = stratcon_datastore_asynch_execute; 
    495587      e->closure = cq; 
    496588      eventer_add(e); 
     
    521613    GET_QUERY(config_insert); 
    522614    PG_EXEC(config_insert); 
     615    PQclear(d->res); 
    523616    rv = 0; 
    524617 
  • src/stratcon_datastore.h

    rdcd539d r21b0c6c  
    1616typedef enum { 
    1717 DS_OP_INSERT = 1, 
    18  DS_OP_CHKPT = 2 
     18 DS_OP_CHKPT = 2, 
     19 DS_OP_FIND = 3, 
     20 DS_OP_FIND_COMPLETE = 4 
    1921} stratcon_datastore_op_t; 
    2022 
  • src/stratcon_jlog_streamer.c

    r6bb9ef8 r21b0c6c  
    1212#include "noit_jlog_listener.h" 
    1313#include "stratcon_datastore.h" 
     14#include "stratcon_jlog_streamer.h" 
    1415 
    1516#include <unistd.h> 
     
    2829 
    2930typedef struct jlog_streamer_ctx_t { 
    30   union { 
    31     struct sockaddr remote; 
    32     struct sockaddr_un remote_un; 
    33     struct sockaddr_in remote_in; 
    34     struct sockaddr_in6 remote_in6; 
    35   } r; 
    36   socklen_t remote_len; 
    37   char *remote_cn; 
    38   u_int32_t current_backoff; 
    39   int wants_shutdown; 
    40   noit_hash_table *config; 
    41   noit_hash_table *sslconfig; 
    4231  int bytes_expected; 
    4332  int bytes_read; 
     
    5847    u_int32_t message_len; 
    5948  } header; 
    60  
    61   eventer_t timeout_event; 
    6249} jlog_streamer_ctx_t; 
    6350 
    64 static void jlog_streamer_initiate_connection(jlog_streamer_ctx_t *ctx); 
     51static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx); 
    6552 
    6653jlog_streamer_ctx_t * 
     
    7057  return ctx; 
    7158} 
     59noit_connection_ctx_t * 
     60noit_connection_ctx_alloc(void) { 
     61  noit_connection_ctx_t *ctx; 
     62  ctx = calloc(1, sizeof(*ctx)); 
     63  return ctx; 
     64} 
    7265int 
    73 jlog_streamer_reinitiate(eventer_t e, int mask, void *closure, 
     66noit_connection_reinitiate(eventer_t e, int mask, void *closure, 
    7467                         struct timeval *now) { 
    75   jlog_streamer_ctx_t *ctx = closure; 
     68  noit_connection_ctx_t *ctx = closure; 
    7669  ctx->timeout_event = NULL; 
    77   jlog_streamer_initiate_connection(closure); 
     70  noit_connection_initiate_connection(closure); 
    7871  return 0; 
    7972} 
    8073void 
    81 jlog_streamer_schedule_reattempt(jlog_streamer_ctx_t *ctx, 
    82                                  struct timeval *now) { 
     74noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx, 
     75                                   struct timeval *now) { 
    8376  struct timeval __now, interval; 
    8477  const char *v; 
     
    114107  else 
    115108    ctx->timeout_event = eventer_alloc(); 
    116   ctx->timeout_event->callback = jlog_streamer_reinitiate; 
     109  ctx->timeout_event->callback = noit_connection_reinitiate; 
    117110  ctx->timeout_event->closure = ctx; 
    118111  ctx->timeout_event->mask = EVENTER_TIMER; 
     
    121114} 
    122115void 
    123 jlog_streamer_ctx_free(jlog_streamer_ctx_t *ctx) { 
    124   if(ctx->buffer) free(ctx->buffer); 
     116noit_connection_ctx_free(noit_connection_ctx_t *ctx) { 
    125117  if(ctx->remote_cn) free(ctx->remote_cn); 
    126118  if(ctx->timeout_event) { 
     
    128120    eventer_free(ctx->timeout_event); 
    129121  } 
     122  ctx->consumer_free(ctx->consumer_ctx); 
     123  free(ctx); 
     124} 
     125void 
     126jlog_streamer_ctx_free(void *cl) { 
     127  jlog_streamer_ctx_t *ctx = cl; 
     128  if(ctx->buffer) free(ctx->buffer); 
    130129  free(ctx); 
    131130} 
     
    182181                           struct timeval *now) { 
    183182  static u_int32_t jlog_feed_cmd = 0; 
    184   jlog_streamer_ctx_t *ctx = closure; 
     183  noit_connection_ctx_t *nctx = closure; 
     184  jlog_streamer_ctx_t *ctx = nctx->consumer_ctx; 
    185185  int len; 
    186186  jlog_id n_chkpt; 
     
    188188  if(!jlog_feed_cmd) jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED); 
    189189 
    190   if(mask & EVENTER_EXCEPTION || ctx->wants_shutdown) { 
     190  if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) { 
    191191 socket_error: 
    192192    ctx->state = WANT_INITIATE; 
     
    196196    if(ctx->buffer) free(ctx->buffer); 
    197197    ctx->buffer = NULL; 
    198     jlog_streamer_schedule_reattempt(ctx, now); 
     198    noit_connection_schedule_reattempt(nctx, now); 
    199199    eventer_remove_fd(e->fd); 
    200200    e->opset->close(e->fd, &mask, e); 
     
    244244      case WANT_BODY: 
    245245        FULLREAD(e, ctx, (unsigned long)ctx->header.message_len); 
    246         stratcon_datastore_push(DS_OP_INSERT, &ctx->r.remote, ctx->buffer); 
     246        stratcon_datastore_push(DS_OP_INSERT, &nctx->r.remote, ctx->buffer); 
    247247        /* Don't free the buffer, it's used by the datastore process. */ 
    248248        ctx->buffer = NULL; 
     
    255255          completion_e->mask = EVENTER_WRITE | EVENTER_EXCEPTION; 
    256256          ctx->state = WANT_CHKPT; 
    257           stratcon_datastore_push(DS_OP_CHKPT, &ctx->r.remote, completion_e); 
     257          stratcon_datastore_push(DS_OP_CHKPT, &nctx->r.remote, completion_e); 
    258258          noitL(noit_debug, "Pushing batch asynch...\n"); 
    259259          return 0; 
     
    287287 
    288288int 
    289 jlog_streamer_ssl_upgrade(eventer_t e, int mask, void *closure, 
    290                           struct timeval *now) { 
    291   jlog_streamer_ctx_t *ctx = closure; 
     289noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure, 
     290                            struct timeval *now) { 
     291  noit_connection_ctx_t *nctx = closure; 
    292292  int rv; 
    293293 
     
    295295  if(rv > 0) { 
    296296    eventer_ssl_ctx_t *sslctx; 
    297     e->callback = stratcon_jlog_recv_handler
     297    e->callback = nctx->consumer_callback
    298298    /* We must make a copy of the acceptor_closure_t for each new 
    299299     * connection. 
     
    306306        end = cn; 
    307307        while(*end && *end != '/') end++; 
    308         ctx->remote_cn = malloc(end - cn + 1); 
    309         memcpy(ctx->remote_cn, cn, end - cn); 
    310         ctx->remote_cn[end-cn] = '\0'; 
     308        nctx->remote_cn = malloc(end - cn + 1); 
     309        memcpy(nctx->remote_cn, cn, end - cn); 
     310        nctx->remote_cn[end-cn] = '\0'; 
    311311      } 
    312312    } 
     
    317317  eventer_remove_fd(e->fd); 
    318318  e->opset->close(e->fd, &mask, e); 
    319   jlog_streamer_schedule_reattempt(ctx, now); 
     319  noit_connection_schedule_reattempt(nctx, now); 
    320320  return 0; 
    321321} 
    322322int 
    323 jlog_streamer_complete_connect(eventer_t e, int mask, void *closure, 
    324                                struct timeval *now) { 
    325   jlog_streamer_ctx_t *ctx = closure; 
     323noit_connection_complete_connect(eventer_t e, int mask, void *closure, 
     324                                 struct timeval *now) { 
     325  noit_connection_ctx_t *nctx = closure; 
    326326  char *cert, *key, *ca, *ciphers; 
    327327  eventer_ssl_ctx_t *sslctx; 
     
    331331    eventer_remove_fd(e->fd); 
    332332    e->opset->close(e->fd, &mask, e); 
    333     jlog_streamer_schedule_reattempt(ctx, now); 
     333    noit_connection_schedule_reattempt(nctx, now); 
    334334    return 0; 
    335335  } 
    336336 
    337337#define SSLCONFGET(var,name) do { \ 
    338   if(!noit_hash_retrieve(ctx->sslconfig, name, strlen(name), \ 
     338  if(!noit_hash_retrieve(nctx->sslconfig, name, strlen(name), \ 
    339339                         (void **)&var)) var = NULL; } while(0) 
    340340  SSLCONFGET(cert, "certificate_file"); 
     
    346346 
    347347  eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert, 
    348                              ctx->sslconfig); 
     348                             nctx->sslconfig); 
    349349  EVENTER_ATTACH_SSL(e, sslctx); 
    350   e->callback = jlog_streamer_ssl_upgrade; 
     350  e->callback = noit_connection_ssl_upgrade; 
    351351  return e->callback(e, mask, closure, now); 
    352352} 
    353353static void 
    354 jlog_streamer_initiate_connection(jlog_streamer_ctx_t *ctx) { 
     354noit_connection_initiate_connection(noit_connection_ctx_t *nctx) { 
    355355  struct timeval __now; 
    356356  eventer_t e; 
     
    359359 
    360360  /* Open a socket */ 
    361   fd = socket(ctx->r.remote.sa_family, SOCK_STREAM, 0); 
     361  fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0); 
    362362  if(fd < 0) goto reschedule; 
    363363 
     
    367367 
    368368  /* Initiate a connection */ 
    369   rv = connect(fd, &ctx->r.remote, ctx->remote_len); 
     369  rv = connect(fd, &nctx->r.remote, nctx->remote_len); 
    370370  if(rv == -1 && errno != EINPROGRESS) goto reschedule; 
    371371 
     
    374374  e->fd = fd; 
    375375  e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION; 
    376   e->callback = jlog_streamer_complete_connect; 
    377   e->closure = ctx; 
     376  e->callback = noit_connection_complete_connect; 
     377  e->closure = nctx; 
    378378  eventer_add(e); 
    379379  return; 
     
    382382  if(fd >= 0) close(fd); 
    383383  gettimeofday(&__now, NULL); 
    384   jlog_streamer_schedule_reattempt(ctx, &__now); 
     384  noit_connection_schedule_reattempt(nctx, &__now); 
    385385  return; 
    386386} 
    387387 
    388388int 
    389 initiate_jlog_streamer(const char *host, unsigned short port, 
    390                        noit_hash_table *sslconfig, noit_hash_table *config) { 
    391   jlog_streamer_ctx_t *ctx; 
     389initiate_noit_connection(const char *host, unsigned short port, 
     390                         noit_hash_table *sslconfig, noit_hash_table *config, 
     391                         eventer_func_t handler, void *closure, 
     392                         void (*freefunc)(void *)) { 
     393  noit_connection_ctx_t *ctx; 
    392394 
    393395  int8_t family; 
     
    414416  } 
    415417 
    416   ctx = jlog_streamer_ctx_alloc(); 
     418  ctx = noit_connection_ctx_alloc(); 
    417419   
    418420  memset(&ctx->r, 0, sizeof(ctx->r)); 
     
    449451  noit_hash_merge_as_dict(ctx->config, config); 
    450452 
    451   jlog_streamer_initiate_connection(ctx); 
     453  ctx->consumer_callback = handler; 
     454  ctx->consumer_free = freefunc; 
     455  ctx->consumer_ctx = closure; 
     456  noit_connection_initiate_connection(ctx); 
    452457  return 0; 
    453458} 
    454459 
    455460void 
    456 stratcon_jlog_streamer_reload(const char *toplevel) { 
     461stratcon_streamer_connection(const char *toplevel, const char *destination, 
     462                             eventer_func_t handler, 
     463                             void *(*handler_alloc)(void), void *handler_ctx, 
     464                             void (*handler_free)(void *)) { 
    457465  int i, cnt = 0; 
    458466  noit_conf_section_t *noit_configs; 
     
    474482      continue; 
    475483    } 
     484    /* if destination is specified, exact match it */ 
     485    if(destination && strcmp(address, destination)) continue; 
     486 
    476487    if(!noit_conf_get_int(noit_configs[i], 
    477488                          "ancestor-or-self::node()/@port", &portint)) 
     
    487498    config = noit_conf_get_hash(noit_configs[i], "config"); 
    488499 
    489     initiate_jlog_streamer(address, port, sslconfig, config); 
    490   } 
     500    initiate_noit_connection(address, port, sslconfig, config, 
     501                             handler, 
     502                             handler_alloc ? handler_alloc() : handler_ctx, 
     503                             handler_free); 
     504  } 
     505
     506void 
     507stratcon_jlog_streamer_reload(const char *toplevel) { 
     508  stratcon_streamer_connection(toplevel, NULL, 
     509                               stratcon_jlog_recv_handler, 
     510                               (void *(*)())jlog_streamer_ctx_alloc, NULL, 
     511                               jlog_streamer_ctx_free); 
    491512} 
    492513 
    493514void 
    494515stratcon_jlog_streamer_init(const char *toplevel) { 
    495   eventer_name_callback("jlog_streamer_reinitiate", 
    496                         jlog_streamer_reinitiate); 
     516  eventer_name_callback("noit_connection_reinitiate", 
     517                        noit_connection_reinitiate); 
    497518  eventer_name_callback("stratcon_jlog_recv_handler", 
    498519                        stratcon_jlog_recv_handler); 
    499   eventer_name_callback("jlog_streamer_ssl_upgrade", 
    500                         jlog_streamer_ssl_upgrade); 
    501   eventer_name_callback("jlog_streamer_complete_connect", 
    502                         jlog_streamer_complete_connect); 
     520  eventer_name_callback("noit_connection_ssl_upgrade", 
     521                        noit_connection_ssl_upgrade); 
     522  eventer_name_callback("noit_connection_complete_connect", 
     523                        noit_connection_complete_connect); 
    503524  stratcon_jlog_streamer_reload(toplevel); 
    504525} 
  • src/stratcon_jlog_streamer.h

    ra7304b5 r21b0c6c  
    77#define _STRATCON_LOG_STREAMER_H 
    88 
    9 #include "noit_defines.h" 
     9#include "noit_conf.h" 
     10#include <netinet/in.h> 
     11#include <sys/un.h> 
     12#include <arpa/inet.h> 
     13 
     14typedef struct noit_connection_ctx_t { 
     15  union { 
     16    struct sockaddr remote; 
     17    struct sockaddr_un remote_un; 
     18    struct sockaddr_in remote_in; 
     19    struct sockaddr_in6 remote_in6; 
     20  } r; 
     21  socklen_t remote_len; 
     22  char *remote_cn; 
     23  u_int32_t current_backoff; 
     24  int wants_shutdown; 
     25  noit_hash_table *config; 
     26  noit_hash_table *sslconfig; 
     27  eventer_t timeout_event; 
     28 
     29  eventer_func_t consumer_callback; 
     30  void (*consumer_free)(void *); 
     31  void *consumer_ctx; 
     32} noit_connection_ctx_t; 
    1033 
    1134API_EXPORT(void) 
     
    1336API_EXPORT(void) 
    1437  stratcon_jlog_streamer_reload(const char *toplevel); 
     38API_EXPORT(void) 
     39  stratcon_streamer_connection(const char *toplevel, const char *destination, 
     40                               eventer_func_t handler, 
     41                               void *(*handler_alloc)(void), void *handler_ctx, 
     42                               void (*handler_free)(void *)); 
    1543 
    1644#endif 
  • src/stratcon_realtime_http.c

    r8757d86 r21b0c6c  
    1313#include "noit_listener.h" 
    1414#include "noit_http.h" 
    15  
    16 typedef struct { 
    17   int setup; 
     15#include "noit_livestream_listener.h" 
     16#include "stratcon_realtime_http.h" 
     17#include "stratcon_jlog_streamer.h" 
     18#include "stratcon_datastore.h" 
     19 
     20#include <unistd.h> 
     21#include <assert.h> 
     22#include <errno.h> 
     23#include <sys/types.h> 
     24#include <sys/socket.h> 
     25#ifdef HAVE_SYS_FILIO_H 
     26#include <sys/filio.h> 
     27#endif 
     28#include <netinet/in.h> 
     29#include <sys/un.h> 
     30#include <arpa/inet.h> 
     31 
     32 
     33typedef struct realtime_recv_ctx_t { 
     34  int bytes_expected; 
     35  int bytes_read; 
     36  char *buffer;         /* These guys are for doing partial reads */ 
     37 
     38  enum { 
     39    WANT_INITIATE = 0, 
     40    WANT_SEND_INTERVAL = 1, 
     41    WANT_SEND_UUID = 2, 
     42    WANT_HEADER = 3, 
     43    WANT_BODY = 4, 
     44  } state; 
     45  int count;            /* Number of jlog messages we need to read */ 
     46  noit_http_session_ctx *ctx; 
     47  struct realtime_tracker *rt; 
     48} realtime_recv_ctx_t; 
     49 
     50typedef struct realtime_context { 
     51  enum { RC_INITIAL = 0, RC_REQ_RECV, RC_INTERESTS_RESOLVED, RC_FEEDING } setup; 
     52  struct realtime_tracker *checklist; 
    1853} realtime_context; 
    1954 
     
    2257  return calloc(sizeof(*ctx), 1); 
    2358} 
    24 int 
    25 stratcon_realtime_ticker(eventer_t old, int mask, void *closure, 
    26                          struct timeval *now) { 
     59static void free_realtime_tracker(struct realtime_tracker *rt) { 
     60  if(rt->noit) free(rt->noit); 
     61  free(rt); 
     62
     63static void clear_realtime_context(realtime_context *rc) { 
     64  while(rc->checklist) { 
     65    struct realtime_tracker *tofree; 
     66    tofree = rc->checklist; 
     67    rc->checklist = tofree->next; 
     68    free_realtime_tracker(tofree); 
     69  } 
     70
     71int 
     72stratcon_line_to_javascript(noit_http_session_ctx *ctx, char *buff) { 
    2773  char buffer[1024]; 
    28   noit_http_session_ctx *ctx = closure; 
    29  
     74  char *scp, *ecp, *token; 
     75  int len; 
     76 
     77#define PROCESS_NEXT_FIELD(t,l) do { \ 
     78  if(!*scp) goto bad_row; \ 
     79  ecp = strchr(scp, '\t'); \ 
     80  if(!ecp) goto bad_row; \ 
     81  t = scp; \ 
     82  l = (ecp-scp); \ 
     83  scp = ecp + 1; \ 
     84} while(0) 
     85#define PROCESS_LAST_FIELD(t,l) do { \ 
     86  if(!*scp) ecp = scp; \ 
     87  else { \ 
     88    ecp = scp + strlen(scp); /* Puts us at the '\0' */ \ 
     89    if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \ 
     90  } \ 
     91  t = scp; \ 
     92  l = (ecp-scp); \ 
     93} while(0) 
     94 
     95  PROCESS_NEXT_FIELD(token,len); /* Skip the leader */ 
     96  if(buff[0] == 'M') { 
     97    scp = buff; 
     98    snprintf(buffer, sizeof(buffer), "<script>window.parent.plot_iframe_data('"); 
     99    if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) return -1; 
     100 
     101    /* Time */ 
     102    PROCESS_NEXT_FIELD(token,len); 
     103    if(noit_http_response_append(ctx, token, len) == noit_false) return -1; 
     104 
     105    snprintf(buffer, sizeof(buffer), "', '"); 
     106    if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) return -1; 
     107 
     108    /* UUID */ 
     109    PROCESS_NEXT_FIELD(token,len); 
     110    if(noit_http_response_append(ctx, token, len) == noit_false) return -1; 
     111 
     112    snprintf(buffer, sizeof(buffer), "', '"); 
     113    if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) return -1; 
     114 
     115    /* name */ 
     116    PROCESS_NEXT_FIELD(token,len); 
     117    if(noit_http_response_append(ctx, token, len) == noit_false) return -1; 
     118 
     119    snprintf(buffer, sizeof(buffer), "', '"); 
     120    if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) return -1; 
     121 
     122    PROCESS_NEXT_FIELD(token,len); /* skip type */ 
     123    PROCESS_LAST_FIELD(token,len); /* value */ 
     124    if(noit_http_response_append(ctx, token, len) == noit_false) return -1; 
     125 
     126    snprintf(buffer, sizeof(buffer), "');</script>\n"); 
     127    if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) return -1; 
     128 
     129    if(noit_http_response_flush(ctx, noit_false) == noit_false) return -1; 
     130  } 
     131 
     132  return 0; 
     133 
     134 bad_row: 
     135  return -1; 
    30136  if(0) { 
    31137    noit_http_response_end(ctx); 
     
    34140    return 0; 
    35141  } 
    36  
    37   eventer_t e = eventer_alloc(); 
    38   gettimeofday(&e->whence, NULL); 
    39  
    40   snprintf(buffer, sizeof(buffer), "<script>window.parent.plot_iframe_data('%lu','%s','%0.3f','%lu');</script>\n", 
    41            (unsigned long)1179, "allocator_requests", (float)(rand() % 100000) / 1000.0, 
    42            e->whence.tv_sec * 1000 + e->whence.tv_usec / 1000); 
    43   noit_http_response_append(ctx, buffer, strlen(buffer)); 
    44   noit_http_response_flush(ctx, noit_false); 
    45  
    46   e->mask = EVENTER_TIMER; 
    47   e->whence.tv_sec += 0; 
    48   e->whence.tv_usec += 500000; 
    49   e->callback = stratcon_realtime_ticker; 
    50   e->closure = closure; 
    51   eventer_add(e); 
     142
     143int 
     144stratcon_realtime_uri_parse(realtime_context *rc, char *uri) { 
     145  int len, cnt = 0; 
     146  char *cp, *copy, *interest, *brk; 
     147  if(strncmp(uri, "/data/", 6)) return 0; 
     148  cp = uri + 6; 
     149  len = strlen(cp); 
     150  copy = alloca(len + 1); 
     151  if(!copy) return 0; 
     152  memcpy(copy, cp, len); 
     153  copy[len] = '\0'; 
     154 
     155  for (interest = strtok_r(copy, "/", &brk); 
     156       interest; 
     157       interest = strtok_r(NULL, "/", &brk)) { 
     158    struct realtime_tracker *node; 
     159    char *interval; 
     160 
     161    interval = strchr(interest, '@'); 
     162    if(!interval) return 0; 
     163    *interval++ = '\0'; 
     164    node = calloc(1, sizeof(*node)); 
     165    node->rc = rc; 
     166    node->sid = atoi(interest); 
     167    node->interval = atoi(interval); 
     168    node->next = rc->checklist; 
     169    rc->checklist = node; 
     170    cnt++; 
     171  } 
     172  return cnt; 
     173
     174static void 
     175free_realtime_recv_ctx(void *vctx) { 
     176  realtime_recv_ctx_t *rrctx = vctx; 
     177  noit_atomic_dec32(&rrctx->ctx->ref_cnt); 
     178  free(rrctx); 
     179
     180#define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e) 
     181static int 
     182__read_on_ctx(eventer_t e, realtime_recv_ctx_t *ctx, int *newmask) { 
     183  int len, mask; 
     184  while(ctx->bytes_read < ctx->bytes_expected) { 
     185    len = Eread(ctx->buffer + ctx->bytes_read, 
     186                ctx->bytes_expected - ctx->bytes_read); 
     187    if(len < 0) { 
     188      *newmask = mask; 
     189      return -1; 
     190    } 
     191    /* if we get 0 inside SSL, and there was a real error, we 
     192     * will actually get a -1 here. 
     193     * if(len == 0) return ctx->bytes_read; 
     194     */ 
     195    ctx->bytes_read += len; 
     196  } 
     197  assert(ctx->bytes_read == ctx->bytes_expected); 
     198  return ctx->bytes_read; 
     199
     200#define FULLREAD(e,ctx,size) do { \ 
     201  int mask, len; \ 
     202  if(!ctx->bytes_expected) { \ 
     203    ctx->bytes_expected = size; \ 
     204    if(ctx->buffer) free(ctx->buffer); \ 
     205    ctx->buffer = malloc(size + 1); \ 
     206    if(ctx->buffer == NULL) { \ 
     207      noitL(noit_error, "malloc(%lu) failed.\n", (unsigned long)size + 1); \ 
     208      goto socket_error; \ 
     209    } \ 
     210    ctx->buffer[size] = '\0'; \ 
     211  } \ 
     212  len = __read_on_ctx(e, ctx, &mask); \ 
     213  if(len < 0) { \ 
     214    if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \ 
     215    noitL(noit_error, "SSL read error: %s\n", strerror(errno)); \ 
     216    goto socket_error; \ 
     217  } \ 
     218  ctx->bytes_read = 0; \ 
     219  ctx->bytes_expected = 0; \ 
     220  if(len != size) { \ 
     221    noitL(noit_error, "SSL short read [%d] (%d/%lu).  Reseting connection.\n", \ 
     222          ctx->state, len, (unsigned long)size); \ 
     223    goto socket_error; \ 
     224  } \ 
     225} while(0) 
     226 
     227int 
     228stratcon_realtime_recv_handler(eventer_t e, int mask, void *closure, 
     229                               struct timeval *now) { 
     230  static u_int32_t livestream_cmd = 0; 
     231  noit_connection_ctx_t *nctx = closure; 
     232  realtime_recv_ctx_t *ctx = nctx->consumer_ctx; 
     233  int len; 
     234  u_int32_t nint; 
     235  char uuid_str[37]; 
     236 
     237  if(!livestream_cmd) livestream_cmd = htonl(NOIT_LIVESTREAM_DATA_FEED); 
     238 
     239  if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) { 
     240 socket_error: 
     241    ctx->state = WANT_INITIATE; 
     242    ctx->count = 0; 
     243    ctx->bytes_read = 0; 
     244    ctx->bytes_expected = 0; 
     245    if(ctx->buffer) free(ctx->buffer); 
     246    ctx->buffer = NULL; 
     247    eventer_remove_fd(e->fd); 
     248    e->opset->close(e->fd, &mask, e); 
     249    return 0; 
     250  } 
     251 
     252#define full_nb_write(data, wlen) do { \ 
     253  len = e->opset->write(e->fd, data, wlen, \ 
     254                        &mask, e); \ 
     255  if(len < 0) { \ 
     256    if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \ 
     257    goto socket_error; \ 
     258  } \ 
     259  if(len != sizeof(livestream_cmd)) { \ 
     260    noitL(noit_error, "short write on initiating stream.\n"); \ 
     261    goto socket_error; \ 
     262  } \ 
     263} while(0) 
     264 
     265  while(1) { 
     266    switch(ctx->state) { 
     267      case WANT_INITIATE: 
     268        full_nb_write(&livestream_cmd, sizeof(livestream_cmd)); 
     269        ctx->state = WANT_SEND_INTERVAL; 
     270      case WANT_SEND_INTERVAL: 
     271        nint = htonl(ctx->rt->interval); 
     272        full_nb_write(&nint, sizeof(nint)); 
     273        ctx->state = WANT_SEND_UUID; 
     274      case WANT_SEND_UUID: 
     275        uuid_unparse_lower(ctx->rt->checkid, uuid_str); 
     276        full_nb_write(uuid_str, 36); 
     277        ctx->state = WANT_HEADER; 
     278      case WANT_HEADER: 
     279        FULLREAD(e, ctx, sizeof(u_int32_t)); 
     280        memcpy(&ctx->bytes_expected, ctx->buffer, sizeof(u_int32_t)); 
     281        ctx->bytes_expected = ntohl(ctx->bytes_expected); 
     282        free(ctx->buffer); ctx->buffer = NULL; 
     283        ctx->state = WANT_BODY; 
     284        break; 
     285      case WANT_BODY: 
     286        FULLREAD(e, ctx, ctx->bytes_expected); 
     287        noitL(noit_error, "Read: '%s'\n", ctx->buffer); 
     288        if(stratcon_line_to_javascript(ctx->ctx, ctx->buffer)) goto socket_error; 
     289        free(ctx->buffer); ctx->buffer = NULL; 
     290        ctx->state = WANT_BODY; 
     291        break; 
     292    } 
     293  } 
     294 
     295
     296 
     297int 
     298stratcon_realtime_http_postresolve(eventer_t e, int mask, void *closure, 
     299                                   struct timeval *now) { 
     300  noit_http_session_ctx *ctx = closure; 
     301  realtime_context *rc = ctx->dispatcher_closure; 
     302  struct realtime_tracker *node; 
     303 
     304  for(node = rc->checklist; node; node = node->next) { 
     305    if(node->noit) { 
     306      realtime_recv_ctx_t *rrctx; 
     307      rrctx = calloc(1, sizeof(*rrctx)); 
     308      rrctx->ctx = ctx; 
     309      rrctx->rt = node; 
     310      stratcon_streamer_connection(NULL, node->noit, 
     311                                   stratcon_realtime_recv_handler, 
     312                                   NULL, rrctx, 
     313                                   free_realtime_recv_ctx); 
     314    } 
     315    else 
     316      noit_atomic_dec32(&ctx->ref_cnt); 
     317  } 
     318  if(ctx->ref_cnt == 1) { 
     319    noit_http_response_end(ctx); 
     320    clear_realtime_context(rc); 
     321    memset(ctx->dispatcher_closure, 0, sizeof(realtime_context)); 
     322    if(ctx->conn.e) eventer_trigger(ctx->conn.e, EVENTER_WRITE); 
     323  } 
    52324  return 0; 
    53325} 
     
    60332  noit_http_request *req = &ctx->req; 
    61333 
    62   if(strcmp(ctx->req.uri_str, "/data")) { 
    63     noit_http_response_status_set(ctx, 404, "OK"); 
    64     noit_http_response_option_set(ctx, NOIT_HTTP_CLOSE); 
    65     noit_http_response_end(ctx); 
    66     return 0; 
    67   } 
    68   if(!rc->setup) { 
     334  if(rc->setup == RC_INITIAL) { 
     335    eventer_t completion; 
     336    struct realtime_tracker *node; 
    69337    char c[1024]; 
     338    int num_interests; 
     339 
     340    num_interests = stratcon_realtime_uri_parse(rc, ctx->req.uri_str); 
     341    if(num_interests == 0) { 
     342      noit_http_response_status_set(ctx, 404, "OK"); 
     343      noit_http_response_option_set(ctx, NOIT_HTTP_CLOSE); 
     344      noit_http_response_end(ctx); 
     345      return 0; 
     346    } 
     347 
    70348    noitL(noit_error, "http: %s %s %s\n", 
    71349          req->method_str, req->uri_str, req->protocol_str); 
     
    75353    noit_http_response_status_set(ctx, 200, "OK"); 
    76354    noit_http_response_option_set(ctx, NOIT_HTTP_CHUNKED); 
    77 /*    noit_http_response_option_set(ctx, NOIT_HTTP_DEFLATE); */ 
     355    noit_http_response_option_set(ctx, NOIT_HTTP_DEFLATE); 
    78356    noit_http_response_header_set(ctx, "Content-Type", "text/html"); 
    79357 
     
    81359    noit_http_response_append(ctx, c, strlen(c)); 
    82360 
    83     memset(c, ' ', 1024); 
     361    /* this dumb crap is to make some browsers happy (Safari) */ 
     362    memset(c, ' ', sizeof(c)); 
    84363    noit_http_response_append(ctx, c, sizeof(c)); 
    85364    noit_http_response_flush(ctx, noit_false); 
    86365 
    87     stratcon_realtime_ticker(NULL, 0, ctx, NULL); 
    88     rc->setup = 1; 
     366    rc->setup = RC_REQ_RECV; 
     367    /* Each interest references the ctx */ 
     368    for(node = rc->checklist; node; node = node->next) { 
     369      noit_atomic_inc32(&ctx->ref_cnt); 
     370      stratcon_datastore_push(DS_OP_FIND, NULL, node); 
     371      noitL(noit_error, "Resolving sid: %d\n", node->sid); 
     372    } 
     373    completion = eventer_alloc(); 
     374    completion->mask = EVENTER_TIMER; 
     375    completion->callback = stratcon_realtime_http_postresolve; 
     376    completion->closure = ctx; 
     377    gettimeofday(&completion->whence, NULL); 
     378    stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, completion); 
    89379  } 
    90380  return EVENTER_EXCEPTION; 
  • src/stratcon_realtime_http.h

    r55168c7 r21b0c6c  
    77#define _STRATCON_REALTIME_HTTP_H 
    88 
    9 #include "noit_defines.h" 
     9#include "noit_conf.h" 
     10 
     11/* This is in the public header as the stratcon_datastore must know 
     12 * how to resolve this 
     13 */ 
     14struct realtime_tracker { 
     15  int sid;        /* set by request */ 
     16  int interval;   /* set by request */ 
     17  char *noit;     /* resolved by datastore */ 
     18  uuid_t checkid; /* resolved by datastore */ 
     19  struct realtime_tracker *next; /* next in series */ 
     20  eventer_t conn; /* used to track noitd connection feeding this */ 
     21  struct realtime_context *rc; /* link back to the rc that justified us */ 
     22}; 
    1023 
    1124API_EXPORT(void)