Changeset 986ca9cdcea688fff7615506f7a522fed667967b
- Timestamp:
- 05/13/11 03:05:48
(7 years ago)
- Author:
- Theo Schlossnagle <jesus@omniti.com>
- git-committer:
- Theo Schlossnagle <jesus@omniti.com> 1305255948 -0400
- git-parent:
[d2b2cf6a9fe0bc981e17e3ca09ad4c053f699dff]
- git-author:
- Theo Schlossnagle <jesus@omniti.com> 1305255948 -0400
- Message:
add support for asynchronous jlog'in
-
Files:
-
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
r2648da9 |
r986ca9c |
|
226 | 226 | }; |
---|
227 | 227 | |
---|
| 228 | typedef struct jlog_line { |
---|
| 229 | char *buf; |
---|
| 230 | char buf_static[512]; |
---|
| 231 | char *buf_dynamic; |
---|
| 232 | int len; |
---|
| 233 | void *next; |
---|
| 234 | } jlog_line; |
---|
| 235 | |
---|
| 236 | typedef struct { |
---|
| 237 | jlog_ctx *log; |
---|
| 238 | pthread_t writer; |
---|
| 239 | void *head; |
---|
| 240 | } jlog_asynch_ctx; |
---|
| 241 | |
---|
228 | 242 | static int |
---|
229 | 243 | jlog_lspath_to_fspath(noit_log_stream_t ls, char *buff, int len, |
---|
… | … | |
267 | 281 | static int |
---|
268 | 282 | jlog_logio_cleanse(noit_log_stream_t ls) { |
---|
| 283 | jlog_asynch_ctx *actx; |
---|
269 | 284 | jlog_ctx *log; |
---|
270 | 285 | DIR *d; |
---|
… | … | |
274 | 289 | int size = 0; |
---|
275 | 290 | |
---|
276 | | log = (jlog_ctx *)ls->op_ctx; |
---|
| 291 | actx = (jlog_asynch_ctx *)ls->op_ctx; |
---|
| 292 | if(!actx) return -1; |
---|
| 293 | log = actx->log; |
---|
277 | 294 | if(!log) return -1; |
---|
278 | 295 | if(jlog_lspath_to_fspath(ls, path, sizeof(path), NULL) <= 0) return -1; |
---|
… | … | |
315 | 332 | jlog_logio_reopen(noit_log_stream_t ls) { |
---|
316 | 333 | char **subs; |
---|
| 334 | jlog_asynch_ctx *actx = ls->op_ctx; |
---|
317 | 335 | int i; |
---|
318 | 336 | /* reopening only has the effect of removing temporary subscriptions */ |
---|
… | … | |
320 | 338 | |
---|
321 | 339 | if(ls->lock) pthread_rwlock_wrlock(ls->lock); |
---|
322 | | if(jlog_ctx_list_subscribers(ls->op_ctx, &subs) == -1) |
---|
| 340 | if(jlog_ctx_list_subscribers(actx->log, &subs) == -1) |
---|
323 | 341 | goto bail; |
---|
324 | 342 | |
---|
325 | 343 | for(i=0;subs[i];i++) |
---|
326 | 344 | if(subs[i][0] == '~') |
---|
327 | | jlog_ctx_remove_subscriber(ls->op_ctx, subs[i]); |
---|
328 | | |
---|
329 | | jlog_ctx_list_subscribers_dispose(ls->op_ctx, subs); |
---|
| 345 | jlog_ctx_remove_subscriber(actx->log, subs[i]); |
---|
| 346 | |
---|
| 347 | jlog_ctx_list_subscribers_dispose(actx->log, subs); |
---|
330 | 348 | jlog_logio_cleanse(ls); |
---|
331 | 349 | bail: |
---|
… | … | |
333 | 351 | return 0; |
---|
334 | 352 | } |
---|
| 353 | static jlog_line * |
---|
| 354 | jlog_asynch_pop(jlog_asynch_ctx *actx, jlog_line **iter) { |
---|
| 355 | jlog_line *h = NULL, *rev = NULL; |
---|
| 356 | |
---|
| 357 | if(*iter) { /* we have more on the previous list */ |
---|
| 358 | h = *iter; |
---|
| 359 | *iter = h->next; |
---|
| 360 | return h; |
---|
| 361 | } |
---|
| 362 | |
---|
| 363 | while(1) { |
---|
| 364 | h = actx->head; |
---|
| 365 | if(noit_atomic_casptr((volatile void **)&actx->head, NULL, h) == h) break; |
---|
| 366 | } |
---|
| 367 | while(h) { |
---|
| 368 | /* which unshifted things into the queue -- it's backwards, reverse it */ |
---|
| 369 | jlog_line *tmp = h; |
---|
| 370 | h = h->next; |
---|
| 371 | tmp->next = rev; |
---|
| 372 | rev = tmp; |
---|
| 373 | } |
---|
| 374 | if(rev) *iter = rev->next; |
---|
| 375 | else *iter = NULL; |
---|
| 376 | return rev; |
---|
| 377 | } |
---|
| 378 | void |
---|
| 379 | jlog_asynch_push(jlog_asynch_ctx *actx, jlog_line *n) { |
---|
| 380 | while(1) { |
---|
| 381 | n->next = actx->head; |
---|
| 382 | if(noit_atomic_casptr((volatile void **)&actx->head, n, n->next) == n->next) return; |
---|
| 383 | } |
---|
| 384 | } |
---|
| 385 | static void * |
---|
| 386 | jlog_logio_asynch_writer(void *vls) { |
---|
| 387 | noit_log_stream_t ls = vls; |
---|
| 388 | jlog_asynch_ctx *actx = ls->op_ctx; |
---|
| 389 | jlog_line *iter = NULL; |
---|
| 390 | while(1) { |
---|
| 391 | int fast = 0, max = 1000; |
---|
| 392 | jlog_line *line; |
---|
| 393 | pthread_rwlock_rdlock(ls->lock); |
---|
| 394 | while(max > 0 && NULL != (line = jlog_asynch_pop(actx, &iter))) { |
---|
| 395 | jlog_ctx_write(actx->log, line->buf_dynamic ? |
---|
| 396 | line->buf_dynamic : |
---|
| 397 | line->buf_static, |
---|
| 398 | line->len); |
---|
| 399 | if(line->buf_dynamic != NULL) free(line->buf_dynamic); |
---|
| 400 | free(line); |
---|
| 401 | fast = 1; |
---|
| 402 | max--; |
---|
| 403 | } |
---|
| 404 | pthread_rwlock_unlock(ls->lock); |
---|
| 405 | if(max > 0) { |
---|
| 406 | /* we didn't hit our limit... so we ran the queue dry */ |
---|
| 407 | /* 200ms if there was nothing, 10ms otherwise */ |
---|
| 408 | usleep(fast ? 10000 : 200000); |
---|
| 409 | } |
---|
| 410 | } |
---|
| 411 | } |
---|
335 | 412 | static int |
---|
336 | 413 | jlog_logio_open(noit_log_stream_t ls) { |
---|
337 | 414 | char path[PATH_MAX], *sub, **subs, *p; |
---|
| 415 | jlog_asynch_ctx *actx; |
---|
338 | 416 | jlog_ctx *log = NULL; |
---|
| 417 | pthread_attr_t tattr; |
---|
339 | 418 | int i, listed, found; |
---|
340 | 419 | |
---|
… | … | |
417 | 496 | } |
---|
418 | 497 | |
---|
419 | | ls->op_ctx = log; |
---|
| 498 | actx = calloc(1, sizeof(*actx)); |
---|
| 499 | actx->log = log; |
---|
| 500 | ls->op_ctx = actx; |
---|
420 | 501 | /* We do this to clean things up */ |
---|
421 | 502 | jlog_logio_reopen(ls); |
---|
| 503 | |
---|
| 504 | pthread_attr_init(&tattr); |
---|
| 505 | pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED); |
---|
| 506 | if(pthread_create(&actx->writer, &tattr, jlog_logio_asynch_writer, ls) != 0) { |
---|
| 507 | return -1; |
---|
| 508 | } |
---|
422 | 509 | return 0; |
---|
423 | 510 | } |
---|
… | … | |
425 | 512 | jlog_logio_write(noit_log_stream_t ls, const void *buf, size_t len) { |
---|
426 | 513 | int rv = -1; |
---|
| 514 | jlog_asynch_ctx *actx; |
---|
| 515 | jlog_line *line; |
---|
427 | 516 | if(!ls->op_ctx) return -1; |
---|
428 | | pthread_rwlock_rdlock(ls->lock); |
---|
429 | | if(jlog_ctx_write((jlog_ctx *)ls->op_ctx, buf, len) == 0) |
---|
430 | | rv = len; |
---|
431 | | pthread_rwlock_unlock(ls->lock); |
---|
| 517 | actx = ls->op_ctx; |
---|
| 518 | line = calloc(1, sizeof(*line)); |
---|
| 519 | if(len > sizeof(line->buf_static)) { |
---|
| 520 | line->buf_dynamic = malloc(len); |
---|
| 521 | memcpy(line->buf_dynamic, buf, len); |
---|
| 522 | } |
---|
| 523 | else { |
---|
| 524 | memcpy(line->buf_static, buf, len); |
---|
| 525 | } |
---|
| 526 | line->len = len; |
---|
| 527 | jlog_asynch_push(actx, line); |
---|
432 | 528 | return rv; |
---|
433 | 529 | } |
---|
… | … | |
435 | 531 | jlog_logio_close(noit_log_stream_t ls) { |
---|
436 | 532 | if(ls->op_ctx) { |
---|
437 | | jlog_ctx_close((jlog_ctx *)ls->op_ctx); |
---|
| 533 | jlog_asynch_ctx *actx = ls->op_ctx; |
---|
| 534 | jlog_ctx_close(actx->log); |
---|
438 | 535 | ls->op_ctx = NULL; |
---|
439 | 536 | } |
---|
… | … | |
443 | 540 | jlog_logio_size(noit_log_stream_t ls) { |
---|
444 | 541 | size_t size; |
---|
| 542 | jlog_asynch_ctx *actx; |
---|
445 | 543 | if(!ls->op_ctx) return -1; |
---|
| 544 | actx = ls->op_ctx; |
---|
446 | 545 | pthread_rwlock_rdlock(ls->lock); |
---|
447 | | size = jlog_raw_size((jlog_ctx *)ls->op_ctx); |
---|
| 546 | size = jlog_raw_size(actx->log); |
---|
448 | 547 | pthread_rwlock_unlock(ls->lock); |
---|
449 | 548 | return size; |
---|