root/src/noit_jlog_listener.c

Revision 885a9d3fca7292fe71341749d026abf2938749c4, 6.7 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

make the jlog sender try harder when sending messages. The SSL write layer can perform partial writes on blocking sockets. closes #41

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "noit_listener.h"
9 #include "utils/noit_hash.h"
10 #include "utils/noit_log.h"
11 #include "jlog/jlog.h"
12 #include "noit_jlog_listener.h"
13
14 #include <unistd.h>
15 #include <sys/ioctl.h>
/* Largest number of jlog messages sent to a client in a single batch. */
#define MAX_ROWS_AT_ONCE                1000
/* Seconds slept between batches when the batch was not artificially capped. */
#define DEFAULT_SECONDS_BETWEEN_BATCHES 5
18
19 void
20 noit_jlog_listener_init() {
21   eventer_name_callback("log_transit", noit_jlog_handler);
22 }
23
24 typedef struct {
25   jlog_ctx *jlog;
26   jlog_id chkpt;
27   jlog_id start;
28   jlog_id finish;
29   int count;
30   int wants_shutdown;
31 } noit_jlog_closure_t;
32
33 noit_jlog_closure_t *
34 noit_jlog_closure_alloc(void) {
35   noit_jlog_closure_t *jcl;
36   jcl = calloc(1, sizeof(*jcl));
37   return jcl;
38 }
39
40 void
41 noit_jlog_closure_free(noit_jlog_closure_t *jcl) {
42   if(jcl->jlog) jlog_ctx_close(jcl->jlog);
43   free(jcl);
44 }
45
46 static int
47 __safe_Ewrite(eventer_t e, void *b, int l, int *mask) {
48   int w, sofar = 0;
49   while(l > sofar) {
50     w = e->opset->write(e->fd, b + sofar, l - sofar, mask, e);
51     if(w <= 0) return w;
52     sofar += w;
53   }
54   return sofar;
55 }
56 #define Ewrite(a,b) __safe_Ewrite(e,a,b,&mask)
57
58 static int
59 noit_jlog_push(eventer_t e, noit_jlog_closure_t *jcl) {
60   jlog_message msg;
61   int mask;
62   u_int32_t n_count;
63   n_count = htonl(jcl->count);
64   if(Ewrite(&n_count, sizeof(n_count)) != sizeof(n_count))
65     return -1;
66   while(jcl->count > 0) {
67     int rv;
68     struct { jlog_id chkpt; u_int32_t n_sec, n_usec, n_len; } payload;
69     if(jlog_ctx_read_message(jcl->jlog, &jcl->start, &msg) == -1)
70       return -1;
71
72     /* Here we actually push the message */
73     payload.chkpt.log = htonl(jcl->start.log);
74     payload.chkpt.marker = htonl(jcl->start.marker);
75     payload.n_sec  = htonl(msg.header->tv_sec);
76     payload.n_usec = htonl(msg.header->tv_usec);
77     payload.n_len  = htonl(msg.mess_len);
78     if((rv = Ewrite(&payload, sizeof(payload))) != sizeof(payload)) {
79       noitL(noit_error, "Error writing jlog header over SSL %d != %d\n",
80             rv, sizeof(payload));
81       return -1;
82     }
83     if((rv = Ewrite(msg.mess, msg.mess_len)) != msg.mess_len) {
84       noitL(noit_error, "Error writing jlog message over SSL %d != %d\n",
85             rv, msg.mess_len);
86       return -1;
87     }
88     /* Note what the client must checkpoint */
89     jcl->chkpt = jcl->start;
90
91     JLOG_ID_ADVANCE(&jcl->start);
92     jcl->count--;
93   }
94   return 0;
95 }
96
97 void *
98 noit_jlog_thread_main(void *e_vptr) {
99   int mask, bytes_read;
100   eventer_t e = e_vptr;
101   acceptor_closure_t *ac = e->closure;
102   noit_jlog_closure_t *jcl = ac->service_ctx;
103   long off = 0;
104   char inbuff[sizeof(jlog_id)];
105
106   /* Go into blocking mode */
107   ioctl(e->fd, FIONBIO, &off);
108
109   while(1) {
110     jlog_id client_chkpt;
111     int sleeptime = DEFAULT_SECONDS_BETWEEN_BATCHES;
112     jlog_get_checkpoint(jcl->jlog, ac->remote_cn, &jcl->chkpt);
113     jcl->count = jlog_ctx_read_interval(jcl->jlog, &jcl->start, &jcl->finish);
114     if(jcl->count > MAX_ROWS_AT_ONCE) {
115       /* Artificially set down the range to make the batches a bit easier
116        * to handle on the stratcond/postgres end.
117        * However, we must have more data, so drop the sleeptime to 0
118        */
119       jcl->count = MAX_ROWS_AT_ONCE;
120       jcl->finish.marker = jcl->start.marker + jcl->count;
121       sleeptime = 0;
122     }
123     if(jcl->count > 0) {
124       if(noit_jlog_push(e, jcl)) {
125         goto alldone;
126       }
127       /* Read our jlog_id accounting for possibly short reads */
128       bytes_read = 0;
129       while(bytes_read < sizeof(jlog_id)) {
130         int len;
131         if((len = e->opset->read(e->fd, inbuff + bytes_read,
132                                  sizeof(jlog_id) - bytes_read,
133                                  &mask, e)) <= 0)
134           goto alldone;
135         bytes_read += len;
136       }
137       memcpy(&client_chkpt, inbuff, sizeof(jlog_id));
138       /* Fix the endian */
139       client_chkpt.log = ntohl(client_chkpt.log);
140       client_chkpt.marker = ntohl(client_chkpt.marker);
141  
142       if(memcmp(&jcl->chkpt, &client_chkpt, sizeof(jlog_id))) {
143         noitL(noit_error,
144               "client %s submitted invalid checkpoint %u:%u expected %u:%u\n",
145               ac->remote_cn, client_chkpt.log, client_chkpt.marker,
146               jcl->chkpt.log, jcl->chkpt.marker);
147         goto alldone;
148       }
149       jlog_ctx_read_checkpoint(jcl->jlog, &jcl->chkpt);
150     }
151     if(sleeptime) sleep(sleeptime);
152   }
153
154  alldone:
155   e->opset->close(e->fd, &mask, e);
156   if(jcl) noit_jlog_closure_free(jcl);
157   if(ac) acceptor_closure_free(ac);
158   return NULL;
159 }
160
161 int
162 noit_jlog_handler(eventer_t e, int mask, void *closure,
163                      struct timeval *now) {
164   eventer_t newe;
165   pthread_t tid;
166   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
167   acceptor_closure_t *ac = closure;
168   noit_jlog_closure_t *jcl = ac->service_ctx;
169
170   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
171 socket_error:
172     /* Exceptions cause us to simply snip the connection */
173     eventer_remove_fd(e->fd);
174     e->opset->close(e->fd, &newmask, e);
175     if(jcl) noit_jlog_closure_free(jcl);
176     if(ac) acceptor_closure_free(ac);
177     return 0;
178   }
179
180   if(!ac->service_ctx) {
181     noit_log_stream_t ls;
182     const char *logname;
183     char path[PATH_MAX], *sub;
184     jcl = ac->service_ctx = noit_jlog_closure_alloc();
185     if(!noit_hash_retrieve(ac->config, "log", strlen("log"),
186                            (void **)&logname)) {
187       noitL(noit_error, "No 'log' specified in log_transit.\n");
188       goto socket_error;
189     }
190     ls = noit_log_stream_find(logname);
191     if(!ls) {
192       noitL(noit_error, "Could not find log '%s' for log_transit.\n",
193             logname);
194       goto socket_error;
195     }
196     if(!ls->type || strcmp(ls->type, "jlog")) {
197       noitL(noit_error, "Log '%s' for log_transit is not a jlog.\n",
198             logname);
199       goto socket_error;
200     }
201     if(!ac->remote_cn) {
202       noitL(noit_error, "jlog transit started to unidentified party.\n");
203       goto socket_error;
204     }
205
206     strlcpy(path, ls->path, sizeof(path));
207     sub = strchr(path, '(');
208     if(sub) {
209       char *esub = strchr(sub, ')');
210       if(esub) {
211         *esub = '\0';
212         *sub = '\0';
213         sub += 1;
214       }
215     }
216
217     jcl->jlog = jlog_new(path);
218     if(jlog_ctx_open_reader(jcl->jlog, ac->remote_cn) == -1) {
219       noitL(noit_error, "jlog reader[%s] error: %s\n", ac->remote_cn,
220             jlog_ctx_err_string(jcl->jlog));
221       goto socket_error;
222     }
223   }
224
225   /* The jlog stuff is disk I/O and can block us.
226    * We'll create a new thread to just handle this connection.
227    */
228   eventer_remove_fd(e->fd);
229   newe = eventer_alloc();
230   memcpy(newe, e, sizeof(*e));
231   if(pthread_create(&tid, NULL, noit_jlog_thread_main, newe) == 0) {
232     return 0;
233   }
234
235   /* Undo our dup */
236   eventer_free(newe);
237   /* Creating the thread failed, close it down and deschedule. */
238   e->opset->close(e->fd, &newmask, e);
239   return 0;
240 }
Note: See TracBrowser for help on using the browser.