Browse Source

add async dns for Socket

v0.7.4-release
Ryan 16 years ago
parent
commit
1542fc6a0b
  1. 2
      deps/libeio/xthread.h
  2. 141
      src/net.cc
  3. 5
      src/node.cc
  4. 4
      wscript

2
deps/libeio/xthread.h

@ -127,8 +127,10 @@ thread_create (thread_t *tid, void *(*proc)(void *), void *arg)
pthread_attr_init (&attr); pthread_attr_init (&attr);
pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
/*
pthread_attr_setstacksize (&attr, PTHREAD_STACK_MIN < sizeof (long) * 4096 pthread_attr_setstacksize (&attr, PTHREAD_STACK_MIN < sizeof (long) * 4096
? sizeof (long) * 4096 : PTHREAD_STACK_MIN); ? sizeof (long) * 4096 : PTHREAD_STACK_MIN);
*/
#ifdef PTHREAD_SCOPE_PROCESS #ifdef PTHREAD_SCOPE_PROCESS
pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS); pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
#endif #endif

141
src/net.cc

@ -5,22 +5,28 @@
#include <oi_buf.h> #include <oi_buf.h>
#include <assert.h> #include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <netdb.h> #include <netdb.h>
#include <strings.h>
using namespace v8; using namespace v8;
static struct addrinfo tcp_hints = #define ON_CONNECT_SYMBOL String::NewSymbol("onConnect")
#define ON_READ_SYMBOL String::NewSymbol("onRead")
static const struct addrinfo tcp_hints =
/* ai_flags */ { AI_PASSIVE /* ai_flags */ { AI_PASSIVE
/* ai_family */ , AF_UNSPEC /* ai_family */ , AF_UNSPEC
/* ai_socktype */ , SOCK_STREAM /* ai_socktype */ , SOCK_STREAM
/* ai_protocol */ , 0 /* ai_protocol */ , 0
/* ai_addrlen */ , 0 /* ai_addrlen */ , 0
/* ai_addr */ , 0 /* ai_addr */ , NULL
/* ai_canonname */ , 0 /* ai_canonname */ , NULL
/* ai_next */ , 0 /* ai_next */ , NULL
}; };
@ -44,12 +50,18 @@ public:
static void OnClose (oi_socket *s); static void OnClose (oi_socket *s);
static void OnTimeout (oi_socket *s); static void OnTimeout (oi_socket *s);
char *host_;
char *port_;
private: private:
static int Resolve (eio_req *req);
static int AfterResolve (eio_req *req);
static Socket* Unwrap (Handle<Object> handle); static Socket* Unwrap (Handle<Object> handle);
static void MakeWeak (Persistent<Value> _, void *data); static void MakeWeak (Persistent<Value> _, void *data);
enum {UTF8, RAW} encoding_; enum {UTF8, RAW} encoding_;
oi_socket socket_; oi_socket socket_;
struct addrinfo *address_;
Persistent<Object> handle_; Persistent<Object> handle_;
}; };
@ -112,35 +124,81 @@ Socket::ConnectTCP (const Arguments& args)
Socket *socket = Socket::Unwrap(args.Holder()); Socket *socket = Socket::Unwrap(args.Holder());
String::AsciiValue port(args[0]); String::AsciiValue port(args[0]);
socket->port_ = strdup(*port);
char *host = NULL; char *host = NULL;
String::AsciiValue host_v(args[1]->ToString()); String::AsciiValue host_v(args[1]->ToString());
if(args[1]->IsString()) { if(args[1]->IsString()) {
host = *host_v; socket->host_ = strdup(*host_v);
} }
int r; if(args[2]->IsFunction()) {
socket->handle_->Set(ON_CONNECT_SYMBOL , args[2]);
/* FIXME Blocking DNS resolution. */
printf("resolving host: %s, port: %s\n", host, *port);
r = getaddrinfo (host, *port, &tcp_hints, &socket->address_);
if(r != 0) {
perror("getaddrinfo");
return Undefined();
} }
r = oi_socket_connect (&socket->socket_, socket->address_); /* For the moment I will do DNS lookups in the thread pool. This is
if(r != 0) { * sub-optimal and cannot handle massive numbers of requests but it is
perror("oi_socket_connect"); * quite portable.
return Undefined(); * In the future I will move to a system using adns or udns:
* http://lists.schmorp.de/pipermail/libev/2009q1/000632.html
*/
node_eio_warmup();
eio_req *req = eio_custom (Socket::Resolve, EIO_PRI_DEFAULT, Socket::AfterResolve, socket);
}
/* This function is executed in the thread pool. It cannot touch anything! */
int
Socket::Resolve (eio_req *req)
{
Socket *socket = static_cast<Socket*> (req->data);
struct addrinfo *address = NULL;
req->result = getaddrinfo(socket->host_, socket->port_, &tcp_hints, &address);
req->ptr2 = address;
free(socket->host_);
socket->host_ = NULL;
free(socket->port_);
socket->port_ = NULL;
return 0;
}
int
Socket::AfterResolve (eio_req *req)
{
Socket *socket = static_cast<Socket*> (req->data);
struct addrinfo *address = static_cast<struct addrinfo *>(req->ptr2);
int r = 0;
if (req->result == 0) {
r = oi_socket_connect (&socket->socket_, address);
} }
freeaddrinfo(address); // this was allocated in the thread pool
// no error. return.
if(r == 0 && req->result == 0) {
oi_socket_attach (&socket->socket_, node_loop()); oi_socket_attach (&socket->socket_, node_loop());
return 0;
}
freeaddrinfo(socket->address_); HandleScope scope;
socket->address_ = NULL; Handle<Value> onconnect_value = socket->handle_->Get(ON_READ_SYMBOL);
// TODO raise error if r != 0 if (!onconnect_value->IsFunction()) return 0;
Handle<Function> onconnect = Handle<Function>::Cast(onconnect_value);
return Undefined(); TryCatch try_catch;
const int argc = 1;
Local<Value> argv[argc];
argv[0] = Integer::New(r | req->result); // FIXME very stupid error code.
onconnect->Call(socket->handle_, argc, argv);
if(try_catch.HasCaught())
node_fatal_exception(try_catch);
return 0;
} }
Handle<Value> Handle<Value>
@ -176,12 +234,16 @@ Socket::Socket(Handle<Object> handle, double timeout)
handle_.MakeWeak (this, Socket::MakeWeak); handle_.MakeWeak (this, Socket::MakeWeak);
encoding_ = UTF8; encoding_ = UTF8;
host_ = NULL;
port_ = NULL;
} }
Socket::~Socket () Socket::~Socket ()
{ {
oi_socket_close(&socket_); oi_socket_close(&socket_);
oi_socket_detach(&socket_); oi_socket_detach(&socket_);
free(host_);
free(port_);
handle_.Dispose(); handle_.Dispose();
handle_.Clear(); // necessary? handle_.Clear(); // necessary?
} }
@ -230,14 +292,17 @@ Socket::OnConnect (oi_socket *s)
HandleScope scope; HandleScope scope;
Handle<Value> on_connect_value = socket->handle_->Get( String::NewSymbol("onConnect") ); Handle<Value> on_connect_value = socket->handle_->Get(ON_CONNECT_SYMBOL);
if (!on_connect_value->IsFunction()) if (!on_connect_value->IsFunction())
return; return;
Handle<Function> on_connect = Handle<Function>::Cast(on_connect_value); Handle<Function> on_connect = Handle<Function>::Cast(on_connect_value);
TryCatch try_catch; TryCatch try_catch;
const int argc = 1;
Local<Value> argv[argc];
argv[0] = Integer::New(0);
Handle<Value> r = on_connect->Call(socket->handle_, 0, NULL); Handle<Value> r = on_connect->Call(socket->handle_, argc, argv);
if(try_catch.HasCaught()) if(try_catch.HasCaught())
node_fatal_exception(try_catch); node_fatal_exception(try_catch);
@ -249,7 +314,7 @@ Socket::OnRead (oi_socket *s, const void *buf, size_t count)
Socket *socket = static_cast<Socket*> (s->data); Socket *socket = static_cast<Socket*> (s->data);
HandleScope scope; HandleScope scope;
Handle<Value> onread_value = socket->handle_->Get( String::NewSymbol("onRead") ); Handle<Value> onread_value = socket->handle_->Get(ON_READ_SYMBOL);
if (!onread_value->IsFunction()) return; if (!onread_value->IsFunction()) return;
Handle<Function> onread = Handle<Function>::Cast(onread_value); Handle<Function> onread = Handle<Function>::Cast(onread_value);
@ -306,13 +371,13 @@ Socket::OnDrain (oi_socket *s)
Socket *socket = static_cast<Socket*> (s->data); Socket *socket = static_cast<Socket*> (s->data);
HandleScope scope; HandleScope scope;
Handle<Value> onclose_value = socket->handle_->Get( String::NewSymbol("onDrain") ); Handle<Value> ondrain_value = socket->handle_->Get( String::NewSymbol("onDrain") );
if (!onclose_value->IsFunction()) return; if (!ondrain_value->IsFunction()) return;
Handle<Function> onclose = Handle<Function>::Cast(onclose_value); Handle<Function> ondrain = Handle<Function>::Cast(ondrain_value);
TryCatch try_catch; TryCatch try_catch;
Handle<Value> r = onclose->Call(socket->handle_, 0, NULL); Handle<Value> r = ondrain->Call(socket->handle_, 0, NULL);
if(try_catch.HasCaught()) if(try_catch.HasCaught())
node_fatal_exception(try_catch); node_fatal_exception(try_catch);
@ -325,13 +390,13 @@ Socket::OnError (oi_socket *s, oi_error e)
Socket *socket = static_cast<Socket*> (s->data); Socket *socket = static_cast<Socket*> (s->data);
HandleScope scope; HandleScope scope;
Handle<Value> onclose_value = socket->handle_->Get( String::NewSymbol("onError") ); Handle<Value> onerror_value = socket->handle_->Get( String::NewSymbol("onError") );
if (!onclose_value->IsFunction()) return; if (!onerror_value->IsFunction()) return;
Handle<Function> onclose = Handle<Function>::Cast(onclose_value); Handle<Function> onerror = Handle<Function>::Cast(onerror_value);
TryCatch try_catch; TryCatch try_catch;
Handle<Value> r = onclose->Call(socket->handle_, 0, NULL); Handle<Value> r = onerror->Call(socket->handle_, 0, NULL);
if(try_catch.HasCaught()) if(try_catch.HasCaught())
node_fatal_exception(try_catch); node_fatal_exception(try_catch);
@ -343,13 +408,13 @@ Socket::OnTimeout (oi_socket *s)
Socket *socket = static_cast<Socket*> (s->data); Socket *socket = static_cast<Socket*> (s->data);
HandleScope scope; HandleScope scope;
Handle<Value> onclose_value = socket->handle_->Get( String::NewSymbol("onTimeout") ); Handle<Value> ontimeout_value = socket->handle_->Get( String::NewSymbol("onTimeout") );
if (!onclose_value->IsFunction()) return; if (!ontimeout_value->IsFunction()) return;
Handle<Function> onclose = Handle<Function>::Cast(onclose_value); Handle<Function> ontimeout = Handle<Function>::Cast(ontimeout_value);
TryCatch try_catch; TryCatch try_catch;
Handle<Value> r = onclose->Call(socket->handle_, 0, NULL); Handle<Value> r = ontimeout->Call(socket->handle_, 0, NULL);
if(try_catch.HasCaught()) if(try_catch.HasCaught())
node_fatal_exception(try_catch); node_fatal_exception(try_catch);

5
src/node.cc

@ -156,11 +156,6 @@ thread_pool_want_poll (void)
ev_async_send(EV_DEFAULT_ &thread_pool_watcher); ev_async_send(EV_DEFAULT_ &thread_pool_watcher);
} }
static void
thread_pool_done_poll (void)
{
}
void void
node_eio_warmup (void) node_eio_warmup (void)
{ {

4
wscript

@ -63,8 +63,8 @@ def configure(conf):
# Configure default variant # Configure default variant
conf.setenv('default') conf.setenv('default')
conf.env.append_value('CCFLAGS', ['-DNDEBUG', '-O2']) conf.env.append_value('CCFLAGS', ['-DNDEBUG', '-O2', '-g'])
conf.env.append_value('CXXFLAGS', ['-DNDEBUG', '-O2']) conf.env.append_value('CXXFLAGS', ['-DNDEBUG', '-O2', '-g'])
conf.write_config_header("config.h") conf.write_config_header("config.h")
def build(bld): def build(bld):

Loading…
Cancel
Save