root/src/noit_jlog_listener.c

Revision a9077178423e39a94a9b624e44cd4b37899d6fd3, 8.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

refs #122, this stuff works well enough to pull dev back into trunk.
Closing the #122 branch.

flag-day(noitd,stratcond)

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "noit_listener.h"
9 #include "utils/noit_hash.h"
10 #include "utils/noit_log.h"
11 #include "jlog/jlog.h"
12 #include "noit_jlog_listener.h"
13
14 #include <unistd.h>
15 #include <sys/ioctl.h>
16 #define MAX_ROWS_AT_ONCE 1000
17 #define DEFAULT_SECONDS_BETWEEN_BATCHES 10
18
19 static noit_atomic32_t tmpfeedcounter = 0;
20
21 void
22 noit_jlog_listener_init() {
23   eventer_name_callback("log_transit/1.0", noit_jlog_handler);
24   noit_control_dispatch_delegate(noit_control_dispatch,
25                                  NOIT_JLOG_DATA_FEED,
26                                  noit_jlog_handler);
27   noit_control_dispatch_delegate(noit_control_dispatch,
28                                  NOIT_JLOG_DATA_TEMP_FEED,
29                                  noit_jlog_handler);
30 }
31
32 typedef struct {
33   jlog_ctx *jlog;
34   char *subscriber;
35   jlog_id chkpt;
36   jlog_id start;
37   jlog_id finish;
38   int count;
39   int wants_shutdown;
40 } noit_jlog_closure_t;
41
42 noit_jlog_closure_t *
43 noit_jlog_closure_alloc(void) {
44   noit_jlog_closure_t *jcl;
45   jcl = calloc(1, sizeof(*jcl));
46   return jcl;
47 }
48
49 void
50 noit_jlog_closure_free(noit_jlog_closure_t *jcl) {
51   if(jcl->jlog) {
52     if(jcl->subscriber) {
53       if(jcl->subscriber[0] == '~')
54         jlog_ctx_remove_subscriber(jcl->jlog, jcl->subscriber);
55       free(jcl->subscriber);
56     }
57     jlog_ctx_close(jcl->jlog);
58   }
59   free(jcl);
60 }
61
62 static int
63 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
64   int w, sofar = 0;
65   while(l > sofar) {
66     w = e->opset->write(e->fd, b + sofar, l - sofar, mask, e);
67     if(w <= 0) return w;
68     sofar += w;
69   }
70   return sofar;
71 }
72 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
73
74 static int
75 noit_jlog_push(eventer_t e, noit_jlog_closure_t *jcl) {
76   jlog_message msg;
77   int mask;
78   u_int32_t n_count;
79   n_count = htonl(jcl->count);
80   if(Ewrite(&n_count, sizeof(n_count)) != sizeof(n_count))
81     return -1;
82   while(jcl->count > 0) {
83     int rv;
84     struct { jlog_id chkpt; u_int32_t n_sec, n_usec, n_len; } payload;
85     if(jlog_ctx_read_message(jcl->jlog, &jcl->start, &msg) == -1)
86       return -1;
87
88     /* Here we actually push the message */
89     payload.chkpt.log = htonl(jcl->start.log);
90     payload.chkpt.marker = htonl(jcl->start.marker);
91     payload.n_sec  = htonl(msg.header->tv_sec);
92     payload.n_usec = htonl(msg.header->tv_usec);
93     payload.n_len  = htonl(msg.mess_len);
94     if((rv = Ewrite(&payload, sizeof(payload))) != sizeof(payload)) {
95       noitL(noit_error, "Error writing jlog header over SSL %d != %d\n",
96             rv, (int)sizeof(payload));
97       return -1;
98     }
99     if((rv = Ewrite(msg.mess, msg.mess_len)) != msg.mess_len) {
100       noitL(noit_error, "Error writing jlog message over SSL %d != %d\n",
101             rv, msg.mess_len);
102       return -1;
103     }
104     /* Note what the client must checkpoint */
105     jcl->chkpt = jcl->start;
106
107     JLOG_ID_ADVANCE(&jcl->start);
108     jcl->count--;
109   }
110   return 0;
111 }
112
113 void *
114 noit_jlog_thread_main(void *e_vptr) {
115   int mask, bytes_read;
116   eventer_t e = e_vptr;
117   acceptor_closure_t *ac = e->closure;
118   noit_jlog_closure_t *jcl = ac->service_ctx;
119   long off = 0;
120   char inbuff[sizeof(jlog_id)];
121
122   /* Go into blocking mode */
123   ioctl(e->fd, FIONBIO, &off);
124
125   while(1) {
126     jlog_id client_chkpt;
127     int sleeptime = (ac->cmd == NOIT_JLOG_DATA_TEMP_FEED) ?
128                       1 : DEFAULT_SECONDS_BETWEEN_BATCHES;
129     jlog_get_checkpoint(jcl->jlog, ac->remote_cn, &jcl->chkpt);
130     jcl->count = jlog_ctx_read_interval(jcl->jlog, &jcl->start, &jcl->finish);
131     if(jcl->count > MAX_ROWS_AT_ONCE) {
132       /* Artificially set down the range to make the batches a bit easier
133        * to handle on the stratcond/postgres end.
134        * However, we must have more data, so drop the sleeptime to 0
135        */
136       jcl->count = MAX_ROWS_AT_ONCE;
137       jcl->finish.marker = jcl->start.marker + jcl->count;
138       sleeptime = 0;
139     }
140     if(jcl->count > 0) {
141       if(noit_jlog_push(e, jcl)) {
142         goto alldone;
143       }
144       /* Read our jlog_id accounting for possibly short reads */
145       bytes_read = 0;
146       while(bytes_read < sizeof(jlog_id)) {
147         int len;
148         if((len = e->opset->read(e->fd, inbuff + bytes_read,
149                                  sizeof(jlog_id) - bytes_read,
150                                  &mask, e)) <= 0)
151           goto alldone;
152         bytes_read += len;
153       }
154       memcpy(&client_chkpt, inbuff, sizeof(jlog_id));
155       /* Fix the endian */
156       client_chkpt.log = ntohl(client_chkpt.log);
157       client_chkpt.marker = ntohl(client_chkpt.marker);
158  
159       if(memcmp(&jcl->chkpt, &client_chkpt, sizeof(jlog_id))) {
160         noitL(noit_error,
161               "client %s submitted invalid checkpoint %u:%u expected %u:%u\n",
162               ac->remote_cn, client_chkpt.log, client_chkpt.marker,
163               jcl->chkpt.log, jcl->chkpt.marker);
164         goto alldone;
165       }
166       jlog_ctx_read_checkpoint(jcl->jlog, &jcl->chkpt);
167     }
168     if(sleeptime) sleep(sleeptime);
169   }
170
171  alldone:
172   e->opset->close(e->fd, &mask, e);
173   if(jcl) noit_jlog_closure_free(jcl);
174   if(ac) acceptor_closure_free(ac);
175   return NULL;
176 }
177
178 int
179 noit_jlog_handler(eventer_t e, int mask, void *closure,
180                      struct timeval *now) {
181   eventer_t newe;
182   pthread_t tid;
183   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
184   acceptor_closure_t *ac = closure;
185   noit_jlog_closure_t *jcl = ac->service_ctx;
186
187   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
188 socket_error:
189     /* Exceptions cause us to simply snip the connection */
190     eventer_remove_fd(e->fd);
191     e->opset->close(e->fd, &newmask, e);
192     if(jcl) noit_jlog_closure_free(jcl);
193     if(ac) acceptor_closure_free(ac);
194     return 0;
195   }
196
197   if(!ac->service_ctx) {
198     noit_log_stream_t ls;
199     const char *logname;
200     char path[PATH_MAX], subscriber[32], *sub;
201     jcl = ac->service_ctx = noit_jlog_closure_alloc();
202     if(!noit_hash_retr_str(ac->config,
203                            "log_transit_feed_name",
204                            strlen("log_transit_feed_name"),
205                            &logname)) {
206       noitL(noit_error, "No 'log_transit_feed_name' specified in log_transit.\n");
207       goto socket_error;
208     }
209     ls = noit_log_stream_find(logname);
210     if(!ls) {
211       noitL(noit_error, "Could not find log '%s' for log_transit.\n",
212             logname);
213       goto socket_error;
214     }
215     if(!ls->type || strcmp(ls->type, "jlog")) {
216       noitL(noit_error, "Log '%s' for log_transit is not a jlog.\n",
217             logname);
218       goto socket_error;
219     }
220     if(ac->cmd == NOIT_JLOG_DATA_FEED) {
221       if(!ac->remote_cn) {
222         noitL(noit_error, "jlog transit started to unidentified party.\n");
223         goto socket_error;
224       }
225       strlcpy(subscriber, ac->remote_cn, sizeof(subscriber));
226     }
227     else {
228       snprintf(subscriber, sizeof(subscriber),
229                "~%07d", noit_atomic_inc32(&tmpfeedcounter));
230     }
231     jcl->subscriber = strdup(subscriber);
232
233     strlcpy(path, ls->path, sizeof(path));
234     sub = strchr(path, '(');
235     if(sub) {
236       char *esub = strchr(sub, ')');
237       if(esub) {
238         *esub = '\0';
239         *sub = '\0';
240         sub += 1;
241       }
242     }
243
244     jcl->jlog = jlog_new(path);
245     if(ac->cmd == NOIT_JLOG_DATA_TEMP_FEED)
246       if(jlog_ctx_add_subscriber(jcl->jlog, jcl->subscriber, JLOG_END) == -1)
247         noitL(noit_error, "jlog reader[%s] error: %s\n", jcl->subscriber,
248               jlog_ctx_err_string(jcl->jlog));
249     if(jlog_ctx_open_reader(jcl->jlog, jcl->subscriber) == -1) {
250       noitL(noit_error, "jlog reader[%s] error: %s\n", jcl->subscriber,
251             jlog_ctx_err_string(jcl->jlog));
252       goto socket_error;
253     }
254   }
255
256   /* The jlog stuff is disk I/O and can block us.
257    * We'll create a new thread to just handle this connection.
258    */
259   eventer_remove_fd(e->fd);
260   newe = eventer_alloc();
261   memcpy(newe, e, sizeof(*e));
262   if(pthread_create(&tid, NULL, noit_jlog_thread_main, newe) == 0) {
263     return 0;
264   }
265
266   /* Undo our dup */
267   eventer_free(newe);
268   /* Creating the thread failed, close it down and deschedule. */
269   e->opset->close(e->fd, &newmask, e);
270   return 0;
271 }
Note: See TracBrowser for help on using the browser.