root/src/modules/lua_noit.c

Revision db9f2be897dce90ece1f1f845c102c811a287b74, 8.7 kB (checked in by Theo Schlossnagle <jesus@omniti.com>, 6 years ago)

read.... (with a pipelining bug), refs #28

  • Property mode set to 100644
Line 
1 /*
2  * Copyright (c) 2007, OmniTI Computer Consulting, Inc.
3  * All rights reserved.
4  */
5
6 #include "noit_defines.h"
7
8 #include "noit_conf.h"
9 #include "noit_module.h"
10 #include "noit_check.h"
11 #include "noit_check_tools.h"
12 #include "utils/noit_log.h"
13 #include "eventer/eventer.h"
14 #include "lua_noit.h"
15
16 #include <assert.h>
17 #include <math.h>
18 #include <errno.h>
19 #include <unistd.h>
20 #include <netinet/in.h>
21 #include <arpa/inet.h>
22
23 struct nl_slcl {
24   struct timeval start;
25   luaL_Buffer inbuff;
26   int read_sofar;
27   int read_goal;
28   const char *read_terminator;
29   const char *outbuff;
30   int write_sofar;
31   int write_goal;
32   eventer_t *eptr;
33   lua_State *L;
34 };
35
36 static int
37 noit_lua_socket_connect_complete(eventer_t e, int mask, void *vcl,
38                                  struct timeval *now) {
39   noit_lua_check_info_t *ci;
40   struct nl_slcl *cl = vcl;
41   int args = 0;
42
43   ci = get_ci(cl->L);
44   assert(ci);
45   noit_lua_check_deregister_event(ci, e, 0);
46
47   *(cl->eptr) = eventer_alloc();
48   memcpy(*cl->eptr, e, sizeof(*e));
49   noit_lua_check_register_event(ci, *cl->eptr);
50
51   if(!(mask & EVENTER_EXCEPTION) &&
52      mask & EVENTER_WRITE) {
53     /* Connect completed successfully */
54     lua_pushinteger(cl->L, 0);
55     args = 1;
56   }
57   else {
58     lua_pushinteger(cl->L, -1);
59     lua_pushstring(cl->L, strerror(errno));
60     args = 2;
61   }
62   noit_lua_resume(ci, args);
63   return 0;
64 }
65 static int
66 noit_lua_socket_connect(lua_State *L) {
67   noit_lua_check_info_t *ci;
68   eventer_t e, *eptr;
69   const char *target;
70   unsigned short port;
71   int8_t family;
72   int rv;
73   union {
74     struct sockaddr_in sin4;
75     struct sockaddr_in6 sin6;
76   } a;
77
78   ci = get_ci(L);
79   assert(ci);
80
81   eptr = lua_touserdata(L, lua_upvalueindex(1));
82   e = *eptr;
83   target = lua_tostring(L, 1);
84   port = lua_tointeger(L, 2);
85
86   family = AF_INET;
87   rv = inet_pton(family, target, &a.sin4.sin_addr);
88   if(rv != 1) {
89     family = AF_INET6;
90     rv = inet_pton(family, target, &a.sin6.sin6_addr);
91     if(rv != 1) {
92       noitL(noit_stderr, "Cannot translate '%s' to IP\n", target);
93       memset(&a, 0, sizeof(a));
94       lua_pushinteger(L, -1);
95       lua_pushfstring(L, "Cannot translate '%s' to IP\n", target);
96       return 2;
97     }
98     else {
99       /* We've IPv6 */
100       a.sin6.sin6_family = AF_INET6;
101       a.sin6.sin6_port = htons(port);
102     }
103   }
104   else {
105     a.sin4.sin_family = family;
106     a.sin4.sin_port = htons(port);
107   }
108
109   rv = connect(e->fd, (struct sockaddr *)&a,
110                family==AF_INET ? sizeof(a.sin4) : sizeof(a.sin6));
111   if(rv == 0) {
112     lua_pushinteger(L, 0);
113     return 1;
114   }
115   if(rv == -1 && errno == EINPROGRESS) {
116     /* Need completion */
117     e->callback = noit_lua_socket_connect_complete;
118     e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION;
119     eventer_add(e);
120     return noit_lua_yield(ci, 0);
121   }
122   lua_pushinteger(L, -1);
123   lua_pushstring(L, strerror(errno));
124   return 2;
125 }
126
127 static int
128 noit_lua_socket_read_complete(eventer_t e, int mask, void *vcl,
129                               struct timeval *now) {
130   char buff[4096];
131   noit_lua_check_info_t *ci;
132   struct nl_slcl *cl = vcl;
133   int len;
134   int args = 0;
135
136   ci = get_ci(cl->L);
137   assert(ci);
138
139   if(mask & EVENTER_EXCEPTION) {
140     lua_pushnil(cl->L);
141     args = 1;
142     goto alldone;
143   }
144
145   while((len = e->opset->read(e->fd, buff, sizeof(buff), &mask, e)) > 0) {
146     if(cl->read_goal) {
147       int remaining = cl->read_goal - cl->read_sofar;
148       /* copy up to the goal into the inbuff */
149       luaL_addlstring(&cl->inbuff, buff, MIN(len, remaining));
150       cl->read_sofar += len;
151       if(cl->read_sofar >= cl->read_goal) { /* We're done */
152         luaL_pushresult(&cl->inbuff);
153         cl->read_sofar -= cl->read_goal;
154         if(cl->read_sofar > 0) { /* We have to buffer this for next read */
155           luaL_buffinit(cl->L, &cl->inbuff);
156           luaL_addlstring(&cl->inbuff,
157                           buff + remaining,
158                           cl->read_sofar);
159         }
160         args = 1;
161         break;
162       }
163     }
164     else if(cl->read_terminator) {
165       char *cp;
166       int remaining = len;
167       cp = strnstr(buff, cl->read_terminator, len);
168       if(cp) remaining = cp - buff + strlen(cl->read_terminator);
169       luaL_addlstring(&cl->inbuff, buff, MIN(len, remaining));
170       cl->read_sofar += len;
171       if(cp) {
172         luaL_pushresult(&cl->inbuff);
173         cl->read_sofar = len - remaining;
174         if(cl->read_sofar > 0) { /* We have to buffer this for next read */
175           luaL_buffinit(cl->L, &cl->inbuff);
176           luaL_addlstring(&cl->inbuff, buff + remaining, cl->read_sofar);
177         }
178         args = 1;
179         break;
180       }
181     }
182   }
183   if(len >= 0) {
184     /* We broke out, cause we read enough... */
185   }
186   else if(len == -1 && errno == EAGAIN) {
187     return EVENTER_READ | EVENTER_EXCEPTION;
188   }
189   else {
190     lua_pushnil(cl->L);
191     args = 1;
192   }
193  alldone:
194   noit_lua_check_deregister_event(ci, e, 0);
195   *(cl->eptr) = eventer_alloc();
196   memcpy(*cl->eptr, e, sizeof(*e));
197   noit_lua_check_register_event(ci, *cl->eptr);
198   noit_lua_resume(ci, args);
199   return 0;
200 }
201
202 static int
203 noit_lua_socket_read(lua_State *L) {
204   struct nl_slcl *cl;
205   noit_lua_check_info_t *ci;
206   eventer_t e, *eptr;
207
208   ci = get_ci(L);
209   assert(ci);
210
211   eptr = lua_touserdata(L, lua_upvalueindex(1));
212   e = *eptr;
213   cl = e->closure;
214   if(cl->read_sofar == 0) luaL_buffinit(L, &cl->inbuff);
215   cl->read_goal = 0;
216   cl->read_terminator = NULL;
217
218   if(lua_isnumber(L, 1))
219     cl->read_goal = lua_tointeger(L, 1);
220   else
221     cl->read_terminator = lua_tostring(L, 1);
222
223   e->callback = noit_lua_socket_read_complete;
224   e->mask = EVENTER_READ | EVENTER_EXCEPTION;
225   eventer_add(e);
226   return noit_lua_yield(ci, 0);
227 }
228 static int
229 noit_eventer_index_func(lua_State *L) {
230   int n;
231   const char *k;
232   eventer_t *udata, e;
233   n = lua_gettop(L);    /* number of arguments */
234   assert(n == 2);
235   if(!luaL_checkudata(L, 1, "eventer_t")) {
236     luaL_error(L, "metatable error, arg1 not a eventer_t!");
237   }
238   udata = lua_touserdata(L, 1);
239   e = *udata;
240   if(!lua_isstring(L, 2)) {
241     luaL_error(L, "metatable error, arg2 not a string!");
242   }
243   k = lua_tostring(L, 2);
244   switch(*k) {
245     case 'c':
246      if(!strcmp(k, "connect")) {
247        lua_pushlightuserdata(L, udata);
248        lua_pushcclosure(L, noit_lua_socket_connect, 1);
249        return 1;
250      }
251      break;
252     case 'r':
253      if(!strcmp(k, "read")) {
254        lua_pushlightuserdata(L, udata);
255        lua_pushcclosure(L, noit_lua_socket_read, 1);
256        return 1;
257      }
258      break;
259     default:
260       break;
261   }
262   luaL_error(L, "eventer_t no such element: %s", k);
263   return 0;
264 }
265
266 static eventer_t *
267 noit_lua_event(lua_State *L, eventer_t e) {
268   eventer_t *addr;
269   addr = (eventer_t *)lua_newuserdata(L, sizeof(e));
270   *addr = e;
271   if(luaL_newmetatable(L, "eventer_t") == 1) {
272     lua_pushcclosure(L, noit_eventer_index_func, 0);
273     lua_setfield(L, -2, "__index");
274   }
275   lua_setmetatable(L, -2);
276   return addr;
277 }
278
279 static int
280 nl_sleep_complete(eventer_t e, int mask, void *vcl, struct timeval *now) {
281   noit_lua_check_info_t *ci;
282   struct nl_slcl *cl = vcl;
283   struct timeval diff;
284   double p_int;
285
286   ci = get_ci(cl->L);
287   assert(ci);
288   noit_lua_check_deregister_event(ci, e, 0);
289
290   sub_timeval(*now, cl->start, &diff);
291   p_int = diff.tv_sec + diff.tv_usec / 1000000.0;
292   lua_pushnumber(cl->L, p_int);
293   free(cl);
294   noit_lua_resume(ci, 1);
295   return 0;
296 }
297
298 static int
299 nl_sleep(lua_State *L) {
300   noit_lua_check_info_t *ci;
301   struct nl_slcl *cl;
302   struct timeval diff;
303   eventer_t e;
304   double p_int;
305
306   ci = get_ci(L);
307   assert(ci);
308
309   p_int = lua_tonumber(L, 1);
310   cl = calloc(1, sizeof(*cl));
311   cl->L = L;
312   gettimeofday(&cl->start, NULL);
313
314   e = eventer_alloc();
315   e->mask = EVENTER_TIMER;
316   e->callback = nl_sleep_complete;
317   e->closure = cl;
318   memcpy(&e->whence, &cl->start, sizeof(cl->start));
319   diff.tv_sec = floor(p_int);
320   diff.tv_usec = (p_int - floor(p_int)) * 1000000;
321   add_timeval(e->whence, diff, &e->whence);
322   noit_lua_check_register_event(ci, e);
323   eventer_add(e);
324   return noit_lua_yield(ci, 0);
325 }
326
327 static int
328 nl_socket_tcp(lua_State *L, int family) {
329   struct nl_slcl *cl;
330   noit_lua_check_info_t *ci;
331   socklen_t on;
332   int fd;
333   eventer_t e;
334
335   fd = socket(family, SOCK_STREAM, 0);
336   if(fd < 0) {
337     lua_pushnil(L);
338     return 1;
339   }
340   on = 1;
341   if(ioctl(fd, FIONBIO, &on)) {
342     close(fd);
343     lua_pushnil(L);
344     return 1;
345   }
346
347   ci = get_ci(L);
348   assert(ci);
349
350   cl = calloc(1, sizeof(*cl));
351   cl->L = L;
352
353   e = eventer_alloc();
354   e->fd = fd;
355   e->mask = 0;
356   e->callback = NULL;
357   e->closure = cl;
358   cl->eptr = noit_lua_event(L, e);
359
360   noit_lua_check_register_event(ci, e);
361   return 1;
362 }
363 static int
364 nl_socket(lua_State *L) {
365   return nl_socket_tcp(L, AF_INET);
366 }
367 static int
368 nl_socket_ipv6(lua_State *L) {
369   return nl_socket_tcp(L, AF_INET6);
370 }
371
372 static const luaL_Reg noitlib[] = {
373   { "sleep", nl_sleep },
374   { "socket", nl_socket },
375   { "socket_ipv6", nl_socket_ipv6 },
376   { NULL, NULL }
377 };
378
379 int luaopen_noit(lua_State *L) {
380   luaL_register(L, "noit", noitlib);
381   return 0;
382 }
Note: See TracBrowser for help on using the browser.