Changeset 986ca9cdcea688fff7615506f7a522fed667967b
- Timestamp:
- 05/13/11 03:05:48
(2 years ago)
- Author:
- Theo Schlossnagle <jesus@omniti.com>
- git-committer:
- Theo Schlossnagle <jesus@omniti.com> 1305255948 -0400
- git-parent:
[d2b2cf6a9fe0bc981e17e3ca09ad4c053f699dff]
- git-author:
- Theo Schlossnagle <jesus@omniti.com> 1305255948 -0400
- Message:
add support for asynchronous jlog'in
-
Files:
-
Legend:
- Unmodified
- Added
- Removed
- Modified
- Copied
- Moved
| r2648da9 |
r986ca9c |
|
| 226 | 226 | }; |
|---|
| 227 | 227 | |
|---|
| | 228 | typedef struct jlog_line { |
|---|
| | 229 | char *buf; |
|---|
| | 230 | char buf_static[512]; |
|---|
| | 231 | char *buf_dynamic; |
|---|
| | 232 | int len; |
|---|
| | 233 | void *next; |
|---|
| | 234 | } jlog_line; |
|---|
| | 235 | |
|---|
| | 236 | typedef struct { |
|---|
| | 237 | jlog_ctx *log; |
|---|
| | 238 | pthread_t writer; |
|---|
| | 239 | void *head; |
|---|
| | 240 | } jlog_asynch_ctx; |
|---|
| | 241 | |
|---|
| 228 | 242 | static int |
|---|
| 229 | 243 | jlog_lspath_to_fspath(noit_log_stream_t ls, char *buff, int len, |
|---|
| … | … | |
| 267 | 281 | static int |
|---|
| 268 | 282 | jlog_logio_cleanse(noit_log_stream_t ls) { |
|---|
| | 283 | jlog_asynch_ctx *actx; |
|---|
| 269 | 284 | jlog_ctx *log; |
|---|
| 270 | 285 | DIR *d; |
|---|
| … | … | |
| 274 | 289 | int size = 0; |
|---|
| 275 | 290 | |
|---|
| 276 | | log = (jlog_ctx *)ls->op_ctx; |
|---|
| | 291 | actx = (jlog_asynch_ctx *)ls->op_ctx; |
|---|
| | 292 | if(!actx) return -1; |
|---|
| | 293 | log = actx->log; |
|---|
| 277 | 294 | if(!log) return -1; |
|---|
| 278 | 295 | if(jlog_lspath_to_fspath(ls, path, sizeof(path), NULL) <= 0) return -1; |
|---|
| … | … | |
| 315 | 332 | jlog_logio_reopen(noit_log_stream_t ls) { |
|---|
| 316 | 333 | char **subs; |
|---|
| | 334 | jlog_asynch_ctx *actx = ls->op_ctx; |
|---|
| 317 | 335 | int i; |
|---|
| 318 | 336 | /* reopening only has the effect of removing temporary subscriptions */ |
|---|
| … | … | |
| 320 | 338 | |
|---|
| 321 | 339 | if(ls->lock) pthread_rwlock_wrlock(ls->lock); |
|---|
| 322 | | if(jlog_ctx_list_subscribers(ls->op_ctx, &subs) == -1) |
|---|
| | 340 | if(jlog_ctx_list_subscribers(actx->log, &subs) == -1) |
|---|
| 323 | 341 | goto bail; |
|---|
| 324 | 342 | |
|---|
| 325 | 343 | for(i=0;subs[i];i++) |
|---|
| 326 | 344 | if(subs[i][0] == '~') |
|---|
| 327 | | jlog_ctx_remove_subscriber(ls->op_ctx, subs[i]); |
|---|
| 328 | | |
|---|
| 329 | | jlog_ctx_list_subscribers_dispose(ls->op_ctx, subs); |
|---|
| | 345 | jlog_ctx_remove_subscriber(actx->log, subs[i]); |
|---|
| | 346 | |
|---|
| | 347 | jlog_ctx_list_subscribers_dispose(actx->log, subs); |
|---|
| 330 | 348 | jlog_logio_cleanse(ls); |
|---|
| 331 | 349 | bail: |
|---|
| … | … | |
| 333 | 351 | return 0; |
|---|
| 334 | 352 | } |
|---|
| | 353 | static jlog_line * |
|---|
| | 354 | jlog_asynch_pop(jlog_asynch_ctx *actx, jlog_line **iter) { |
|---|
| | 355 | jlog_line *h = NULL, *rev = NULL; |
|---|
| | 356 | |
|---|
| | 357 | if(*iter) { /* we have more on the previous list */ |
|---|
| | 358 | h = *iter; |
|---|
| | 359 | *iter = h->next; |
|---|
| | 360 | return h; |
|---|
| | 361 | } |
|---|
| | 362 | |
|---|
| | 363 | while(1) { |
|---|
| | 364 | h = actx->head; |
|---|
| | 365 | if(noit_atomic_casptr((volatile void **)&actx->head, NULL, h) == h) break; |
|---|
| | 366 | } |
|---|
| | 367 | while(h) { |
|---|
| | 368 | /* which unshifted things into the queue -- it's backwards, reverse it */ |
|---|
| | 369 | jlog_line *tmp = h; |
|---|
| | 370 | h = h->next; |
|---|
| | 371 | tmp->next = rev; |
|---|
| | 372 | rev = tmp; |
|---|
| | 373 | } |
|---|
| | 374 | if(rev) *iter = rev->next; |
|---|
| | 375 | else *iter = NULL; |
|---|
| | 376 | return rev; |
|---|
| | 377 | } |
|---|
| | 378 | void |
|---|
| | 379 | jlog_asynch_push(jlog_asynch_ctx *actx, jlog_line *n) { |
|---|
| | 380 | while(1) { |
|---|
| | 381 | n->next = actx->head; |
|---|
| | 382 | if(noit_atomic_casptr((volatile void **)&actx->head, n, n->next) == n->next) return; |
|---|
| | 383 | } |
|---|
| | 384 | } |
|---|
| | 385 | static void * |
|---|
| | 386 | jlog_logio_asynch_writer(void *vls) { |
|---|
| | 387 | noit_log_stream_t ls = vls; |
|---|
| | 388 | jlog_asynch_ctx *actx = ls->op_ctx; |
|---|
| | 389 | jlog_line *iter = NULL; |
|---|
| | 390 | while(1) { |
|---|
| | 391 | int fast = 0, max = 1000; |
|---|
| | 392 | jlog_line *line; |
|---|
| | 393 | pthread_rwlock_rdlock(ls->lock); |
|---|
| | 394 | while(max > 0 && NULL != (line = jlog_asynch_pop(actx, &iter))) { |
|---|
| | 395 | jlog_ctx_write(actx->log, line->buf_dynamic ? |
|---|
| | 396 | line->buf_dynamic : |
|---|
| | 397 | line->buf_static, |
|---|
| | 398 | line->len); |
|---|
| | 399 | if(line->buf_dynamic != NULL) free(line->buf_dynamic); |
|---|
| | 400 | free(line); |
|---|
| | 401 | fast = 1; |
|---|
| | 402 | max--; |
|---|
| | 403 | } |
|---|
| | 404 | pthread_rwlock_unlock(ls->lock); |
|---|
| | 405 | if(max > 0) { |
|---|
| | 406 | /* we didn't hit our limit... so we ran the queue dry */ |
|---|
| | 407 | /* 200ms if there was nothing, 10ms otherwise */ |
|---|
| | 408 | usleep(fast ? 10000 : 200000); |
|---|
| | 409 | } |
|---|
| | 410 | } |
|---|
| | 411 | } |
|---|
| 335 | 412 | static int |
|---|
| 336 | 413 | jlog_logio_open(noit_log_stream_t ls) { |
|---|
| 337 | 414 | char path[PATH_MAX], *sub, **subs, *p; |
|---|
| | 415 | jlog_asynch_ctx *actx; |
|---|
| 338 | 416 | jlog_ctx *log = NULL; |
|---|
| | 417 | pthread_attr_t tattr; |
|---|
| 339 | 418 | int i, listed, found; |
|---|
| 340 | 419 | |
|---|
| … | … | |
| 417 | 496 | } |
|---|
| 418 | 497 | |
|---|
| 419 | | ls->op_ctx = log; |
|---|
| | 498 | actx = calloc(1, sizeof(*actx)); |
|---|
| | 499 | actx->log = log; |
|---|
| | 500 | ls->op_ctx = actx; |
|---|
| 420 | 501 | /* We do this to clean things up */ |
|---|
| 421 | 502 | jlog_logio_reopen(ls); |
|---|
| | 503 | |
|---|
| | 504 | pthread_attr_init(&tattr); |
|---|
| | 505 | pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED); |
|---|
| | 506 | if(pthread_create(&actx->writer, &tattr, jlog_logio_asynch_writer, ls) != 0) { |
|---|
| | 507 | return -1; |
|---|
| | 508 | } |
|---|
| 422 | 509 | return 0; |
|---|
| 423 | 510 | } |
|---|
| … | … | |
| 425 | 512 | jlog_logio_write(noit_log_stream_t ls, const void *buf, size_t len) { |
|---|
| 426 | 513 | int rv = -1; |
|---|
| | 514 | jlog_asynch_ctx *actx; |
|---|
| | 515 | jlog_line *line; |
|---|
| 427 | 516 | if(!ls->op_ctx) return -1; |
|---|
| 428 | | pthread_rwlock_rdlock(ls->lock); |
|---|
| 429 | | if(jlog_ctx_write((jlog_ctx *)ls->op_ctx, buf, len) == 0) |
|---|
| 430 | | rv = len; |
|---|
| 431 | | pthread_rwlock_unlock(ls->lock); |
|---|
| | 517 | actx = ls->op_ctx; |
|---|
| | 518 | line = calloc(1, sizeof(*line)); |
|---|
| | 519 | if(len > sizeof(line->buf_static)) { |
|---|
| | 520 | line->buf_dynamic = malloc(len); |
|---|
| | 521 | memcpy(line->buf_dynamic, buf, len); |
|---|
| | 522 | } |
|---|
| | 523 | else { |
|---|
| | 524 | memcpy(line->buf_static, buf, len); |
|---|
| | 525 | } |
|---|
| | 526 | line->len = len; |
|---|
| | 527 | jlog_asynch_push(actx, line); |
|---|
| 432 | 528 | return rv; |
|---|
| 433 | 529 | } |
|---|
| … | … | |
| 435 | 531 | jlog_logio_close(noit_log_stream_t ls) { |
|---|
| 436 | 532 | if(ls->op_ctx) { |
|---|
| 437 | | jlog_ctx_close((jlog_ctx *)ls->op_ctx); |
|---|
| | 533 | jlog_asynch_ctx *actx = ls->op_ctx; |
|---|
| | 534 | jlog_ctx_close(actx->log); |
|---|
| 438 | 535 | ls->op_ctx = NULL; |
|---|
| 439 | 536 | } |
|---|
| … | … | |
| 443 | 540 | jlog_logio_size(noit_log_stream_t ls) { |
|---|
| 444 | 541 | size_t size; |
|---|
| | 542 | jlog_asynch_ctx *actx; |
|---|
| 445 | 543 | if(!ls->op_ctx) return -1; |
|---|
| | 544 | actx = ls->op_ctx; |
|---|
| 446 | 545 | pthread_rwlock_rdlock(ls->lock); |
|---|
| 447 | | size = jlog_raw_size((jlog_ctx *)ls->op_ctx); |
|---|
| | 546 | size = jlog_raw_size(actx->log); |
|---|
| 448 | 547 | pthread_rwlock_unlock(ls->lock); |
|---|
| 449 | 548 | return size; |
|---|