| 1 |
/* |
|---|
| 2 |
* Copyright (c) 2007, OmniTI Computer Consulting, Inc. |
|---|
| 3 |
* All rights reserved. |
|---|
| 4 |
*/ |
|---|
| 5 |
|
|---|
| 6 |
#include "noit_defines.h" |
|---|
| 7 |
#include "eventer/eventer.h" |
|---|
| 8 |
#include "utils/noit_log.h" |
|---|
| 9 |
#include "utils/noit_b64.h" |
|---|
| 10 |
#include "stratcon_datastore.h" |
|---|
| 11 |
#include "noit_conf.h" |
|---|
| 12 |
#include "noit_check.h" |
|---|
| 13 |
|
|---|
| 14 |
#define SWEEP_DELAY { 0L, 10000L } /* 10ms */ |
|---|
| 15 |
eventer_jobq_t iep_jobq; |
|---|
| 16 |
|
|---|
| 17 |
struct noit_line_list { |
|---|
| 18 |
char *line; |
|---|
| 19 |
struct noit_line_list *next; |
|---|
| 20 |
}; |
|---|
| 21 |
struct iep_batch { |
|---|
| 22 |
/* This lock only needs to be used for inserting... (in the pivot) |
|---|
| 23 |
* Once the batch is "done" it is submitted to a thread and has |
|---|
| 24 |
* no more concurrent access. |
|---|
| 25 |
*/ |
|---|
| 26 |
pthread_mutex_t lock; |
|---|
| 27 |
int batch_size; |
|---|
| 28 |
struct noit_line_list *head; |
|---|
| 29 |
struct noit_line_list *tail; |
|---|
| 30 |
}; |
|---|
| 31 |
/* We safely insert into the pivot... then lock and flip the batch |
|---|
| 32 |
* into a self-contained iep_batch which is the closure to the asynch |
|---|
| 33 |
* function that inserts it into OpenESB. |
|---|
| 34 |
*/ |
|---|
| 35 |
static struct iep_batch pivot_batch; |
|---|
| 36 |
|
|---|
| 37 |
static int |
|---|
| 38 |
stratcon_iep_batch_add_line(const char *data) { |
|---|
| 39 |
int previous_size; |
|---|
| 40 |
struct noit_line_list *nnode; |
|---|
| 41 |
nnode = malloc(sizeof(*nnode)); |
|---|
| 42 |
nnode->line = strdup(data); |
|---|
| 43 |
nnode->next = NULL; |
|---|
| 44 |
pthread_mutex_lock(&pivot_batch.lock); |
|---|
| 45 |
if(!pivot_batch.tail) pivot_batch.head = pivot_batch.tail = nnode; |
|---|
| 46 |
else { |
|---|
| 47 |
pivot_batch.tail->next = nnode; |
|---|
| 48 |
pivot_batch.tail = nnode; |
|---|
| 49 |
} |
|---|
| 50 |
previous_size = pivot_batch.batch_size; |
|---|
| 51 |
pivot_batch.batch_size++; |
|---|
| 52 |
pthread_mutex_unlock(&pivot_batch.lock); |
|---|
| 53 |
return previous_size; |
|---|
| 54 |
} |
|---|
| 55 |
|
|---|
| 56 |
static struct iep_batch * |
|---|
| 57 |
stratcon_iep_batch_copytrunc() { |
|---|
| 58 |
struct iep_batch *nbatch; |
|---|
| 59 |
nbatch = calloc(1, sizeof(*nbatch)); |
|---|
| 60 |
/* Lock */ |
|---|
| 61 |
pthread_mutex_lock(&pivot_batch.lock); |
|---|
| 62 |
/* Copy */ |
|---|
| 63 |
nbatch->batch_size = pivot_batch.batch_size; |
|---|
| 64 |
nbatch->head = pivot_batch.head; |
|---|
| 65 |
nbatch->tail = pivot_batch.tail; |
|---|
| 66 |
/* Trunc */ |
|---|
| 67 |
pivot_batch.batch_size = 0; |
|---|
| 68 |
pivot_batch.head = pivot_batch.tail = NULL; |
|---|
| 69 |
/* Lock */ |
|---|
| 70 |
pthread_mutex_unlock(&pivot_batch.lock); |
|---|
| 71 |
return nbatch; |
|---|
| 72 |
} |
|---|
| 73 |
|
|---|
| 74 |
static int |
|---|
| 75 |
stratcon_iep_submitter(eventer_t e, int mask, void *closure, |
|---|
| 76 |
struct timeval *now) { |
|---|
| 77 |
struct iep_batch *batch = closure; |
|---|
| 78 |
if(!(mask & EVENTER_ASYNCH_WORK)) return 0; |
|---|
| 79 |
|
|---|
| 80 |
/* pull from batch and submit */ |
|---|
| 81 |
noitL(noit_error, "Firing stratcon_iep_submitter on a batch of %d events\n", |
|---|
| 82 |
batch->batch_size); |
|---|
| 83 |
|
|---|
| 84 |
/* free all the memory associated with the batch */ |
|---|
| 85 |
while(batch->head) { |
|---|
| 86 |
struct noit_line_list *l; |
|---|
| 87 |
l = batch->head; |
|---|
| 88 |
batch->head = l->next; |
|---|
| 89 |
free(l->line); |
|---|
| 90 |
free(l); |
|---|
| 91 |
} |
|---|
| 92 |
free(batch); |
|---|
| 93 |
return 0; |
|---|
| 94 |
} |
|---|
| 95 |
|
|---|
| 96 |
static int |
|---|
| 97 |
stratcon_iep_batch_sweep(eventer_t e, int mask, void *closure, |
|---|
| 98 |
struct timeval *now) { |
|---|
| 99 |
struct iep_batch *nbatch; |
|---|
| 100 |
struct timeval iep_timeout = { 5L, 0L }; |
|---|
| 101 |
eventer_t newe; |
|---|
| 102 |
|
|---|
| 103 |
nbatch = stratcon_iep_batch_copytrunc(); |
|---|
| 104 |
if(nbatch->batch_size == 0) { |
|---|
| 105 |
/* misfire */ |
|---|
| 106 |
free(nbatch); |
|---|
| 107 |
return 0; |
|---|
| 108 |
} |
|---|
| 109 |
|
|---|
| 110 |
newe = eventer_alloc(); |
|---|
| 111 |
newe->mask = EVENTER_ASYNCH; |
|---|
| 112 |
add_timeval(*now, iep_timeout, &newe->whence); |
|---|
| 113 |
newe->callback = stratcon_iep_submitter; |
|---|
| 114 |
newe->closure = nbatch; |
|---|
| 115 |
|
|---|
| 116 |
eventer_add(newe); |
|---|
| 117 |
return 0; |
|---|
| 118 |
} |
|---|
| 119 |
|
|---|
| 120 |
static void |
|---|
| 121 |
stratcon_iep_datastore_onlooker(stratcon_datastore_op_t op, |
|---|
| 122 |
struct sockaddr *remote, void *operand) { |
|---|
| 123 |
/* We only care about inserts */ |
|---|
| 124 |
if(op != DS_OP_INSERT) return; |
|---|
| 125 |
|
|---|
| 126 |
/* process operand and push onto queue */ |
|---|
| 127 |
if(stratcon_iep_batch_add_line((char *)operand) == 0) { |
|---|
| 128 |
/* If this is the first in the queue, then we need to schedule a |
|---|
| 129 |
* sweeper to submit the queue. |
|---|
| 130 |
*/ |
|---|
| 131 |
eventer_t e; |
|---|
| 132 |
struct timeval __now, sweep_delay = SWEEP_DELAY; |
|---|
| 133 |
|
|---|
| 134 |
gettimeofday(&__now, NULL); |
|---|
| 135 |
|
|---|
| 136 |
e = eventer_alloc(); |
|---|
| 137 |
e->callback = stratcon_iep_batch_sweep; |
|---|
| 138 |
add_timeval(__now, sweep_delay, &e->whence); |
|---|
| 139 |
e->closure = NULL; /* we can only do one thing */ |
|---|
| 140 |
e->mask = EVENTER_TIMER; |
|---|
| 141 |
eventer_add(e); |
|---|
| 142 |
} |
|---|
| 143 |
} |
|---|
| 144 |
|
|---|
| 145 |
void |
|---|
| 146 |
stratcon_iep_init() { |
|---|
| 147 |
eventer_name_callback("stratcon_iep_batch_sweep", stratcon_iep_batch_sweep); |
|---|
| 148 |
eventer_name_callback("stratcon_iep_submitter", stratcon_iep_submitter); |
|---|
| 149 |
|
|---|
| 150 |
/* start up a thread pool of one */ |
|---|
| 151 |
memset(&iep_jobq, 0, sizeof(iep_jobq)); |
|---|
| 152 |
eventer_jobq_init(&iep_jobq, "iep_submitter"); |
|---|
| 153 |
iep_jobq.backq = eventer_default_backq(); |
|---|
| 154 |
eventer_jobq_increase_concurrency(&iep_jobq); |
|---|
| 155 |
|
|---|
| 156 |
/* Setup our pivot batch */ |
|---|
| 157 |
memset(&pivot_batch, 0, sizeof(pivot_batch)); |
|---|
| 158 |
pthread_mutex_init(&pivot_batch.lock, NULL); |
|---|
| 159 |
|
|---|
| 160 |
/* Add our onlooker */ |
|---|
| 161 |
stratcon_datastore_register_onlooker(stratcon_iep_datastore_onlooker); |
|---|
| 162 |
} |
|---|
| 163 |
|
|---|