root/src/modules/lua_noit.c

Revision 1431d991f15fe5e02906e85dc4721a1a99f2eeb1, 10.2 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

fix pipelining, refs #28

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7
8 #include "noit_conf.h"
9 #include "noit_module.h"
10 #include "noit_check.h"
11 #include "noit_check_tools.h"
12 #include "utils/noit_log.h"
13 #include "eventer/eventer.h"
14 #include "lua_noit.h"
15
16 #include <assert.h>
17 #include <math.h>
18 #include <errno.h>
19 #include <unistd.h>
20 #include <netinet/in.h>
21 #include <arpa/inet.h>
22
23 struct nl_slcl {
24   struct timeval start;
25   luaL_Buffer inbuff;
26   int read_sofar;
27   int read_goal;
28   const char *read_terminator;
29   const char *outbuff;
30   int write_sofar;
31   int write_goal;
32   eventer_t *eptr;
33   lua_State *L;
34 };
35
36 static int
37 noit_lua_socket_connect_complete(eventer_t e, int mask, void *vcl,
38                                  struct timeval *now) {
39   noit_lua_check_info_t *ci;
40   struct nl_slcl *cl = vcl;
41   int args = 0;
42
43   ci = get_ci(cl->L);
44   assert(ci);
45   noit_lua_check_deregister_event(ci, e, 0);
46
47   *(cl->eptr) = eventer_alloc();
48   memcpy(*cl->eptr, e, sizeof(*e));
49   noit_lua_check_register_event(ci, *cl->eptr);
50
51   if(!(mask & EVENTER_EXCEPTION) &&
52      mask & EVENTER_WRITE) {
53     /* Connect completed successfully */
54     lua_pushinteger(cl->L, 0);
55     args = 1;
56   }
57   else {
58     lua_pushinteger(cl->L, -1);
59     lua_pushstring(cl->L, strerror(errno));
60     args = 2;
61   }
62   noit_lua_resume(ci, args);
63   return 0;
64 }
65 static int
66 noit_lua_socket_connect(lua_State *L) {
67   noit_lua_check_info_t *ci;
68   eventer_t e, *eptr;
69   const char *target;
70   unsigned short port;
71   int8_t family;
72   int rv;
73   union {
74     struct sockaddr_in sin4;
75     struct sockaddr_in6 sin6;
76   } a;
77
78   ci = get_ci(L);
79   assert(ci);
80
81   eptr = lua_touserdata(L, lua_upvalueindex(1));
82   e = *eptr;
83   target = lua_tostring(L, 1);
84   port = lua_tointeger(L, 2);
85
86   family = AF_INET;
87   rv = inet_pton(family, target, &a.sin4.sin_addr);
88   if(rv != 1) {
89     family = AF_INET6;
90     rv = inet_pton(family, target, &a.sin6.sin6_addr);
91     if(rv != 1) {
92       noitL(noit_stderr, "Cannot translate '%s' to IP\n", target);
93       memset(&a, 0, sizeof(a));
94       lua_pushinteger(L, -1);
95       lua_pushfstring(L, "Cannot translate '%s' to IP\n", target);
96       return 2;
97     }
98     else {
99       /* We've IPv6 */
100       a.sin6.sin6_family = AF_INET6;
101       a.sin6.sin6_port = htons(port);
102     }
103   }
104   else {
105     a.sin4.sin_family = family;
106     a.sin4.sin_port = htons(port);
107   }
108
109   rv = connect(e->fd, (struct sockaddr *)&a,
110                family==AF_INET ? sizeof(a.sin4) : sizeof(a.sin6));
111   if(rv == 0) {
112     lua_pushinteger(L, 0);
113     return 1;
114   }
115   if(rv == -1 && errno == EINPROGRESS) {
116     /* Need completion */
117     e->callback = noit_lua_socket_connect_complete;
118     e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
119     eventer_add(e);
120     return noit_lua_yield(ci, 0);
121   }
122   lua_pushinteger(L, -1);
123   lua_pushstring(L, strerror(errno));
124   return 2;
125 }
126
127 static int
128 noit_lua_socket_read_complete(eventer_t e, int mask, void *vcl,
129                               struct timeval *now) {
130   char buff[4096];
131   noit_lua_check_info_t *ci;
132   struct nl_slcl *cl = vcl;
133   int len;
134   int args = 0;
135
136   ci = get_ci(cl->L);
137   assert(ci);
138
139   if(mask & EVENTER_EXCEPTION) {
140     lua_pushnil(cl->L);
141     args = 1;
142     goto alldone;
143   }
144
145   while((len = e->opset->read(e->fd, buff, sizeof(buff), &mask, e)) > 0) {
146     if(cl->read_goal) {
147       int remaining = cl->read_goal - cl->read_sofar;
148       /* copy up to the goal into the inbuff */
149       luaL_addlstring(&cl->inbuff, buff, MIN(len, remaining));
150       cl->read_sofar += len;
151       if(cl->read_sofar >= cl->read_goal) { /* We're done */
152         luaL_pushresult(&cl->inbuff);
153         cl->read_sofar -= cl->read_goal;
154         if(cl->read_sofar > 0) { /* We have to buffer this for next read */
155           luaL_buffinit(cl->L, &cl->inbuff);
156           luaL_addlstring(&cl->inbuff,
157                           buff + remaining,
158                           cl->read_sofar);
159         }
160         args = 1;
161         break;
162       }
163     }
164     else if(cl->read_terminator) {
165       char *cp;
166       int remaining = len;
167       cp = strnstr(buff, cl->read_terminator, len);
168       if(cp) remaining = cp - buff + strlen(cl->read_terminator);
169       luaL_addlstring(&cl->inbuff, buff, MIN(len, remaining));
170       cl->read_sofar += len;
171       if(cp) {
172         luaL_pushresult(&cl->inbuff);
173         cl->read_sofar = len - remaining;
174         if(cl->read_sofar > 0) { /* We have to buffer this for next read */
175           luaL_buffinit(cl->L, &cl->inbuff);
176           luaL_addlstring(&cl->inbuff, buff + remaining, cl->read_sofar);
177         }
178         args = 1;
179         break;
180       }
181     }
182   }
183   if(len >= 0) {
184     /* We broke out, cause we read enough... */
185   }
186   else if(len == -1 && errno == EAGAIN) {
187     return EVENTER_READ | EVENTER_EXCEPTION;
188   }
189   else {
190     lua_pushnil(cl->L);
191     args = 1;
192   }
193  alldone:
194   noit_lua_check_deregister_event(ci, e, 0);
195   *(cl->eptr) = eventer_alloc();
196   memcpy(*cl->eptr, e, sizeof(*e));
197   noit_lua_check_register_event(ci, *cl->eptr);
198   noit_lua_resume(ci, args);
199   return 0;
200 }
201
202 static int
203 noit_lua_socket_read(lua_State *L) {
204   struct nl_slcl *cl;
205   noit_lua_check_info_t *ci;
206   eventer_t e, *eptr;
207
208   ci = get_ci(L);
209   assert(ci);
210
211   eptr = lua_touserdata(L, lua_upvalueindex(1));
212   e = *eptr;
213   cl = e->closure;
214   if(cl->read_sofar == 0) luaL_buffinit(L, &cl->inbuff);
215   cl->read_goal = 0;
216   cl->read_terminator = NULL;
217   fprintf(stderr, "initiating read... (%d bytes buffered)\n", cl->read_sofar);
218
219   if(lua_isnumber(L, 1)) {
220     cl->read_goal = lua_tointeger(L, 1);
221     fprintf(stderr, " read wants %d bytes\n", cl->read_goal);
222     if(cl->read_goal <= cl->read_sofar) {
223       const char *current_buff;
224       int base;
225       size_t len;
226      i_know_better:
227       base = lua_gettop(L);
228       /* We have enough, we can service this right here */
229       luaL_pushresult(&cl->inbuff);
230       current_buff = lua_tolstring(L, base + 1, &len);
231       assert(len == cl->read_sofar);
232       lua_pop(L, 1);
233       lua_pushlstring(L, current_buff, cl->read_goal);
234       cl->read_sofar -= cl->read_goal;
235       if(cl->read_sofar) {
236         luaL_buffinit(L, &cl->inbuff);
237         luaL_addlstring(&cl->inbuff, current_buff + cl->read_goal,
238                         cl->read_sofar);
239       }
240       return 1;
241     }
242   }
243   else {
244     cl->read_terminator = lua_tostring(L, 1);
245     fprintf(stderr, " read wants up to [%s]\n", cl->read_terminator);
246     if(cl->read_sofar) {
247       const char *cp;
248       /* Ugh... inernalism */
249       cp = strnstr(cl->inbuff.buffer, cl->read_terminator, cl->read_sofar);
250       if(cp) {
251         /* Here we matched... and we _know_ that someone actually wants:
252          * strlen(cl->read_terminator) + cp - cl->inbuff.buffer bytes...
253          * give it to them.
254          */
255         cl->read_goal = strlen(cl->read_terminator) + cp - cl->inbuff.buffer;
256         cl->read_terminator = NULL;
257         assert(cl->read_goal <= cl->read_sofar);
258         goto i_know_better;
259       }
260     }
261   }
262
263   e->callback = noit_lua_socket_read_complete;
264   e->mask = EVENTER_READ | EVENTER_EXCEPTION;
265   eventer_add(e);
266   return noit_lua_yield(ci, 0);
267 }
268 static int
269 noit_eventer_index_func(lua_State *L) {
270   int n;
271   const char *k;
272   eventer_t *udata, e;
273   n = lua_gettop(L);    /* number of arguments */
274   assert(n == 2);
275   if(!luaL_checkudata(L, 1, "eventer_t")) {
276     luaL_error(L, "metatable error, arg1 not a eventer_t!");
277   }
278   udata = lua_touserdata(L, 1);
279   e = *udata;
280   if(!lua_isstring(L, 2)) {
281     luaL_error(L, "metatable error, arg2 not a string!");
282   }
283   k = lua_tostring(L, 2);
284   switch(*k) {
285     case 'c':
286      if(!strcmp(k, "connect")) {
287        lua_pushlightuserdata(L, udata);
288        lua_pushcclosure(L, noit_lua_socket_connect, 1);
289        return 1;
290      }
291      break;
292     case 'r':
293      if(!strcmp(k, "read")) {
294        lua_pushlightuserdata(L, udata);
295        lua_pushcclosure(L, noit_lua_socket_read, 1);
296        return 1;
297      }
298      break;
299     default:
300       break;
301   }
302   luaL_error(L, "eventer_t no such element: %s", k);
303   return 0;
304 }
305
306 static eventer_t *
307 noit_lua_event(lua_State *L, eventer_t e) {
308   eventer_t *addr;
309   addr = (eventer_t *)lua_newuserdata(L, sizeof(e));
310   *addr = e;
311   if(luaL_newmetatable(L, "eventer_t") == 1) {
312     lua_pushcclosure(L, noit_eventer_index_func, 0);
313     lua_setfield(L, -2, "__index");
314   }
315   lua_setmetatable(L, -2);
316   return addr;
317 }
318
319 static int
320 nl_sleep_complete(eventer_t e, int mask, void *vcl, struct timeval *now) {
321   noit_lua_check_info_t *ci;
322   struct nl_slcl *cl = vcl;
323   struct timeval diff;
324   double p_int;
325
326   ci = get_ci(cl->L);
327   assert(ci);
328   noit_lua_check_deregister_event(ci, e, 0);
329
330   sub_timeval(*now, cl->start, &diff);
331   p_int = diff.tv_sec + diff.tv_usec / 1000000.0;
332   lua_pushnumber(cl->L, p_int);
333   free(cl);
334   noit_lua_resume(ci, 1);
335   return 0;
336 }
337
338 static int
339 nl_sleep(lua_State *L) {
340   noit_lua_check_info_t *ci;
341   struct nl_slcl *cl;
342   struct timeval diff;
343   eventer_t e;
344   double p_int;
345
346   ci = get_ci(L);
347   assert(ci);
348
349   p_int = lua_tonumber(L, 1);
350   cl = calloc(1, sizeof(*cl));
351   cl->L = L;
352   gettimeofday(&cl->start, NULL);
353
354   e = eventer_alloc();
355   e->mask = EVENTER_TIMER;
356   e->callback = nl_sleep_complete;
357   e->closure = cl;
358   memcpy(&e->whence, &cl->start, sizeof(cl->start));
359   diff.tv_sec = floor(p_int);
360   diff.tv_usec = (p_int - floor(p_int)) * 1000000;
361   add_timeval(e->whence, diff, &e->whence);
362   noit_lua_check_register_event(ci, e);
363   eventer_add(e);
364   return noit_lua_yield(ci, 0);
365 }
366
367 static int
368 nl_socket_tcp(lua_State *L, int family) {
369   struct nl_slcl *cl;
370   noit_lua_check_info_t *ci;
371   socklen_t on;
372   int fd;
373   eventer_t e;
374
375   fd = socket(family, SOCK_STREAM, 0);
376   if(fd < 0) {
377     lua_pushnil(L);
378     return 1;
379   }
380   on = 1;
381   if(ioctl(fd, FIONBIO, &on)) {
382     close(fd);
383     lua_pushnil(L);
384     return 1;
385   }
386
387   ci = get_ci(L);
388   assert(ci);
389
390   cl = calloc(1, sizeof(*cl));
391   cl->L = L;
392
393   e = eventer_alloc();
394   e->fd = fd;
395   e->mask = 0;
396   e->callback = NULL;
397   e->closure = cl;
398   cl->eptr = noit_lua_event(L, e);
399
400   noit_lua_check_register_event(ci, e);
401   return 1;
402 }
403 static int
404 nl_socket(lua_State *L) {
405   return nl_socket_tcp(L, AF_INET);
406 }
407 static int
408 nl_socket_ipv6(lua_State *L) {
409   return nl_socket_tcp(L, AF_INET6);
410 }
411
412 static const luaL_Reg noitlib[] = {
413   { "sleep", nl_sleep },
414   { "socket", nl_socket },
415   { "socket_ipv6", nl_socket_ipv6 },
416   { NULL, NULL }
417 };
418
419 int luaopen_noit(lua_State *L) {
420   luaL_register(L, "noit", noitlib);
421   return 0;
422 }
Note: See TracBrowser for help on using the browser.