root/src/noit_jlog_listener.c

Revision 9c7a041c2f0b4cc5e850c8cdfb2b4664cb63a2d1, 7.0 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 5 years ago)

refs #117... make sure people riding on the stream can expect data every second

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "noit_listener.h"
9 #include "utils/noit_hash.h"
10 #include "utils/noit_log.h"
11 #include "jlog/jlog.h"
12 #include "noit_jlog_listener.h"
13
14 #include <unistd.h>
15 #include <sys/ioctl.h>
16 #define MAX_ROWS_AT_ONCE 1000
17 #define DEFAULT_SECONDS_BETWEEN_BATCHES 1
18
19 void
20 noit_jlog_listener_init() {
21   eventer_name_callback("log_transit", noit_jlog_handler);
22   noit_control_dispatch_delegate(noit_control_dispatch,
23                                  NOIT_JLOG_DATA_FEED,
24                                  noit_jlog_handler);
25 }
26
27 typedef struct {
28   jlog_ctx *jlog;
29   jlog_id chkpt;
30   jlog_id start;
31   jlog_id finish;
32   int count;
33   int wants_shutdown;
34 } noit_jlog_closure_t;
35
36 noit_jlog_closure_t *
37 noit_jlog_closure_alloc(void) {
38   noit_jlog_closure_t *jcl;
39   jcl = calloc(1, sizeof(*jcl));
40   return jcl;
41 }
42
43 void
44 noit_jlog_closure_free(noit_jlog_closure_t *jcl) {
45   if(jcl->jlog) jlog_ctx_close(jcl->jlog);
46   free(jcl);
47 }
48
49 static int
50 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
51   int w, sofar = 0;
52   while(l > sofar) {
53     w = e->opset->write(e->fd, b + sofar, l - sofar, mask, e);
54     if(w <= 0) return w;
55     sofar += w;
56   }
57   return sofar;
58 }
59 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
60
61 static int
62 noit_jlog_push(eventer_t e, noit_jlog_closure_t *jcl) {
63   jlog_message msg;
64   int mask;
65   u_int32_t n_count;
66   n_count = htonl(jcl->count);
67   if(Ewrite(&n_count, sizeof(n_count)) != sizeof(n_count))
68     return -1;
69   while(jcl->count > 0) {
70     int rv;
71     struct { jlog_id chkpt; u_int32_t n_sec, n_usec, n_len; } payload;
72     if(jlog_ctx_read_message(jcl->jlog, &jcl->start, &msg) == -1)
73       return -1;
74
75     /* Here we actually push the message */
76     payload.chkpt.log = htonl(jcl->start.log);
77     payload.chkpt.marker = htonl(jcl->start.marker);
78     payload.n_sec  = htonl(msg.header->tv_sec);
79     payload.n_usec = htonl(msg.header->tv_usec);
80     payload.n_len  = htonl(msg.mess_len);
81     if((rv = Ewrite(&payload, sizeof(payload))) != sizeof(payload)) {
82       noitL(noit_error, "Error writing jlog header over SSL %d != %d\n",
83             rv, (int)sizeof(payload));
84       return -1;
85     }
86     if((rv = Ewrite(msg.mess, msg.mess_len)) != msg.mess_len) {
87       noitL(noit_error, "Error writing jlog message over SSL %d != %d\n",
88             rv, msg.mess_len);
89       return -1;
90     }
91     /* Note what the client must checkpoint */
92     jcl->chkpt = jcl->start;
93
94     JLOG_ID_ADVANCE(&jcl->start);
95     jcl->count--;
96   }
97   return 0;
98 }
99
100 void *
101 noit_jlog_thread_main(void *e_vptr) {
102   int mask, bytes_read;
103   eventer_t e = e_vptr;
104   acceptor_closure_t *ac = e->closure;
105   noit_jlog_closure_t *jcl = ac->service_ctx;
106   long off = 0;
107   char inbuff[sizeof(jlog_id)];
108
109   /* Go into blocking mode */
110   ioctl(e->fd, FIONBIO, &off);
111
112   while(1) {
113     jlog_id client_chkpt;
114     int sleeptime = DEFAULT_SECONDS_BETWEEN_BATCHES;
115     jlog_get_checkpoint(jcl->jlog, ac->remote_cn, &jcl->chkpt);
116     jcl->count = jlog_ctx_read_interval(jcl->jlog, &jcl->start, &jcl->finish);
117     if(jcl->count > MAX_ROWS_AT_ONCE) {
118       /* Artificially set down the range to make the batches a bit easier
119        * to handle on the stratcond/postgres end.
120        * However, we must have more data, so drop the sleeptime to 0
121        */
122       jcl->count = MAX_ROWS_AT_ONCE;
123       jcl->finish.marker = jcl->start.marker + jcl->count;
124       sleeptime = 0;
125     }
126     if(jcl->count > 0) {
127       if(noit_jlog_push(e, jcl)) {
128         goto alldone;
129       }
130       /* Read our jlog_id accounting for possibly short reads */
131       bytes_read = 0;
132       while(bytes_read < sizeof(jlog_id)) {
133         int len;
134         if((len = e->opset->read(e->fd, inbuff + bytes_read,
135                                  sizeof(jlog_id) - bytes_read,
136                                  &mask, e)) <= 0)
137           goto alldone;
138         bytes_read += len;
139       }
140       memcpy(&client_chkpt, inbuff, sizeof(jlog_id));
141       /* Fix the endian */
142       client_chkpt.log = ntohl(client_chkpt.log);
143       client_chkpt.marker = ntohl(client_chkpt.marker);
144  
145       if(memcmp(&jcl->chkpt, &client_chkpt, sizeof(jlog_id))) {
146         noitL(noit_error,
147               "client %s submitted invalid checkpoint %u:%u expected %u:%u\n",
148               ac->remote_cn, client_chkpt.log, client_chkpt.marker,
149               jcl->chkpt.log, jcl->chkpt.marker);
150         goto alldone;
151       }
152       jlog_ctx_read_checkpoint(jcl->jlog, &jcl->chkpt);
153     }
154     if(sleeptime) sleep(sleeptime);
155   }
156
157  alldone:
158   e->opset->close(e->fd, &mask, e);
159   if(jcl) noit_jlog_closure_free(jcl);
160   if(ac) acceptor_closure_free(ac);
161   return NULL;
162 }
163
164 int
165 noit_jlog_handler(eventer_t e, int mask, void *closure,
166                      struct timeval *now) {
167   eventer_t newe;
168   pthread_t tid;
169   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
170   acceptor_closure_t *ac = closure;
171   noit_jlog_closure_t *jcl = ac->service_ctx;
172
173   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
174 socket_error:
175     /* Exceptions cause us to simply snip the connection */
176     eventer_remove_fd(e->fd);
177     e->opset->close(e->fd, &newmask, e);
178     if(jcl) noit_jlog_closure_free(jcl);
179     if(ac) acceptor_closure_free(ac);
180     return 0;
181   }
182
183   if(!ac->service_ctx) {
184     noit_log_stream_t ls;
185     const char *logname;
186     char path[PATH_MAX], *sub;
187     jcl = ac->service_ctx = noit_jlog_closure_alloc();
188     if(!noit_hash_retr_str(ac->config,
189                            "log_transit_feed_name",
190                            strlen("log_transit_feed_name"),
191                            &logname)) {
192       noitL(noit_error, "No 'log_transit_feed_name' specified in log_transit.\n");
193       goto socket_error;
194     }
195     ls = noit_log_stream_find(logname);
196     if(!ls) {
197       noitL(noit_error, "Could not find log '%s' for log_transit.\n",
198             logname);
199       goto socket_error;
200     }
201     if(!ls->type || strcmp(ls->type, "jlog")) {
202       noitL(noit_error, "Log '%s' for log_transit is not a jlog.\n",
203             logname);
204       goto socket_error;
205     }
206     if(!ac->remote_cn) {
207       noitL(noit_error, "jlog transit started to unidentified party.\n");
208       goto socket_error;
209     }
210
211     strlcpy(path, ls->path, sizeof(path));
212     sub = strchr(path, '(');
213     if(sub) {
214       char *esub = strchr(sub, ')');
215       if(esub) {
216         *esub = '\0';
217         *sub = '\0';
218         sub += 1;
219       }
220     }
221
222     jcl->jlog = jlog_new(path);
223     if(jlog_ctx_open_reader(jcl->jlog, ac->remote_cn) == -1) {
224       noitL(noit_error, "jlog reader[%s] error: %s\n", ac->remote_cn,
225             jlog_ctx_err_string(jcl->jlog));
226       goto socket_error;
227     }
228   }
229
230   /* The jlog stuff is disk I/O and can block us.
231    * We'll create a new thread to just handle this connection.
232    */
233   eventer_remove_fd(e->fd);
234   newe = eventer_alloc();
235   memcpy(newe, e, sizeof(*e));
236   if(pthread_create(&tid, NULL, noit_jlog_thread_main, newe) == 0) {
237     return 0;
238   }
239
240   /* Undo our dup */
241   eventer_free(newe);
242   /* Creating the thread failed, close it down and deschedule. */
243   e->opset->close(e->fd, &newmask, e);
244   return 0;
245 }
Note: See TracBrowser for help on using the browser.