root/src/stratcon_iep.c

Revision 6453a67b4303e1da90f053764144109fef67c505, 4.4 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

first step on interface into OpenESB IEP. No serf stuff yet, I'll let Umar take a whack at that. This has a double free issue that I'll troubleshoot on valgrind when I get more time. refs #119

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "utils/noit_log.h"
9 #include "utils/noit_b64.h"
10 #include "stratcon_datastore.h"
11 #include "noit_conf.h"
12 #include "noit_check.h"
13
14 #define SWEEP_DELAY { 0L, 10000L } /* 10ms */
15 eventer_jobq_t iep_jobq;
16
17 struct noit_line_list {
18   char *line;
19   struct noit_line_list *next;
20 };
21 struct iep_batch {
22   /* This lock only needs to be used for inserting... (in the pivot)
23    * Once the batch is "done" it is submitted to a thread and has
24    * no more concurrent access.
25    */
26   pthread_mutex_t lock;
27   int batch_size;
28   struct noit_line_list *head;
29   struct noit_line_list *tail;
30 };
31 /* We safely insert into the pivot... then lock and flip the batch
32  * into a self-contained iep_batch which is the closure to the asynch
33  * function that inserts it into OpenESB.
34  */
35 static struct iep_batch pivot_batch;
36
37 static int
38 stratcon_iep_batch_add_line(const char *data) {
39   int previous_size;
40   struct noit_line_list *nnode;
41   nnode = malloc(sizeof(*nnode));
42   nnode->line = strdup(data);
43   nnode->next = NULL;
44   pthread_mutex_lock(&pivot_batch.lock);
45   if(!pivot_batch.tail) pivot_batch.head = pivot_batch.tail = nnode;
46   else {
47     pivot_batch.tail->next = nnode;
48     pivot_batch.tail = nnode;
49   }
50   previous_size = pivot_batch.batch_size;
51   pivot_batch.batch_size++;
52   pthread_mutex_unlock(&pivot_batch.lock);
53   return previous_size;
54 }
55
56 static struct iep_batch *
57 stratcon_iep_batch_copytrunc() {
58   struct iep_batch *nbatch;
59   nbatch = calloc(1, sizeof(*nbatch));
60   /* Lock */
61   pthread_mutex_lock(&pivot_batch.lock);
62   /* Copy */
63   nbatch->batch_size = pivot_batch.batch_size;
64   nbatch->head = pivot_batch.head;
65   nbatch->tail = pivot_batch.tail;
66   /* Trunc */
67   pivot_batch.batch_size = 0;
68   pivot_batch.head = pivot_batch.tail = NULL;
69   /* Lock */
70   pthread_mutex_unlock(&pivot_batch.lock);
71   return nbatch;
72 }
73
74 static int
75 stratcon_iep_submitter(eventer_t e, int mask, void *closure,
76                        struct timeval *now) {
77   struct iep_batch *batch = closure;
78   if(!(mask & EVENTER_ASYNCH_WORK)) return 0;
79
80   /* pull from batch and submit */
81   noitL(noit_error, "Firing stratcon_iep_submitter on a batch of %d events\n",
82         batch->batch_size);
83
84   /* free all the memory associated with the batch */
85   while(batch->head) {
86     struct noit_line_list *l;
87     l = batch->head;
88     batch->head = l->next;
89     free(l->line);
90     free(l);
91   }
92   free(batch);
93   return 0;
94 }
95
96 static int
97 stratcon_iep_batch_sweep(eventer_t e, int mask, void *closure,
98                          struct timeval *now) {
99   struct iep_batch *nbatch;
100   struct timeval iep_timeout = { 5L, 0L };
101   eventer_t newe;
102
103   nbatch = stratcon_iep_batch_copytrunc();
104   if(nbatch->batch_size == 0) {
105     /* misfire */
106     free(nbatch);
107     return 0;
108   }
109
110   newe = eventer_alloc();
111   newe->mask = EVENTER_ASYNCH;
112   add_timeval(*now, iep_timeout, &newe->whence);
113   newe->callback = stratcon_iep_submitter;
114   newe->closure = nbatch;
115
116   eventer_add(newe);
117   return 0;
118 }
119
120 static void
121 stratcon_iep_datastore_onlooker(stratcon_datastore_op_t op,
122                                 struct sockaddr *remote, void *operand) {
123   /* We only care about inserts */
124   if(op != DS_OP_INSERT) return;
125
126   /* process operand and push onto queue */
127   if(stratcon_iep_batch_add_line((char *)operand) == 0) {
128     /* If this is the first in the queue, then we need to schedule a
129      * sweeper to submit the queue.
130      */
131     eventer_t e;
132     struct timeval __now, sweep_delay = SWEEP_DELAY;
133
134     gettimeofday(&__now, NULL);
135
136     e = eventer_alloc();
137     e->callback = stratcon_iep_batch_sweep;
138     add_timeval(__now, sweep_delay, &e->whence);
139     e->closure = NULL; /* we can only do one thing */
140     e->mask = EVENTER_TIMER;
141     eventer_add(e);
142   }
143 }
144
145 void
146 stratcon_iep_init() {
147   eventer_name_callback("stratcon_iep_batch_sweep", stratcon_iep_batch_sweep);
148   eventer_name_callback("stratcon_iep_submitter", stratcon_iep_submitter);
149
150   /* start up a thread pool of one */
151   memset(&iep_jobq, 0, sizeof(iep_jobq));
152   eventer_jobq_init(&iep_jobq, "iep_submitter");
153   iep_jobq.backq = eventer_default_backq();
154   eventer_jobq_increase_concurrency(&iep_jobq);
155
156   /* Setup our pivot batch */
157   memset(&pivot_batch, 0, sizeof(pivot_batch));
158   pthread_mutex_init(&pivot_batch.lock, NULL);
159
160   /* Add our onlooker */
161   stratcon_datastore_register_onlooker(stratcon_iep_datastore_onlooker);
162 }
163
Note: See TracBrowser for help on using the browser.