Changeset 21b0c6c1011f78327925b8a1914d98cdd5cd43d5
- Timestamp:
- 01/17/09 20:05:36 (4 years ago)
- git-parent:
- Files:
-
- src/noit_http.c (modified) (3 diffs)
- src/noit_http.h (modified) (3 diffs)
- src/noitd.c (modified) (2 diffs)
- src/stratcon.conf (modified) (4 diffs)
- src/stratcon_datastore.c (modified) (13 diffs)
- src/stratcon_datastore.h (modified) (1 diff)
- src/stratcon_jlog_streamer.c (modified) (26 diffs)
- src/stratcon_jlog_streamer.h (modified) (2 diffs)
- src/stratcon_realtime_http.c (modified) (6 diffs)
- src/stratcon_realtime_http.h (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
src/noit_http.c
r7940b59 r21b0c6c 390 390 memset(&ctx->res, 0, sizeof(ctx->res)); 391 391 } 392 void 393 noit_http_ctx_session_release(noit_http_session_ctx *ctx) { 394 if(noit_atomic_dec32(&ctx->ref_cnt) == 0) { 395 noit_http_request_release(ctx); 396 noit_http_response_release(ctx); 397 free(ctx); 398 } 399 } 392 400 int 393 401 noit_http_session_drive(eventer_t e, int origmask, void *closure, … … 424 432 return 0; 425 433 release: 426 noit_http_request_release(ctx); 427 noit_http_response_release(ctx); 434 noit_http_ctx_session_release(ctx); 428 435 return 0; 429 436 } … … 433 440 noit_http_session_ctx *ctx; 434 441 ctx = calloc(1, sizeof(*ctx)); 442 ctx->ref_cnt = 1; 435 443 ctx->req.complete = noit_false; 436 444 ctx->conn.e = e; src/noit_http.h
r49e329e r21b0c6c 10 10 #include "eventer/eventer.h" 11 11 #include "utils/noit_hash.h" 12 #include "utils/noit_atomic.h" 12 13 13 14 typedef enum { … … 84 85 85 86 typedef struct noit_http_session_ctx { 87 noit_atomic32_t ref_cnt; 86 88 noit_http_connection conn; 87 89 noit_http_request req; … … 94 96 API_EXPORT(noit_http_session_ctx *) 95 97 noit_http_session_ctx_new(noit_http_dispatch_func, void *, eventer_t); 98 API_EXPORT(void) 99 noit_http_session_ctx_release(noit_http_session_ctx *); 96 100 97 101 API_EXPORT(int) src/noitd.c
r839ed4b r21b0c6c 20 20 #include "noit_console.h" 21 21 #include "noit_jlog_listener.h" 22 #include "noit_livestream_listener.h" 22 23 #include "noit_module.h" 23 24 #include "noit_conf.h" … … 205 206 noit_console_init(); 206 207 noit_jlog_listener_init(); 208 noit_livestream_listener_init(); 207 209 208 210 noit_module_init(); src/stratcon.conf
r55168c7 r21b0c6c 6 6 <outlet name="stderr"/> 7 7 <log name="error"/> 8 <log name="debug" disabled="true"/> 8 <log name="debug"/> 9 <log name="error/eventer" disabled="true"/> 10 <log name="debug/eventer" disabled="true"/> 9 11 </console_output> 10 12 </logs> … … 30 32 <database> 31 33 <dbconfig> 32 <host> postgres83dev.office.omniti.com</host>34 <host>noit.office.omniti.com</host> 33 35 <dbname>reconnoiter</dbname> 34 36 <user>stratcon</user> … … 36 38 </dbconfig> 37 39 <statements> 40 <findcheck><![CDATA[ 41 SELECT remote_address, id 42 FROM stratcon.mv_loading_dock_check_s 43 WHERE sid = $1 44 ]]></findcheck> 38 45 <check><![CDATA[ 39 46 INSERT INTO stratcon.loading_dock_check_s … … 77 84 </consoles> 78 85 <realtime type="stratcon_realtime_http"> 79 <listener address="*" port="80 80">86 <listener address="*" port="80"> 80 87 </listener> 81 88 </realtime> src/stratcon_datastore.c
raa6712f r21b0c6c 9 9 #include "utils/noit_b64.h" 10 10 #include "stratcon_datastore.h" 11 #include "stratcon_realtime_http.h" 11 12 #include "noit_conf.h" 12 13 #include "noit_check.h" … … 18 19 #include <zlib.h> 19 20 21 static char *check_find = NULL; 22 static const char *check_find_conf = "/stratcon/database/statements/findcheck"; 20 23 static char *check_insert = NULL; 21 24 static const char *check_insert_conf = "/stratcon/database/statements/check"; … … 37 40 #define MAX_PARAMS 8 38 41 #define POSTGRES_PARTS \ 42 PGresult *res; \ 43 int rv; \ 39 44 int nparams; \ 40 45 int metric_type; \ … … 52 57 53 58 char *data; /* The raw string, NULL means the stream is done -- commit. */ 59 struct realtime_tracker *rt; 60 54 61 int problematic; 55 62 eventer_t completion_event; /* This event should be registered if non NULL */ … … 101 108 __get_conn_q_for_remote(struct sockaddr *remote) { 102 109 conn_q *cq; 110 static const char __zeros[4] = { 0 }; 103 111 int len = 0; 104 switch(remote->sa_family) { 105 case AF_INET: len = sizeof(struct sockaddr_in); break; 106 case AF_INET6: len = sizeof(struct sockaddr_in6); break; 107 case AF_UNIX: len = SUN_LEN(((struct sockaddr_un *)remote)); break; 108 default: return NULL; 112 if(remote) { 113 switch(remote->sa_family) { 114 case AF_INET: len = sizeof(struct sockaddr_in); break; 115 case AF_INET6: len = sizeof(struct sockaddr_in6); break; 116 case AF_UNIX: len = SUN_LEN(((struct sockaddr_un *)remote)); break; 117 default: return NULL; 118 } 119 } 120 else { 121 /* This is a dummy connection */ 122 remote = (struct sockaddr *)__zeros; 123 len = 4; 109 124 } 110 125 if(noit_hash_retrieve(&ds_conns, (const char *)remote, len, (void **)&cq)) … … 152 167 d->nparams++; \ 153 168 } while(0) 154 169 #define DECLARE_PARAM_INT(i) do { \ 170 int buffer__len; \ 171 char buffer__[32]; \ 172 snprintf(buffer__, sizeof(buffer__), "%d", (i)); \ 173 buffer__len = strlen(buffer__); \ 174 DECLARE_PARAM_STR(buffer__, buffer__len); \ 175 } while(0) 176 177 #define PG_GET_STR_COL(dest, row, name) do { \ 178 int colnum = PQfnumber(d->res, name); \ 179 dest = NULL; \ 180 if (colnum >= 0) \ 181 dest = PQgetisnull(d->res, row, colnum) \ 182 ? NULL : PQgetvalue(d->res, row, colnum); \ 183 } while(0) 184 185 #define PG_EXEC(cmd) do { \ 186 d->res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \ 187 (const char * const *)d->paramValues, \ 188 d->paramLengths, d->paramFormats, 0); \ 189 d->rv = PQresultStatus(d->res); \ 190 if(d->rv != PGRES_COMMAND_OK && \ 191 d->rv != PGRES_TUPLES_OK) { \ 192 noitL(noit_error, "stratcon datasource bad (%d): %s\n", \ 193 d->rv, PQresultErrorMessage(d->res)); \ 194 PQclear(d->res); \ 195 goto bad_row; \ 196 } \ 197 } while(0) 198 199 execute_outcome_t 200 stratcon_datastore_find(conn_q *cq, ds_job_detail *d) { 201 char *val; 202 int row_count; 203 204 if(!d->nparams) DECLARE_PARAM_INT(d->rt->sid); 205 GET_QUERY(check_find); 206 PG_EXEC(check_find); 207 row_count = PQntuples(d->res); 208 if(row_count != 1) goto bad_row; 209 210 /* Get the check uuid */ 211 PG_GET_STR_COL(val, 0, "id"); 212 if(!val) goto bad_row; 213 if(uuid_parse(val, d->rt->checkid)) goto bad_row; 214 215 /* Get the remote_address (which noit owns this) */ 216 PG_GET_STR_COL(val, 0, "remote_address"); 217 if(!val) goto bad_row; 218 d->rt->noit = strdup(val); 219 220 PQclear(d->res); 221 return DS_EXEC_SUCCESS; 222 bad_row: 223 return DS_EXEC_ROW_FAILED; 224 } 155 225 execute_outcome_t 156 226 stratcon_datastore_execute(conn_q *cq, struct sockaddr *r, ds_job_detail *d) { … … 288 358 } 289 359 290 #define PG_EXEC(cmd) do { \291 PGresult *res; \292 int rv; \293 res = PQexecParams(cq->dbh, cmd, d->nparams, NULL, \294 (const char * const *)d->paramValues, \295 d->paramLengths, d->paramFormats, 0); \296 rv = PQresultStatus(res); \297 if(rv != PGRES_COMMAND_OK && \298 rv != PGRES_TUPLES_OK) { \299 noitL(noit_error, "stratcon datasource bad row (%d): %s\n", \300 rv, PQresultErrorMessage(res)); \301 PQclear(res); \302 goto bad_row; \303 } \304 PQclear(res); \305 } while(0)306 307 360 /* Now execute the query */ 308 361 switch(type) { … … 310 363 GET_QUERY(config_insert); 311 364 PG_EXEC(config_insert); 365 PQclear(d->res); 312 366 break; 313 367 case 'C': 314 368 GET_QUERY(check_insert); 315 369 PG_EXEC(check_insert); 370 PQclear(d->res); 316 371 break; 317 372 case 'S': 318 373 GET_QUERY(status_insert); 319 374 PG_EXEC(status_insert); 375 PQclear(d->res); 320 376 break; 321 377 case 'M': … … 328 384 GET_QUERY(metric_insert_numeric); 329 385 PG_EXEC(metric_insert_numeric); 386 PQclear(d->res); 330 387 break; 331 388 case METRIC_STRING: 332 389 GET_QUERY(metric_insert_text); 333 390 PG_EXEC(metric_insert_text); 391 PQclear(d->res); 334 392 break; 335 393 default: … … 418 476 last_sp = NULL; \ 419 477 } while(0) 478 int 479 stratcon_datastore_asynch_lookup(eventer_t e, int mask, void *closure, 480 struct timeval *now) { 481 conn_q *cq = closure; 482 ds_job_detail *current; 483 if(!(mask & EVENTER_ASYNCH_WORK)) return 0; 484 485 if(!cq->head) return 0; 486 487 stratcon_database_connect(cq); 488 489 current = cq->head; 490 while(current) { 491 if(current->rt) { 492 stratcon_datastore_find(cq, current); 493 current = current->next; 494 } 495 else if(current->completion_event) { 496 eventer_add(current->completion_event); 497 current = current->next; 498 __remove_until(cq, current); 499 } 500 else current = current->next; 501 } 502 return 0; 503 } 420 504 int 421 505 stratcon_datastore_asynch_execute(eventer_t e, int mask, void *closure, … … 483 567 dsjd = calloc(1, sizeof(*dsjd)); 484 568 switch(op) { 569 case DS_OP_FIND: 570 dsjd->rt = operand; 571 __append(cq, dsjd); 572 break; 485 573 case DS_OP_INSERT: 486 574 dsjd->data = operand; 487 575 __append(cq, dsjd); 488 576 break; 577 case DS_OP_FIND_COMPLETE: 489 578 case DS_OP_CHKPT: 490 579 dsjd->completion_event = operand; … … 492 581 e = eventer_alloc(); 493 582 e->mask = EVENTER_ASYNCH; 494 e->callback = stratcon_datastore_asynch_execute; 583 if(op == DS_OP_FIND_COMPLETE) 584 e->callback = stratcon_datastore_asynch_lookup; 585 else if(op == DS_OP_CHKPT) 586 e->callback = stratcon_datastore_asynch_execute; 495 587 e->closure = cq; 496 588 eventer_add(e); … … 521 613 GET_QUERY(config_insert); 522 614 PG_EXEC(config_insert); 615 PQclear(d->res); 523 616 rv = 0; 524 617 src/stratcon_datastore.h
rdcd539d r21b0c6c 16 16 typedef enum { 17 17 DS_OP_INSERT = 1, 18 DS_OP_CHKPT = 2 18 DS_OP_CHKPT = 2, 19 DS_OP_FIND = 3, 20 DS_OP_FIND_COMPLETE = 4 19 21 } stratcon_datastore_op_t; 20 22 src/stratcon_jlog_streamer.c
r6bb9ef8 r21b0c6c 12 12 #include "noit_jlog_listener.h" 13 13 #include "stratcon_datastore.h" 14 #include "stratcon_jlog_streamer.h" 14 15 15 16 #include <unistd.h> … … 28 29 29 30 typedef struct jlog_streamer_ctx_t { 30 union {31 struct sockaddr remote;32 struct sockaddr_un remote_un;33 struct sockaddr_in remote_in;34 struct sockaddr_in6 remote_in6;35 } r;36 socklen_t remote_len;37 char *remote_cn;38 u_int32_t current_backoff;39 int wants_shutdown;40 noit_hash_table *config;41 noit_hash_table *sslconfig;42 31 int bytes_expected; 43 32 int bytes_read; … … 58 47 u_int32_t message_len; 59 48 } header; 60 61 eventer_t timeout_event;62 49 } jlog_streamer_ctx_t; 63 50 64 static void jlog_streamer_initiate_connection(jlog_streamer_ctx_t *ctx);51 static void noit_connection_initiate_connection(noit_connection_ctx_t *ctx); 65 52 66 53 jlog_streamer_ctx_t * … … 70 57 return ctx; 71 58 } 59 noit_connection_ctx_t * 60 noit_connection_ctx_alloc(void) { 61 noit_connection_ctx_t *ctx; 62 ctx = calloc(1, sizeof(*ctx)); 63 return ctx; 64 } 72 65 int 73 jlog_streamer_reinitiate(eventer_t e, int mask, void *closure,66 noit_connection_reinitiate(eventer_t e, int mask, void *closure, 74 67 struct timeval *now) { 75 jlog_streamer_ctx_t *ctx = closure;68 noit_connection_ctx_t *ctx = closure; 76 69 ctx->timeout_event = NULL; 77 jlog_streamer_initiate_connection(closure);70 noit_connection_initiate_connection(closure); 78 71 return 0; 79 72 } 80 73 void 81 jlog_streamer_schedule_reattempt(jlog_streamer_ctx_t *ctx,82 struct timeval *now) {74 noit_connection_schedule_reattempt(noit_connection_ctx_t *ctx, 75 struct timeval *now) { 83 76 struct timeval __now, interval; 84 77 const char *v; … … 114 107 else 115 108 ctx->timeout_event = eventer_alloc(); 116 ctx->timeout_event->callback = jlog_streamer_reinitiate;109 ctx->timeout_event->callback = noit_connection_reinitiate; 117 110 ctx->timeout_event->closure = ctx; 118 111 ctx->timeout_event->mask = EVENTER_TIMER; … … 121 114 } 122 115 void 123 jlog_streamer_ctx_free(jlog_streamer_ctx_t *ctx) { 124 if(ctx->buffer) free(ctx->buffer); 116 noit_connection_ctx_free(noit_connection_ctx_t *ctx) { 125 117 if(ctx->remote_cn) free(ctx->remote_cn); 126 118 if(ctx->timeout_event) { … … 128 120 eventer_free(ctx->timeout_event); 129 121 } 122 ctx->consumer_free(ctx->consumer_ctx); 123 free(ctx); 124 } 125 void 126 jlog_streamer_ctx_free(void *cl) { 127 jlog_streamer_ctx_t *ctx = cl; 128 if(ctx->buffer) free(ctx->buffer); 130 129 free(ctx); 131 130 } … … 182 181 struct timeval *now) { 183 182 static u_int32_t jlog_feed_cmd = 0; 184 jlog_streamer_ctx_t *ctx = closure; 183 noit_connection_ctx_t *nctx = closure; 184 jlog_streamer_ctx_t *ctx = nctx->consumer_ctx; 185 185 int len; 186 186 jlog_id n_chkpt; … … 188 188 if(!jlog_feed_cmd) jlog_feed_cmd = htonl(NOIT_JLOG_DATA_FEED); 189 189 190 if(mask & EVENTER_EXCEPTION || ctx->wants_shutdown) {190 if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) { 191 191 socket_error: 192 192 ctx->state = WANT_INITIATE; … … 196 196 if(ctx->buffer) free(ctx->buffer); 197 197 ctx->buffer = NULL; 198 jlog_streamer_schedule_reattempt(ctx, now);198 noit_connection_schedule_reattempt(nctx, now); 199 199 eventer_remove_fd(e->fd); 200 200 e->opset->close(e->fd, &mask, e); … … 244 244 case WANT_BODY: 245 245 FULLREAD(e, ctx, (unsigned long)ctx->header.message_len); 246 stratcon_datastore_push(DS_OP_INSERT, & ctx->r.remote, ctx->buffer);246 stratcon_datastore_push(DS_OP_INSERT, &nctx->r.remote, ctx->buffer); 247 247 /* Don't free the buffer, it's used by the datastore process. */ 248 248 ctx->buffer = NULL; … … 255 255 completion_e->mask = EVENTER_WRITE | EVENTER_EXCEPTION; 256 256 ctx->state = WANT_CHKPT; 257 stratcon_datastore_push(DS_OP_CHKPT, & ctx->r.remote, completion_e);257 stratcon_datastore_push(DS_OP_CHKPT, &nctx->r.remote, completion_e); 258 258 noitL(noit_debug, "Pushing batch asynch...\n"); 259 259 return 0; … … 287 287 288 288 int 289 jlog_streamer_ssl_upgrade(eventer_t e, int mask, void *closure,290 struct timeval *now) {291 jlog_streamer_ctx_t *ctx = closure;289 noit_connection_ssl_upgrade(eventer_t e, int mask, void *closure, 290 struct timeval *now) { 291 noit_connection_ctx_t *nctx = closure; 292 292 int rv; 293 293 … … 295 295 if(rv > 0) { 296 296 eventer_ssl_ctx_t *sslctx; 297 e->callback = stratcon_jlog_recv_handler;297 e->callback = nctx->consumer_callback; 298 298 /* We must make a copy of the acceptor_closure_t for each new 299 299 * connection. … … 306 306 end = cn; 307 307 while(*end && *end != '/') end++; 308 ctx->remote_cn = malloc(end - cn + 1);309 memcpy( ctx->remote_cn, cn, end - cn);310 ctx->remote_cn[end-cn] = '\0';308 nctx->remote_cn = malloc(end - cn + 1); 309 memcpy(nctx->remote_cn, cn, end - cn); 310 nctx->remote_cn[end-cn] = '\0'; 311 311 } 312 312 } … … 317 317 eventer_remove_fd(e->fd); 318 318 e->opset->close(e->fd, &mask, e); 319 jlog_streamer_schedule_reattempt(ctx, now);319 noit_connection_schedule_reattempt(nctx, now); 320 320 return 0; 321 321 } 322 322 int 323 jlog_streamer_complete_connect(eventer_t e, int mask, void *closure,324 struct timeval *now) {325 jlog_streamer_ctx_t *ctx = closure;323 noit_connection_complete_connect(eventer_t e, int mask, void *closure, 324 struct timeval *now) { 325 noit_connection_ctx_t *nctx = closure; 326 326 char *cert, *key, *ca, *ciphers; 327 327 eventer_ssl_ctx_t *sslctx; … … 331 331 eventer_remove_fd(e->fd); 332 332 e->opset->close(e->fd, &mask, e); 333 jlog_streamer_schedule_reattempt(ctx, now);333 noit_connection_schedule_reattempt(nctx, now); 334 334 return 0; 335 335 } 336 336 337 337 #define SSLCONFGET(var,name) do { \ 338 if(!noit_hash_retrieve( ctx->sslconfig, name, strlen(name), \338 if(!noit_hash_retrieve(nctx->sslconfig, name, strlen(name), \ 339 339 (void **)&var)) var = NULL; } while(0) 340 340 SSLCONFGET(cert, "certificate_file"); … … 346 346 347 347 eventer_ssl_ctx_set_verify(sslctx, eventer_ssl_verify_cert, 348 ctx->sslconfig);348 nctx->sslconfig); 349 349 EVENTER_ATTACH_SSL(e, sslctx); 350 e->callback = jlog_streamer_ssl_upgrade;350 e->callback = noit_connection_ssl_upgrade; 351 351 return e->callback(e, mask, closure, now); 352 352 } 353 353 static void 354 jlog_streamer_initiate_connection(jlog_streamer_ctx_t *ctx) {354 noit_connection_initiate_connection(noit_connection_ctx_t *nctx) { 355 355 struct timeval __now; 356 356 eventer_t e; … … 359 359 360 360 /* Open a socket */ 361 fd = socket( ctx->r.remote.sa_family, SOCK_STREAM, 0);361 fd = socket(nctx->r.remote.sa_family, SOCK_STREAM, 0); 362 362 if(fd < 0) goto reschedule; 363 363 … … 367 367 368 368 /* Initiate a connection */ 369 rv = connect(fd, & ctx->r.remote,ctx->remote_len);369 rv = connect(fd, &nctx->r.remote, nctx->remote_len); 370 370 if(rv == -1 && errno != EINPROGRESS) goto reschedule; 371 371 … … 374 374 e->fd = fd; 375 375 e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION; 376 e->callback = jlog_streamer_complete_connect;377 e->closure = ctx;376 e->callback = noit_connection_complete_connect; 377 e->closure = nctx; 378 378 eventer_add(e); 379 379 return; … … 382 382 if(fd >= 0) close(fd); 383 383 gettimeofday(&__now, NULL); 384 jlog_streamer_schedule_reattempt(ctx, &__now);384 noit_connection_schedule_reattempt(nctx, &__now); 385 385 return; 386 386 } 387 387 388 388 int 389 initiate_jlog_streamer(const char *host, unsigned short port, 390 noit_hash_table *sslconfig, noit_hash_table *config) { 391 jlog_streamer_ctx_t *ctx; 389 initiate_noit_connection(const char *host, unsigned short port, 390 noit_hash_table *sslconfig, noit_hash_table *config, 391 eventer_func_t handler, void *closure, 392 void (*freefunc)(void *)) { 393 noit_connection_ctx_t *ctx; 392 394 393 395 int8_t family; … … 414 416 } 415 417 416 ctx = jlog_streamer_ctx_alloc();418 ctx = noit_connection_ctx_alloc(); 417 419 418 420 memset(&ctx->r, 0, sizeof(ctx->r)); … … 449 451 noit_hash_merge_as_dict(ctx->config, config); 450 452 451 jlog_streamer_initiate_connection(ctx); 453 ctx->consumer_callback = handler; 454 ctx->consumer_free = freefunc; 455 ctx->consumer_ctx = closure; 456 noit_connection_initiate_connection(ctx); 452 457 return 0; 453 458 } 454 459 455 460 void 456 stratcon_jlog_streamer_reload(const char *toplevel) { 461 stratcon_streamer_connection(const char *toplevel, const char *destination, 462 eventer_func_t handler, 463 void *(*handler_alloc)(void), void *handler_ctx, 464 void (*handler_free)(void *)) { 457 465 int i, cnt = 0; 458 466 noit_conf_section_t *noit_configs; … … 474 482 continue; 475 483 } 484 /* if destination is specified, exact match it */ 485 if(destination && strcmp(address, destination)) continue; 486 476 487 if(!noit_conf_get_int(noit_configs[i], 477 488 "ancestor-or-self::node()/@port", &portint)) … … 487 498 config = noit_conf_get_hash(noit_configs[i], "config"); 488 499 489 initiate_jlog_streamer(address, port, sslconfig, config); 490 } 500 initiate_noit_connection(address, port, sslconfig, config, 501 handler, 502 handler_alloc ? handler_alloc() : handler_ctx, 503 handler_free); 504 } 505 } 506 void 507 stratcon_jlog_streamer_reload(const char *toplevel) { 508 stratcon_streamer_connection(toplevel, NULL, 509 stratcon_jlog_recv_handler, 510 (void *(*)())jlog_streamer_ctx_alloc, NULL, 511 jlog_streamer_ctx_free); 491 512 } 492 513 493 514 void 494 515 stratcon_jlog_streamer_init(const char *toplevel) { 495 eventer_name_callback(" jlog_streamer_reinitiate",496 jlog_streamer_reinitiate);516 eventer_name_callback("noit_connection_reinitiate", 517 noit_connection_reinitiate); 497 518 eventer_name_callback("stratcon_jlog_recv_handler", 498 519 stratcon_jlog_recv_handler); 499 eventer_name_callback(" jlog_streamer_ssl_upgrade",500 jlog_streamer_ssl_upgrade);501 eventer_name_callback(" jlog_streamer_complete_connect",502 jlog_streamer_complete_connect);520 eventer_name_callback("noit_connection_ssl_upgrade", 521 noit_connection_ssl_upgrade); 522 eventer_name_callback("noit_connection_complete_connect", 523 noit_connection_complete_connect); 503 524 stratcon_jlog_streamer_reload(toplevel); 504 525 } src/stratcon_jlog_streamer.h
ra7304b5 r21b0c6c 7 7 #define _STRATCON_LOG_STREAMER_H 8 8 9 #include "noit_defines.h" 9 #include "noit_conf.h" 10 #include <netinet/in.h> 11 #include <sys/un.h> 12 #include <arpa/inet.h> 13 14 typedef struct noit_connection_ctx_t { 15 union { 16 struct sockaddr remote; 17 struct sockaddr_un remote_un; 18 struct sockaddr_in remote_in; 19 struct sockaddr_in6 remote_in6; 20 } r; 21 socklen_t remote_len; 22 char *remote_cn; 23 u_int32_t current_backoff; 24 int wants_shutdown; 25 noit_hash_table *config; 26 noit_hash_table *sslconfig; 27 eventer_t timeout_event; 28 29 eventer_func_t consumer_callback; 30 void (*consumer_free)(void *); 31 void *consumer_ctx; 32 } noit_connection_ctx_t; 10 33 11 34 API_EXPORT(void) … … 13 36 API_EXPORT(void) 14 37 stratcon_jlog_streamer_reload(const char *toplevel); 38 API_EXPORT(void) 39 stratcon_streamer_connection(const char *toplevel, const char *destination, 40 eventer_func_t handler, 41 void *(*handler_alloc)(void), void *handler_ctx, 42 void (*handler_free)(void *)); 15 43 16 44 #endif src/stratcon_realtime_http.c
r8757d86 r21b0c6c 13 13 #include "noit_listener.h" 14 14 #include "noit_http.h" 15 16 typedef struct { 17 int setup; 15 #include "noit_livestream_listener.h" 16 #include "stratcon_realtime_http.h" 17 #include "stratcon_jlog_streamer.h" 18 #include "stratcon_datastore.h" 19 20 #include <unistd.h> 21 #include <assert.h> 22 #include <errno.h> 23 #include <sys/types.h> 24 #include <sys/socket.h> 25 #ifdef HAVE_SYS_FILIO_H 26 #include <sys/filio.h> 27 #endif 28 #include <netinet/in.h> 29 #include <sys/un.h> 30 #include <arpa/inet.h> 31 32 33 typedef struct realtime_recv_ctx_t { 34 int bytes_expected; 35 int bytes_read; 36 char *buffer; /* These guys are for doing partial reads */ 37 38 enum { 39 WANT_INITIATE = 0, 40 WANT_SEND_INTERVAL = 1, 41 WANT_SEND_UUID = 2, 42 WANT_HEADER = 3, 43 WANT_BODY = 4, 44 } state; 45 int count; /* Number of jlog messages we need to read */ 46 noit_http_session_ctx *ctx; 47 struct realtime_tracker *rt; 48 } realtime_recv_ctx_t; 49 50 typedef struct realtime_context { 51 enum { RC_INITIAL = 0, RC_REQ_RECV, RC_INTERESTS_RESOLVED, RC_FEEDING } setup; 52 struct realtime_tracker *checklist; 18 53 } realtime_context; 19 54 … … 22 57 return calloc(sizeof(*ctx), 1); 23 58 } 24 int 25 stratcon_realtime_ticker(eventer_t old, int mask, void *closure, 26 struct timeval *now) { 59 static void free_realtime_tracker(struct realtime_tracker *rt) { 60 if(rt->noit) free(rt->noit); 61 free(rt); 62 } 63 static void clear_realtime_context(realtime_context *rc) { 64 while(rc->checklist) { 65 struct realtime_tracker *tofree; 66 tofree = rc->checklist; 67 rc->checklist = tofree->next; 68 free_realtime_tracker(tofree); 69 } 70 } 71 int 72 stratcon_line_to_javascript(noit_http_session_ctx *ctx, char *buff) { 27 73 char buffer[1024]; 28 noit_http_session_ctx *ctx = closure; 29 74 char *scp, *ecp, *token; 75 int len; 76 77 #define PROCESS_NEXT_FIELD(t,l) do { \ 78 if(!*scp) goto bad_row; \ 79 ecp = strchr(scp, '\t'); \ 80 if(!ecp) goto bad_row; \ 81 t = scp; \ 82 l = (ecp-scp); \ 83 scp = ecp + 1; \ 84 } while(0) 85 #define PROCESS_LAST_FIELD(t,l) do { \ 86 if(!*scp) ecp = scp; \ 87 else { \ 88 ecp = scp + strlen(scp); /* Puts us at the '\0' */ \ 89 if(*(ecp-1) == '\n') ecp--; /* We back up on letter if we ended in \n */ \ 90 } \ 91 t = scp; \ 92 l = (ecp-scp); \ 93 } while(0) 94 95 PROCESS_NEXT_FIELD(token,len); /* Skip the leader */ 96 if(buff[0] == 'M') { 97 scp = buff; 98 snprintf(buffer, sizeof(buffer), "<script>window.parent.plot_iframe_data('"); 99 if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) return -1; 100 101 /* Time */ 102 PROCESS_NEXT_FIELD(token,len); 103 if(noit_http_response_append(ctx, token, len) == noit_false) return -1; 104 105 snprintf(buffer, sizeof(buffer), "', '"); 106 if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) return -1; 107 108 /* UUID */ 109 PROCESS_NEXT_FIELD(token,len); 110 if(noit_http_response_append(ctx, token, len) == noit_false) return -1; 111 112 snprintf(buffer, sizeof(buffer), "', '"); 113 if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) return -1; 114 115 /* name */ 116 PROCESS_NEXT_FIELD(token,len); 117 if(noit_http_response_append(ctx, token, len) == noit_false) return -1; 118 119 snprintf(buffer, sizeof(buffer), "', '"); 120 if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) return -1; 121 122 PROCESS_NEXT_FIELD(token,len); /* skip type */ 123 PROCESS_LAST_FIELD(token,len); /* value */ 124 if(noit_http_response_append(ctx, token, len) == noit_false) return -1; 125 126 snprintf(buffer, sizeof(buffer), "');</script>\n"); 127 if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) return -1; 128 129 if(noit_http_response_flush(ctx, noit_false) == noit_false) return -1; 130 } 131 132 return 0; 133 134 bad_row: 135 return -1; 30 136 if(0) { 31 137 noit_http_response_end(ctx); … … 34 140 return 0; 35 141 } 36 37 eventer_t e = eventer_alloc(); 38 gettimeofday(&e->whence, NULL); 39 40 snprintf(buffer, sizeof(buffer), "<script>window.parent.plot_iframe_data('%lu','%s','%0.3f','%lu');</script>\n", 41 (unsigned long)1179, "allocator_requests", (float)(rand() % 100000) / 1000.0, 42 e->whence.tv_sec * 1000 + e->whence.tv_usec / 1000); 43 noit_http_response_append(ctx, buffer, strlen(buffer)); 44 noit_http_response_flush(ctx, noit_false); 45 46 e->mask = EVENTER_TIMER; 47 e->whence.tv_sec += 0; 48 e->whence.tv_usec += 500000; 49 e->callback = stratcon_realtime_ticker; 50 e->closure = closure; 51 eventer_add(e); 142 } 143 int 144 stratcon_realtime_uri_parse(realtime_context *rc, char *uri) { 145 int len, cnt = 0; 146 char *cp, *copy, *interest, *brk; 147 if(strncmp(uri, "/data/", 6)) return 0; 148 cp = uri + 6; 149 len = strlen(cp); 150 copy = alloca(len + 1); 151 if(!copy) return 0; 152 memcpy(copy, cp, len); 153 copy[len] = '\0'; 154 155 for (interest = strtok_r(copy, "/", &brk); 156 interest; 157 interest = strtok_r(NULL, "/", &brk)) { 158 struct realtime_tracker *node; 159 char *interval; 160 161 interval = strchr(interest, '@'); 162 if(!interval) return 0; 163 *interval++ = '\0'; 164 node = calloc(1, sizeof(*node)); 165 node->rc = rc; 166 node->sid = atoi(interest); 167 node->interval = atoi(interval); 168 node->next = rc->checklist; 169 rc->checklist = node; 170 cnt++; 171 } 172 return cnt; 173 } 174 static void 175 free_realtime_recv_ctx(void *vctx) { 176 realtime_recv_ctx_t *rrctx = vctx; 177 noit_atomic_dec32(&rrctx->ctx->ref_cnt); 178 free(rrctx); 179 } 180 #define Eread(a,b) e->opset->read(e->fd, (a), (b), &mask, e) 181 static int 182 __read_on_ctx(eventer_t e, realtime_recv_ctx_t *ctx, int *newmask) { 183 int len, mask; 184 while(ctx->bytes_read < ctx->bytes_expected) { 185 len = Eread(ctx->buffer + ctx->bytes_read, 186 ctx->bytes_expected - ctx->bytes_read); 187 if(len < 0) { 188 *newmask = mask; 189 return -1; 190 } 191 /* if we get 0 inside SSL, and there was a real error, we 192 * will actually get a -1 here. 193 * if(len == 0) return ctx->bytes_read; 194 */ 195 ctx->bytes_read += len; 196 } 197 assert(ctx->bytes_read == ctx->bytes_expected); 198 return ctx->bytes_read; 199 } 200 #define FULLREAD(e,ctx,size) do { \ 201 int mask, len; \ 202 if(!ctx->bytes_expected) { \ 203 ctx->bytes_expected = size; \ 204 if(ctx->buffer) free(ctx->buffer); \ 205 ctx->buffer = malloc(size + 1); \ 206 if(ctx->buffer == NULL) { \ 207 noitL(noit_error, "malloc(%lu) failed.\n", (unsigned long)size + 1); \ 208 goto socket_error; \ 209 } \ 210 ctx->buffer[size] = '\0'; \ 211 } \ 212 len = __read_on_ctx(e, ctx, &mask); \ 213 if(len < 0) { \ 214 if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \ 215 noitL(noit_error, "SSL read error: %s\n", strerror(errno)); \ 216 goto socket_error; \ 217 } \ 218 ctx->bytes_read = 0; \ 219 ctx->bytes_expected = 0; \ 220 if(len != size) { \ 221 noitL(noit_error, "SSL short read [%d] (%d/%lu). Reseting connection.\n", \ 222 ctx->state, len, (unsigned long)size); \ 223 goto socket_error; \ 224 } \ 225 } while(0) 226 227 int 228 stratcon_realtime_recv_handler(eventer_t e, int mask, void *closure, 229 struct timeval *now) { 230 static u_int32_t livestream_cmd = 0; 231 noit_connection_ctx_t *nctx = closure; 232 realtime_recv_ctx_t *ctx = nctx->consumer_ctx; 233 int len; 234 u_int32_t nint; 235 char uuid_str[37]; 236 237 if(!livestream_cmd) livestream_cmd = htonl(NOIT_LIVESTREAM_DATA_FEED); 238 239 if(mask & EVENTER_EXCEPTION || nctx->wants_shutdown) { 240 socket_error: 241 ctx->state = WANT_INITIATE; 242 ctx->count = 0; 243 ctx->bytes_read = 0; 244 ctx->bytes_expected = 0; 245 if(ctx->buffer) free(ctx->buffer); 246 ctx->buffer = NULL; 247 eventer_remove_fd(e->fd); 248 e->opset->close(e->fd, &mask, e); 249 return 0; 250 } 251 252 #define full_nb_write(data, wlen) do { \ 253 len = e->opset->write(e->fd, data, wlen, \ 254 &mask, e); \ 255 if(len < 0) { \ 256 if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \ 257 goto socket_error; \ 258 } \ 259 if(len != sizeof(livestream_cmd)) { \ 260 noitL(noit_error, "short write on initiating stream.\n"); \ 261 goto socket_error; \ 262 } \ 263 } while(0) 264 265 while(1) { 266 switch(ctx->state) { 267 case WANT_INITIATE: 268 full_nb_write(&livestream_cmd, sizeof(livestream_cmd)); 269 ctx->state = WANT_SEND_INTERVAL; 270 case WANT_SEND_INTERVAL: 271 nint = htonl(ctx->rt->interval); 272 full_nb_write(&nint, sizeof(nint)); 273 ctx->state = WANT_SEND_UUID; 274 case WANT_SEND_UUID: 275 uuid_unparse_lower(ctx->rt->checkid, uuid_str); 276 full_nb_write(uuid_str, 36); 277 ctx->state = WANT_HEADER; 278 case WANT_HEADER: 279 FULLREAD(e, ctx, sizeof(u_int32_t)); 280 memcpy(&ctx->bytes_expected, ctx->buffer, sizeof(u_int32_t)); 281 ctx->bytes_expected = ntohl(ctx->bytes_expected); 282 free(ctx->buffer); ctx->buffer = NULL; 283 ctx->state = WANT_BODY; 284 break; 285 case WANT_BODY: 286 FULLREAD(e, ctx, ctx->bytes_expected); 287 noitL(noit_error, "Read: '%s'\n", ctx->buffer); 288 if(stratcon_line_to_javascript(ctx->ctx, ctx->buffer)) goto socket_error; 289 free(ctx->buffer); ctx->buffer = NULL; 290 ctx->state = WANT_BODY; 291 break; 292 } 293 } 294 295 } 296 297 int 298 stratcon_realtime_http_postresolve(eventer_t e, int mask, void *closure, 299 struct timeval *now) { 300 noit_http_session_ctx *ctx = closure; 301 realtime_context *rc = ctx->dispatcher_closure; 302 struct realtime_tracker *node; 303 304 for(node = rc->checklist; node; node = node->next) { 305 if(node->noit) { 306 realtime_recv_ctx_t *rrctx; 307 rrctx = calloc(1, sizeof(*rrctx)); 308 rrctx->ctx = ctx; 309 rrctx->rt = node; 310 stratcon_streamer_connection(NULL, node->noit, 311 stratcon_realtime_recv_handler, 312 NULL, rrctx, 313 free_realtime_recv_ctx); 314 } 315 else 316 noit_atomic_dec32(&ctx->ref_cnt); 317 } 318 if(ctx->ref_cnt == 1) { 319 noit_http_response_end(ctx); 320 clear_realtime_context(rc); 321 memset(ctx->dispatcher_closure, 0, sizeof(realtime_context)); 322 if(ctx->conn.e) eventer_trigger(ctx->conn.e, EVENTER_WRITE); 323 } 52 324 return 0; 53 325 } … … 60 332 noit_http_request *req = &ctx->req; 61 333 62 if(strcmp(ctx->req.uri_str, "/data")) { 63 noit_http_response_status_set(ctx, 404, "OK"); 64 noit_http_response_option_set(ctx, NOIT_HTTP_CLOSE); 65 noit_http_response_end(ctx); 66 return 0; 67 } 68 if(!rc->setup) { 334 if(rc->setup == RC_INITIAL) { 335 eventer_t completion; 336 struct realtime_tracker *node; 69 337 char c[1024]; 338 int num_interests; 339 340 num_interests = stratcon_realtime_uri_parse(rc, ctx->req.uri_str); 341 if(num_interests == 0) { 342 noit_http_response_status_set(ctx, 404, "OK"); 343 noit_http_response_option_set(ctx, NOIT_HTTP_CLOSE); 344 noit_http_response_end(ctx); 345 return 0; 346 } 347 70 348 noitL(noit_error, "http: %s %s %s\n", 71 349 req->method_str, req->uri_str, req->protocol_str); … … 75 353 noit_http_response_status_set(ctx, 200, "OK"); 76 354 noit_http_response_option_set(ctx, NOIT_HTTP_CHUNKED); 77 /* noit_http_response_option_set(ctx, NOIT_HTTP_DEFLATE); */ 355 noit_http_response_option_set(ctx, NOIT_HTTP_DEFLATE); 78 356 noit_http_response_header_set(ctx, "Content-Type", "text/html"); 79 357 … … 81 359 noit_http_response_append(ctx, c, strlen(c)); 82 360 83 memset(c, ' ', 1024); 361 /* this dumb crap is to make some browsers happy (Safari) */ 362 memset(c, ' ', sizeof(c)); 84 363 noit_http_response_append(ctx, c, sizeof(c)); 85 364 noit_http_response_flush(ctx, noit_false); 86 365 87 stratcon_realtime_ticker(NULL, 0, ctx, NULL); 88 rc->setup = 1; 366 rc->setup = RC_REQ_RECV; 367 /* Each interest references the ctx */ 368 for(node = rc->checklist; node; node = node->next) { 369 noit_atomic_inc32(&ctx->ref_cnt); 370 stratcon_datastore_push(DS_OP_FIND, NULL, node); 371 noitL(noit_error, "Resolving sid: %d\n", node->sid); 372 } 373 completion = eventer_alloc(); 374 completion->mask = EVENTER_TIMER; 375 completion->callback = stratcon_realtime_http_postresolve; 376 completion->closure = ctx; 377 gettimeofday(&completion->whence, NULL); 378 stratcon_datastore_push(DS_OP_FIND_COMPLETE, NULL, completion); 89 379 } 90 380 return EVENTER_EXCEPTION; src/stratcon_realtime_http.h
r55168c7 r21b0c6c 7 7 #define _STRATCON_REALTIME_HTTP_H 8 8 9 #include "noit_defines.h" 9 #include "noit_conf.h" 10 11 /* This is in the public header as the stratcon_datastore must know 12 * how to resolve this 13 */ 14 struct realtime_tracker { 15 int sid; /* set by request */ 16 int interval; /* set by request */ 17 char *noit; /* resolved by datastore */ 18 uuid_t checkid; /* resolved by datastore */ 19 struct realtime_tracker *next; /* next in series */ 20 eventer_t conn; /* used to track noitd connection feeding this */ 21 struct realtime_context *rc; /* link back to the rc that justified us */ 22 }; 10 23 11 24 API_EXPORT(void)
