Changeset db9f2be897dce90ece1f1f845c102c811a287b74

Show
Ignore:
Timestamp:
06/04/08 16:23:44 (6 years ago)
Author:
Theo Schlossnagle <jesus@omniti.com>
git-committer:
Theo Schlossnagle <jesus@omniti.com> 1212596624 +0000
git-parent:

[31453629167bfcf01c384803ac89a93e75e7391c]

git-author:
Theo Schlossnagle <jesus@omniti.com> 1212596624 +0000
Message:

read.... (with a pipelining bug), refs #28

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • src/modules/lua.c

    r9974078 rdb9f2be  
    3333  eventer_t e = *value; 
    3434  if(e->fd >= 0) e->opset->close(e->fd, &mask, e); 
    35   if(eventer_remove(e)) { 
    36     eventer_free(e); 
    37   } 
     35  if(e->mask && eventer_remove(e)) eventer_free(e); 
     36  if(!e->mask) eventer_free(e); 
    3837  free(ev); 
    3938} 
     
    4443  memcpy(eptr, &e, sizeof(*eptr)); 
    4544  if(!ci->events) ci->events = calloc(1, sizeof(*ci->events)); 
    46   noit_hash_store(ci->events, (const char *)eptr, sizeof(*eptr), eptr); 
     45  assert(noit_hash_store(ci->events, (const char *)eptr, sizeof(*eptr), eptr)); 
    4746} 
    4847void 
    49 noit_lua_check_deregister_event(noit_lua_check_info_t *ci, eventer_t e) { 
    50   if(ci->events) 
    51     noit_hash_delete(ci->events, (const char *)&e, sizeof(*e), 
    52                      NULL, noit_event_dispose); 
     48noit_lua_check_deregister_event(noit_lua_check_info_t *ci, eventer_t e, 
     49                                int tofree) { 
     50  assert(ci->events); 
     51  assert(noit_hash_delete(ci->events, (const char *)&e, sizeof(e), 
     52                          NULL, tofree ? noit_event_dispose : free)); 
    5353} 
    5454void 
     
    420420} 
    421421int 
     422noit_lua_yield(noit_lua_check_info_t *ci, int nargs) { 
     423  noitL(nldeb, "lua: %p yielding\n", ci->coro_state); 
     424  return lua_yield(ci->coro_state, nargs); 
     425} 
     426int 
    422427noit_lua_resume(noit_lua_check_info_t *ci, int nargs) { 
    423428  int result, base; 
     
    425430  check = ci->check; 
    426431 
     432  noitL(nldeb, "lua: %p resuming\n", ci->coro_state); 
    427433  result = lua_resume(ci->coro_state, nargs); 
    428434  switch(result) { 
     
    466472  noitL(nldeb, "lua: %p ->check_timeout\n", ci->coro_state); 
    467473  ci->timed_out = 1; 
    468   noit_lua_check_deregister_event(ci, e); 
     474  noit_lua_check_deregister_event(ci, e, 0); 
    469475  if(ci->coro_state) { 
    470476    /* Our coro is still "in-flight". To fix this we will unreference 
  • src/modules/lua_noit.c

    r5bf243f rdb9f2be  
    1616#include <assert.h> 
    1717#include <math.h> 
     18#include <errno.h> 
     19#include <unistd.h> 
     20#include <netinet/in.h> 
     21#include <arpa/inet.h> 
    1822 
    1923struct nl_slcl { 
    2024  struct timeval start; 
     25  luaL_Buffer inbuff; 
     26  int read_sofar; 
     27  int read_goal; 
     28  const char *read_terminator; 
     29  const char *outbuff; 
     30  int write_sofar; 
     31  int write_goal; 
     32  eventer_t *eptr; 
    2133  lua_State *L; 
    2234}; 
    2335 
    2436static int 
    25 nl_sleep_continue(eventer_t e, int mask, void *vcl, struct timeval *now) { 
     37noit_lua_socket_connect_complete(eventer_t e, int mask, void *vcl, 
     38                                 struct timeval *now) { 
     39  noit_lua_check_info_t *ci; 
     40  struct nl_slcl *cl = vcl; 
     41  int args = 0; 
     42 
     43  ci = get_ci(cl->L); 
     44  assert(ci); 
     45  noit_lua_check_deregister_event(ci, e, 0); 
     46 
     47  *(cl->eptr) = eventer_alloc(); 
     48  memcpy(*cl->eptr, e, sizeof(*e)); 
     49  noit_lua_check_register_event(ci, *cl->eptr); 
     50 
     51  if(!(mask & EVENTER_EXCEPTION) && 
     52     mask & EVENTER_WRITE) { 
     53    /* Connect completed successfully */ 
     54    lua_pushinteger(cl->L, 0); 
     55    args = 1; 
     56  } 
     57  else { 
     58    lua_pushinteger(cl->L, -1); 
     59    lua_pushstring(cl->L, strerror(errno)); 
     60    args = 2; 
     61  } 
     62  noit_lua_resume(ci, args); 
     63  return 0; 
     64
     65static int 
     66noit_lua_socket_connect(lua_State *L) { 
     67  noit_lua_check_info_t *ci; 
     68  eventer_t e, *eptr; 
     69  const char *target; 
     70  unsigned short port; 
     71  int8_t family; 
     72  int rv; 
     73  union { 
     74    struct sockaddr_in sin4; 
     75    struct sockaddr_in6 sin6; 
     76  } a; 
     77 
     78  ci = get_ci(L); 
     79  assert(ci); 
     80 
     81  eptr = lua_touserdata(L, lua_upvalueindex(1)); 
     82  e = *eptr; 
     83  target = lua_tostring(L, 1); 
     84  port = lua_tointeger(L, 2); 
     85 
     86  family = AF_INET; 
     87  rv = inet_pton(family, target, &a.sin4.sin_addr); 
     88  if(rv != 1) { 
     89    family = AF_INET6; 
     90    rv = inet_pton(family, target, &a.sin6.sin6_addr); 
     91    if(rv != 1) { 
     92      noitL(noit_stderr, "Cannot translate '%s' to IP\n", target); 
     93      memset(&a, 0, sizeof(a)); 
     94      lua_pushinteger(L, -1); 
     95      lua_pushfstring(L, "Cannot translate '%s' to IP\n", target); 
     96      return 2; 
     97    } 
     98    else { 
     99      /* We've IPv6 */ 
     100      a.sin6.sin6_family = AF_INET6; 
     101      a.sin6.sin6_port = htons(port); 
     102    } 
     103  } 
     104  else { 
     105    a.sin4.sin_family = family; 
     106    a.sin4.sin_port = htons(port); 
     107  } 
     108 
     109  rv = connect(e->fd, (struct sockaddr *)&a, 
     110               family==AF_INET ? sizeof(a.sin4) : sizeof(a.sin6)); 
     111  if(rv == 0) { 
     112    lua_pushinteger(L, 0); 
     113    return 1; 
     114  } 
     115  if(rv == -1 && errno == EINPROGRESS) { 
     116    /* Need completion */ 
     117    e->callback = noit_lua_socket_connect_complete; 
     118    e->mask = EVENTER_READ | EVENTER_WRITE | EVENTER_EXCEPTION; 
     119    eventer_add(e); 
     120    return noit_lua_yield(ci, 0); 
     121  } 
     122  lua_pushinteger(L, -1); 
     123  lua_pushstring(L, strerror(errno)); 
     124  return 2; 
     125
     126 
     127static int 
     128noit_lua_socket_read_complete(eventer_t e, int mask, void *vcl, 
     129                              struct timeval *now) { 
     130  char buff[4096]; 
     131  noit_lua_check_info_t *ci; 
     132  struct nl_slcl *cl = vcl; 
     133  int len; 
     134  int args = 0; 
     135 
     136  ci = get_ci(cl->L); 
     137  assert(ci); 
     138 
     139  if(mask & EVENTER_EXCEPTION) { 
     140    lua_pushnil(cl->L); 
     141    args = 1; 
     142    goto alldone; 
     143  } 
     144 
     145  while((len = e->opset->read(e->fd, buff, sizeof(buff), &mask, e)) > 0) { 
     146    if(cl->read_goal) { 
     147      int remaining = cl->read_goal - cl->read_sofar; 
     148      /* copy up to the goal into the inbuff */ 
     149      luaL_addlstring(&cl->inbuff, buff, MIN(len, remaining)); 
     150      cl->read_sofar += len; 
     151      if(cl->read_sofar >= cl->read_goal) { /* We're done */ 
     152        luaL_pushresult(&cl->inbuff); 
     153        cl->read_sofar -= cl->read_goal; 
     154        if(cl->read_sofar > 0) { /* We have to buffer this for next read */ 
     155          luaL_buffinit(cl->L, &cl->inbuff); 
     156          luaL_addlstring(&cl->inbuff, 
     157                          buff + remaining, 
     158                          cl->read_sofar); 
     159        } 
     160        args = 1; 
     161        break; 
     162      } 
     163    } 
     164    else if(cl->read_terminator) { 
     165      char *cp; 
     166      int remaining = len; 
     167      cp = strnstr(buff, cl->read_terminator, len); 
     168      if(cp) remaining = cp - buff + strlen(cl->read_terminator); 
     169      luaL_addlstring(&cl->inbuff, buff, MIN(len, remaining)); 
     170      cl->read_sofar += len; 
     171      if(cp) { 
     172        luaL_pushresult(&cl->inbuff); 
     173        cl->read_sofar = len - remaining; 
     174        if(cl->read_sofar > 0) { /* We have to buffer this for next read */ 
     175          luaL_buffinit(cl->L, &cl->inbuff); 
     176          luaL_addlstring(&cl->inbuff, buff + remaining, cl->read_sofar); 
     177        } 
     178        args = 1; 
     179        break; 
     180      } 
     181    } 
     182  } 
     183  if(len >= 0) { 
     184    /* We broke out, cause we read enough... */ 
     185  } 
     186  else if(len == -1 && errno == EAGAIN) { 
     187    return EVENTER_READ | EVENTER_EXCEPTION; 
     188  } 
     189  else { 
     190    lua_pushnil(cl->L); 
     191    args = 1; 
     192  } 
     193 alldone: 
     194  noit_lua_check_deregister_event(ci, e, 0); 
     195  *(cl->eptr) = eventer_alloc(); 
     196  memcpy(*cl->eptr, e, sizeof(*e)); 
     197  noit_lua_check_register_event(ci, *cl->eptr); 
     198  noit_lua_resume(ci, args); 
     199  return 0; 
     200
     201 
     202static int 
     203noit_lua_socket_read(lua_State *L) { 
     204  struct nl_slcl *cl; 
     205  noit_lua_check_info_t *ci; 
     206  eventer_t e, *eptr; 
     207 
     208  ci = get_ci(L); 
     209  assert(ci); 
     210 
     211  eptr = lua_touserdata(L, lua_upvalueindex(1)); 
     212  e = *eptr; 
     213  cl = e->closure; 
     214  if(cl->read_sofar == 0) luaL_buffinit(L, &cl->inbuff); 
     215  cl->read_goal = 0; 
     216  cl->read_terminator = NULL; 
     217 
     218  if(lua_isnumber(L, 1)) 
     219    cl->read_goal = lua_tointeger(L, 1); 
     220  else 
     221    cl->read_terminator = lua_tostring(L, 1); 
     222 
     223  e->callback = noit_lua_socket_read_complete; 
     224  e->mask = EVENTER_READ | EVENTER_EXCEPTION; 
     225  eventer_add(e); 
     226  return noit_lua_yield(ci, 0); 
     227
     228static int 
     229noit_eventer_index_func(lua_State *L) { 
     230  int n; 
     231  const char *k; 
     232  eventer_t *udata, e; 
     233  n = lua_gettop(L);    /* number of arguments */ 
     234  assert(n == 2); 
     235  if(!luaL_checkudata(L, 1, "eventer_t")) { 
     236    luaL_error(L, "metatable error, arg1 not a eventer_t!"); 
     237  } 
     238  udata = lua_touserdata(L, 1); 
     239  e = *udata; 
     240  if(!lua_isstring(L, 2)) { 
     241    luaL_error(L, "metatable error, arg2 not a string!"); 
     242  } 
     243  k = lua_tostring(L, 2); 
     244  switch(*k) { 
     245    case 'c': 
     246     if(!strcmp(k, "connect")) { 
     247       lua_pushlightuserdata(L, udata); 
     248       lua_pushcclosure(L, noit_lua_socket_connect, 1); 
     249       return 1; 
     250     } 
     251     break; 
     252    case 'r': 
     253     if(!strcmp(k, "read")) { 
     254       lua_pushlightuserdata(L, udata); 
     255       lua_pushcclosure(L, noit_lua_socket_read, 1); 
     256       return 1; 
     257     } 
     258     break; 
     259    default: 
     260      break; 
     261  } 
     262  luaL_error(L, "eventer_t no such element: %s", k); 
     263  return 0; 
     264
     265 
     266static eventer_t * 
     267noit_lua_event(lua_State *L, eventer_t e) { 
     268  eventer_t *addr; 
     269  addr = (eventer_t *)lua_newuserdata(L, sizeof(e)); 
     270  *addr = e; 
     271  if(luaL_newmetatable(L, "eventer_t") == 1) { 
     272    lua_pushcclosure(L, noit_eventer_index_func, 0); 
     273    lua_setfield(L, -2, "__index"); 
     274  } 
     275  lua_setmetatable(L, -2); 
     276  return addr; 
     277
     278 
     279static int 
     280nl_sleep_complete(eventer_t e, int mask, void *vcl, struct timeval *now) { 
    26281  noit_lua_check_info_t *ci; 
    27282  struct nl_slcl *cl = vcl; 
     
    31286  ci = get_ci(cl->L); 
    32287  assert(ci); 
    33   noit_lua_check_deregister_event(ci, e); 
     288  noit_lua_check_deregister_event(ci, e, 0); 
    34289 
    35290  sub_timeval(*now, cl->start, &diff); 
     
    59314  e = eventer_alloc(); 
    60315  e->mask = EVENTER_TIMER; 
    61   e->callback = nl_sleep_continue; 
     316  e->callback = nl_sleep_complete; 
    62317  e->closure = cl; 
    63318  memcpy(&e->whence, &cl->start, sizeof(cl->start)); 
     
    67322  noit_lua_check_register_event(ci, e); 
    68323  eventer_add(e); 
    69   return lua_yield(L, 0); 
    70 
     324  return noit_lua_yield(ci, 0); 
     325
     326 
     327static int 
     328nl_socket_tcp(lua_State *L, int family) { 
     329  struct nl_slcl *cl; 
     330  noit_lua_check_info_t *ci; 
     331  socklen_t on; 
     332  int fd; 
     333  eventer_t e; 
     334 
     335  fd = socket(family, SOCK_STREAM, 0); 
     336  if(fd < 0) { 
     337    lua_pushnil(L); 
     338    return 1; 
     339  } 
     340  on = 1; 
     341  if(ioctl(fd, FIONBIO, &on)) { 
     342    close(fd); 
     343    lua_pushnil(L); 
     344    return 1; 
     345  } 
     346 
     347  ci = get_ci(L); 
     348  assert(ci); 
     349 
     350  cl = calloc(1, sizeof(*cl)); 
     351  cl->L = L; 
     352 
     353  e = eventer_alloc(); 
     354  e->fd = fd; 
     355  e->mask = 0; 
     356  e->callback = NULL; 
     357  e->closure = cl; 
     358  cl->eptr = noit_lua_event(L, e); 
     359 
     360  noit_lua_check_register_event(ci, e); 
     361  return 1; 
     362
     363static int 
     364nl_socket(lua_State *L) { 
     365  return nl_socket_tcp(L, AF_INET); 
     366
     367static int 
     368nl_socket_ipv6(lua_State *L) { 
     369  return nl_socket_tcp(L, AF_INET6); 
     370
     371 
    71372static const luaL_Reg noitlib[] = { 
    72373  { "sleep", nl_sleep }, 
     374  { "socket", nl_socket }, 
     375  { "socket_ipv6", nl_socket_ipv6 }, 
    73376  { NULL, NULL } 
    74377}; 
  • src/modules/lua_noit.h

    r9974078 rdb9f2be  
    4242int luaopen_noit(lua_State *L); 
    4343noit_lua_check_info_t *get_ci(lua_State *L); 
     44int noit_lua_yield(noit_lua_check_info_t *ci, int nargs); 
    4445int noit_lua_resume(noit_lua_check_info_t *ci, int nargs); 
    4546void noit_lua_check_register_event(noit_lua_check_info_t *ci, eventer_t e); 
    46 void noit_lua_check_deregister_event(noit_lua_check_info_t *ci, eventer_t e); 
     47void noit_lua_check_deregister_event(noit_lua_check_info_t *ci, eventer_t e, 
     48                                     int tofree); 
    4749 
    4850#endif