Changeset 2d89607d671aa2191518fe131eb3f8bcee49127a
- Timestamp:
- 01/25/09 17:12:28
(4 years ago)
- Author:
- Theo Schlossnagle <jesus@omniti.com>
- git-committer:
- Theo Schlossnagle <jesus@omniti.com> 1232903548 +0000
- git-parent:
[1b6952c0dbe227d2f82b8d693a0b6d776470ce41]
- git-author:
- Theo Schlossnagle <jesus@omniti.com> 1232903548 +0000
- Message:
fixes... holy crap, it works. This things leaks and persists when it shouldn't... but we have data flow! refs #71
-
Files:
-
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
| r057e0c6 |
r2d89607 |
|
| 801 | 801 | report_change = 1; |
|---|
| 802 | 802 | |
|---|
| 803 | | noitL(noit_error, "%s`%s <- [%s]\n", check->target, check->name, |
|---|
| | 803 | noitL(noit_debug, "%s`%s <- [%s]\n", check->target, check->name, |
|---|
| 804 | 804 | check->stats.current.status); |
|---|
| 805 | 805 | if(report_change) { |
|---|
| 806 | | noitL(noit_error, "%s`%s -> [%s:%s]\n", |
|---|
| | 806 | noitL(noit_debug, "%s`%s -> [%s:%s]\n", |
|---|
| 807 | 807 | check->target, check->name, |
|---|
| 808 | 808 | noit_check_available_string(check->stats.current.available), |
|---|
| r21b0c6c |
r2d89607 |
|
| 512 | 512 | return noit_false; |
|---|
| 513 | 513 | if(!ctx->res.output) |
|---|
| 514 | | ctx->res.output = bchain_alloc(DEFAULT_BCHAINSIZE); |
|---|
| | 514 | assert(ctx->res.output = bchain_alloc(DEFAULT_BCHAINSIZE)); |
|---|
| 515 | 515 | o = ctx->res.output; |
|---|
| 516 | 516 | while(o->next) o = o->next; |
|---|
| r15ce866 |
r2d89607 |
|
| 90 | 90 | noit_livestream_closure_t *jcl; |
|---|
| 91 | 91 | jcl = calloc(1, sizeof(*jcl)); |
|---|
| | 92 | pthread_mutex_init(&jcl->lqueue_lock, NULL); |
|---|
| | 93 | sem_init(&jcl->lqueue_sem, 0, 0); |
|---|
| 92 | 94 | return jcl; |
|---|
| 93 | 95 | } |
|---|
| … | … | |
| 126 | 128 | |
|---|
| 127 | 129 | /* Go into blocking mode */ |
|---|
| 128 | | ioctl(e->fd, FIONBIO, &off); |
|---|
| | 130 | if(ioctl(e->fd, FIONBIO, &off) == -1) { |
|---|
| | 131 | noitL(noit_error, "ioctl failed setting livestream to blocking: [%d] [%s]\n", |
|---|
| | 132 | errno, strerror(errno)); |
|---|
| | 133 | goto alldone; |
|---|
| | 134 | } |
|---|
| 129 | 135 | |
|---|
| 130 | 136 | while(1) { |
|---|
| r1b6952c |
r2d89607 |
|
| 34 | 34 | int bytes_expected; |
|---|
| 35 | 35 | int bytes_read; |
|---|
| | 36 | int bytes_written; |
|---|
| | 37 | int body_len; |
|---|
| 36 | 38 | char *buffer; /* These guys are for doing partial reads */ |
|---|
| 37 | 39 | |
|---|
| … | … | |
| 93 | 95 | } while(0) |
|---|
| 94 | 96 | |
|---|
| | 97 | scp = buff; |
|---|
| 95 | 98 | PROCESS_NEXT_FIELD(token,len); /* Skip the leader */ |
|---|
| 96 | 99 | if(buff[0] == 'M') { |
|---|
| 97 | | scp = buff; |
|---|
| 98 | 100 | snprintf(buffer, sizeof(buffer), "<script>window.parent.plot_iframe_data('"); |
|---|
| 99 | 101 | if(noit_http_response_append(ctx, buffer, strlen(buffer)) == noit_false) return -1; |
|---|
| … | … | |
| 242 | 244 | ctx->count = 0; |
|---|
| 243 | 245 | ctx->bytes_read = 0; |
|---|
| | 246 | ctx->bytes_written = 0; |
|---|
| 244 | 247 | ctx->bytes_expected = 0; |
|---|
| 245 | 248 | if(ctx->buffer) free(ctx->buffer); |
|---|
| … | … | |
| 252 | 255 | |
|---|
| 253 | 256 | #define full_nb_write(data, wlen) do { \ |
|---|
| 254 | | len = e->opset->write(e->fd, data, wlen, \ |
|---|
| 255 | | &mask, e); \ |
|---|
| 256 | | if(len < 0) { \ |
|---|
| 257 | | if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \ |
|---|
| | 257 | if(!ctx->bytes_expected) { \ |
|---|
| | 258 | ctx->bytes_written = 0; \ |
|---|
| | 259 | ctx->bytes_expected = wlen; \ |
|---|
| | 260 | } \ |
|---|
| | 261 | while(ctx->bytes_written < ctx->bytes_expected) { \ |
|---|
| | 262 | while(-1 == (len = e->opset->write(e->fd, ((char *)data) + ctx->bytes_written, \ |
|---|
| | 263 | ctx->bytes_expected - ctx->bytes_written, \ |
|---|
| | 264 | &mask, e)) && errno == EINTR); \ |
|---|
| | 265 | if(len < 0) { \ |
|---|
| | 266 | if(errno == EAGAIN) return mask | EVENTER_EXCEPTION; \ |
|---|
| | 267 | goto socket_error; \ |
|---|
| | 268 | } \ |
|---|
| | 269 | ctx->bytes_written += len; \ |
|---|
| | 270 | } \ |
|---|
| | 271 | if(ctx->bytes_written != ctx->bytes_expected) { \ |
|---|
| | 272 | noitL(noit_error, "short write on initiating stream [%d != %d].\n", \ |
|---|
| | 273 | ctx->bytes_written, ctx->bytes_expected); \ |
|---|
| 258 | 274 | goto socket_error; \ |
|---|
| 259 | 275 | } \ |
|---|
| 260 | | if(len != sizeof(livestream_cmd)) { \ |
|---|
| 261 | | noitL(noit_error, "short write on initiating stream.\n"); \ |
|---|
| 262 | | goto socket_error; \ |
|---|
| 263 | | } \ |
|---|
| | 276 | ctx->bytes_expected = 0; \ |
|---|
| 264 | 277 | } while(0) |
|---|
| 265 | 278 | |
|---|
| 266 | 279 | while(1) { |
|---|
| | 280 | u_int32_t net_body_len; |
|---|
| | 281 | |
|---|
| 267 | 282 | switch(ctx->state) { |
|---|
| 268 | 283 | case WANT_INITIATE: |
|---|
| … | … | |
| 279 | 294 | case WANT_HEADER: |
|---|
| 280 | 295 | FULLREAD(e, ctx, sizeof(u_int32_t)); |
|---|
| 281 | | memcpy(&ctx->bytes_expected, ctx->buffer, sizeof(u_int32_t)); |
|---|
| 282 | | ctx->bytes_expected = ntohl(ctx->bytes_expected); |
|---|
| | 296 | memcpy(&net_body_len, ctx->buffer, sizeof(u_int32_t)); |
|---|
| | 297 | ctx->body_len = ntohl(net_body_len); |
|---|
| 283 | 298 | free(ctx->buffer); ctx->buffer = NULL; |
|---|
| 284 | 299 | ctx->state = WANT_BODY; |
|---|
| 285 | 300 | break; |
|---|
| 286 | 301 | case WANT_BODY: |
|---|
| 287 | | FULLREAD(e, ctx, ctx->bytes_expected); |
|---|
| | 302 | FULLREAD(e, ctx, ctx->body_len); |
|---|
| 288 | 303 | noitL(noit_error, "Read: '%s'\n", ctx->buffer); |
|---|
| 289 | 304 | if(stratcon_line_to_javascript(ctx->ctx, ctx->buffer)) goto socket_error; |
|---|
| 290 | 305 | free(ctx->buffer); ctx->buffer = NULL; |
|---|
| 291 | | ctx->state = WANT_BODY; |
|---|
| | 306 | ctx->state = WANT_HEADER; |
|---|
| 292 | 307 | break; |
|---|
| 293 | 308 | } |
|---|