root/src/noit_jlog_listener.c

Revision 1f7257a8ff592ab11f5f72082f2e144955669d8c, 5.7 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

make the jlog open as we do in the logger

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7 #include "eventer/eventer.h"
8 #include "noit_listener.h"
9 #include "utils/noit_hash.h"
10 #include "utils/noit_log.h"
11 #include "jlog/jlog.h"
12 #include "noit_jlog_listener.h"
13
14 #include <unistd.h>
15 #include <sys/ioctl.h>
16
17 void
18 noit_jlog_listener_init() {
19   eventer_name_callback("log_transit", noit_jlog_handler);
20 }
21
22 typedef struct {
23   jlog_ctx *jlog;
24   jlog_id chkpt;
25   jlog_id start;
26   jlog_id finish;
27   int count;
28   int wants_shutdown;
29 } noit_jlog_closure_t;
30
31 noit_jlog_closure_t *
32 noit_jlog_closure_alloc(void) {
33   noit_jlog_closure_t *jcl;
34   jcl = calloc(1, sizeof(*jcl));
35   return jcl;
36 }
37
38 void
39 noit_jlog_closure_free(noit_jlog_closure_t *jcl) {
40   if(jcl->jlog) jlog_ctx_close(jcl->jlog);
41   free(jcl);
42 }
43
44 #define Ewrite(a,b) e->opset->write(e->fd, a, b, &mask, e)
45 static int
46 noit_jlog_push(eventer_t e, noit_jlog_closure_t *jcl) {
47   jlog_message msg;
48   int mask;
49   u_int32_t n_count;
50   n_count = htonl(jcl->count);
51   if(Ewrite(&n_count, sizeof(n_count)) != sizeof(n_count))
52     return -1;
53   while(jcl->count > 0) {
54     struct { jlog_id chkpt; u_int32_t n_sec, n_usec, n_len; } payload;
55     if(jlog_ctx_read_message(jcl->jlog, &jcl->start, &msg) == -1)
56       return -1;
57
58     /* Here we actually push the message */
59     payload.chkpt.log = htonl(jcl->start.log);
60     payload.chkpt.marker = htonl(jcl->start.marker);
61     payload.n_sec  = htonl(msg.header->tv_sec);
62     payload.n_usec = htonl(msg.header->tv_usec);
63     payload.n_len  = htonl(msg.mess_len);
64     if(Ewrite(&payload, sizeof(payload)) != sizeof(payload))
65       return -1;
66     if(Ewrite(msg.mess, msg.mess_len) != msg.mess_len)
67       return -1;
68     /* Note what the client must checkpoint */
69     jcl->chkpt = jcl->start;
70
71     JLOG_ID_ADVANCE(&jcl->start);
72     jcl->count--;
73   }
74   return 0;
75 }
76
77 void *
78 noit_jlog_thread_main(void *e_vptr) {
79   int mask, bytes_read;
80   eventer_t e = e_vptr;
81   acceptor_closure_t *ac = e->closure;
82   noit_jlog_closure_t *jcl = ac->service_ctx;
83   long off = 0;
84   char inbuff[sizeof(jlog_id)];
85
86   /* Go into blocking mode */
87   ioctl(e->fd, FIONBIO, &off);
88
89   while(1) {
90     jlog_id client_chkpt;
91     jlog_get_checkpoint(jcl->jlog, ac->remote_cn, &jcl->chkpt);
92     jcl->count = jlog_ctx_read_interval(jcl->jlog, &jcl->start, &jcl->finish);
93     if(jcl->count > 0) {
94       if(noit_jlog_push(e, jcl)) {
95         goto alldone;
96       }
97       /* Read our jlog_id accounting for possibly short reads */
98       bytes_read = 0;
99       while(bytes_read < sizeof(jlog_id)) {
100         int len;
101         if((len = e->opset->read(e->fd, inbuff + bytes_read,
102                                  sizeof(jlog_id) - bytes_read,
103                                  &mask, e)) <= 0)
104           goto alldone;
105         bytes_read += len;
106       }
107       memcpy(&client_chkpt, inbuff, sizeof(jlog_id));
108       /* Fix the endian */
109       client_chkpt.log = ntohl(client_chkpt.log);
110       client_chkpt.marker = ntohl(client_chkpt.marker);
111  
112       if(memcmp(&jcl->chkpt, &client_chkpt, sizeof(jlog_id))) {
113         noitL(noit_error,
114               "client %s submitted invalid checkpoint %u:%u expected %u:%u\n",
115               ac->remote_cn, client_chkpt.log, client_chkpt.marker,
116               jcl->chkpt.log, jcl->chkpt.marker);
117         goto alldone;
118       }
119       jlog_ctx_read_checkpoint(jcl->jlog, &jcl->chkpt);
120     }
121     sleep(5);
122   }
123
124  alldone:
125   e->opset->close(e->fd, &mask, e);
126   if(jcl) noit_jlog_closure_free(jcl);
127   if(ac) acceptor_closure_free(ac);
128   return NULL;
129 }
130
131 int
132 noit_jlog_handler(eventer_t e, int mask, void *closure,
133                      struct timeval *now) {
134   eventer_t newe;
135   pthread_t tid;
136   int newmask = EVENTER_READ | EVENTER_EXCEPTION;
137   acceptor_closure_t *ac = closure;
138   noit_jlog_closure_t *jcl = ac->service_ctx;
139
140   if(mask & EVENTER_EXCEPTION || (jcl && jcl->wants_shutdown)) {
141 socket_error:
142     /* Exceptions cause us to simply snip the connection */
143     eventer_remove_fd(e->fd);
144     e->opset->close(e->fd, &newmask, e);
145     if(jcl) noit_jlog_closure_free(jcl);
146     if(ac) acceptor_closure_free(ac);
147     return 0;
148   }
149
150   if(!ac->service_ctx) {
151     noit_log_stream_t ls;
152     const char *logname;
153     char path[PATH_MAX], *sub;
154     jcl = ac->service_ctx = noit_jlog_closure_alloc();
155     if(!noit_hash_retrieve(ac->config, "log", strlen("log"),
156                            (void **)&logname)) {
157       noitL(noit_error, "No 'log' specified in log_transit.\n");
158       goto socket_error;
159     }
160     ls = noit_log_stream_find(logname);
161     if(!ls) {
162       noitL(noit_error, "Could not find log '%s' for log_transit.\n",
163             logname);
164       goto socket_error;
165     }
166     if(!ls->type || strcmp(ls->type, "jlog")) {
167       noitL(noit_error, "Log '%s' for log_transit is not a jlog.\n",
168             logname);
169       goto socket_error;
170     }
171     if(!ac->remote_cn) {
172       noitL(noit_error, "jlog transit started to unidentified party.\n");
173       goto socket_error;
174     }
175
176     strlcpy(path, ls->path, sizeof(path));
177     sub = strchr(path, '(');
178     if(sub) {
179       char *esub = strchr(sub, ')');
180       if(esub) {
181         *esub = '\0';
182         *sub = '\0';
183         sub += 1;
184       }
185     }
186
187     jcl->jlog = jlog_new(path);
188     if(jlog_ctx_open_reader(jcl->jlog, ac->remote_cn) == -1) {
189       noitL(noit_error, "jlog reader[%s] error: %s\n", ac->remote_cn,
190             jlog_ctx_err_string(jcl->jlog));
191       goto socket_error;
192     }
193   }
194
195   /* The jlog stuff is disk I/O and can block us.
196    * We'll create a new thread to just handle this connection.
197    */
198   eventer_remove_fd(e->fd);
199   newe = eventer_alloc();
200   memcpy(newe, e, sizeof(*e));
201   if(pthread_create(&tid, NULL, noit_jlog_thread_main, newe) == 0) {
202     return 0;
203   }
204
205   /* Undo our dup */
206   eventer_free(newe);
207   /* Creating the thread failed, close it down and deschedule. */
208   e->opset->close(e->fd, &newmask, e);
209   return 0;
210 }
Note: See TracBrowser for help on using the browser.