root/src/stratcon_iep.c

Revision e7ae97b8a3932bf5cb12dbfe71a21caa328d49af, 4.5 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

refs #119, scaffolding is there. I don't like how it is a slave to the postgres stuff. Perhaps separate streams would be good.

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_log.h"
9 #include "utils/noit_b64.h"
10 #include "stratcon_datastore.h"
11 #include "noit_conf.h"
12 #include "noit_check.h"
13
/* Initializer for a struct timeval: { seconds, microseconds }.
 * This is the 10ms offset added to "now" when a sweep timer is scheduled.
 */
#define SWEEP_DELAY { 0L, 10000L }
15 eventer_jobq_t iep_jobq;
16
/* One entry in a singly linked list of text lines. */
struct noit_line_list {
  char *line;                   /* the line text; freed along with the node */
  struct noit_line_list *next;  /* following entry, NULL on the last one */
};
/* A batch of queued lines: a linked list tracked by head and tail,
 * together with a count of its entries.
 */
struct iep_batch {
  /* Only inserts (into the pivot) need to take this lock.  A batch that
   * is "done" gets submitted to a thread and has no further concurrent
   * access, so nothing locks it from then on.
   */
  pthread_mutex_t lock;
  int batch_size;               /* number of lines held in the list */
  struct noit_line_list *head;  /* first line, NULL when the batch is empty */
  struct noit_line_list *tail;  /* last line, NULL when the batch is empty */
};
31 /* We safely insert into the pivot... then lock and flip the batch
32  * into a self-contained iep_batch which is the closure to the asynch
33  * function that inserts it into OpenESB.
34  */
35 static struct iep_batch pivot_batch;
36
37 static int
38 stratcon_iep_batch_add_line(const char *data) {
39   int previous_size;
40   struct noit_line_list *nnode;
41   nnode = malloc(sizeof(*nnode));
42   nnode->line = strdup(data);
43   nnode->next = NULL;
44   pthread_mutex_lock(&pivot_batch.lock);
45   if(!pivot_batch.tail) pivot_batch.head = pivot_batch.tail = nnode;
46   else {
47     pivot_batch.tail->next = nnode;
48     pivot_batch.tail = nnode;
49   }
50   previous_size = pivot_batch.batch_size;
51   pivot_batch.batch_size++;
52   pthread_mutex_unlock(&pivot_batch.lock);
53   return previous_size;
54 }
55
56 static struct iep_batch *
57 stratcon_iep_batch_copytrunc() {
58   struct iep_batch *nbatch;
59   nbatch = calloc(1, sizeof(*nbatch));
60   /* Lock */
61   pthread_mutex_lock(&pivot_batch.lock);
62   /* Copy */
63   nbatch->batch_size = pivot_batch.batch_size;
64   nbatch->head = pivot_batch.head;
65   nbatch->tail = pivot_batch.tail;
66   /* Trunc */
67   pivot_batch.batch_size = 0;
68   pivot_batch.head = pivot_batch.tail = NULL;
69   /* Lock */
70   pthread_mutex_unlock(&pivot_batch.lock);
71   return nbatch;
72 }
73
74 static int
75 stratcon_iep_submitter(eventer_t e, int mask, void *closure,
76                        struct timeval *now) {
77   struct iep_batch *batch = closure;
78   /* We only play when it is an asynch event */
79   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
80
81   if(mask & EVENTER_ASYNCH_CLEANUP) {
82     /* free all the memory associated with the batch */
83     while(batch->head) {
84       struct noit_line_list *l;
85       l = batch->head;
86       batch->head = l->next;
87       free(l->line);
88       free(l);
89     }
90     free(batch);
91     return 0;
92   }
93
94   /* pull from batch and submit */
95   noitL(noit_error, "Firing stratcon_iep_submitter on a batch of %d events\n",
96         batch->batch_size);
97
98   return 0;
99 }
100
101 static int
102 stratcon_iep_batch_sweep(eventer_t e, int mask, void *closure,
103                          struct timeval *now) {
104   struct iep_batch *nbatch;
105   struct timeval iep_timeout = { 5L, 0L };
106   eventer_t newe;
107
108   nbatch = stratcon_iep_batch_copytrunc();
109   if(nbatch->batch_size == 0) {
110     /* misfire */
111     free(nbatch);
112     return 0;
113   }
114
115   newe = eventer_alloc();
116   newe->mask = EVENTER_ASYNCH;
117   add_timeval(*now, iep_timeout, &newe->whence);
118   newe->callback = stratcon_iep_submitter;
119   newe->closure = nbatch;
120
121   eventer_add(newe);
122   return 0;
123 }
124
125 static void
126 stratcon_iep_datastore_onlooker(stratcon_datastore_op_t op,
127                                 struct sockaddr *remote, void *operand) {
128   /* We only care about inserts */
129   if(op != DS_OP_INSERT) return;
130
131   /* process operand and push onto queue */
132   if(stratcon_iep_batch_add_line((char *)operand) == 0) {
133     /* If this is the first in the queue, then we need to schedule a
134      * sweeper to submit the queue.
135      */
136     eventer_t e;
137     struct timeval __now, sweep_delay = SWEEP_DELAY;
138
139     gettimeofday(&__now, NULL);
140
141     e = eventer_alloc();
142     e->callback = stratcon_iep_batch_sweep;
143     add_timeval(__now, sweep_delay, &e->whence);
144     e->closure = NULL; /* we can only do one thing */
145     e->mask = EVENTER_TIMER;
146     eventer_add(e);
147   }
148 }
149
150 void
151 stratcon_iep_init() {
152   eventer_name_callback("stratcon_iep_batch_sweep", stratcon_iep_batch_sweep);
153   eventer_name_callback("stratcon_iep_submitter", stratcon_iep_submitter);
154
155   /* start up a thread pool of one */
156   memset(&iep_jobq, 0, sizeof(iep_jobq));
157   eventer_jobq_init(&iep_jobq, "iep_submitter");
158   iep_jobq.backq = eventer_default_backq();
159   eventer_jobq_increase_concurrency(&iep_jobq);
160
161   /* Setup our pivot batch */
162   memset(&pivot_batch, 0, sizeof(pivot_batch));
163   pthread_mutex_init(&pivot_batch.lock, NULL);
164
165   /* Add our onlooker */
166   stratcon_datastore_register_onlooker(stratcon_iep_datastore_onlooker);
167 }
168
Note: See TracBrowser for help on using the browser.