From 15d24d8002ae92734a478e592a01234dffd0df2b Mon Sep 17 00:00:00 2001 From: Ryan Date: Sat, 2 May 2009 16:34:24 +0200 Subject: [PATCH] Major refactor of network code Here I massively change both the external and internal API of the TCP sockets and servers. This change introduces the concept of a protocol object like is found in Twisted Python. I believe this allows for a much cleaner description of how a socket behaves. What was once a single object "client" or "connection" is now represented by two objects: a "connection" and a "protocol". Well - I don't want to ramble too much because neither API is yet public or documented. Look the diff of test/test-pingpong.js to see how things have changed. --- src/net.cc | 594 ++++++++++++++++++------------------------ src/net.h | 138 +++++++--- src/node.cc | 28 +- test/test-pingpong.js | 77 +++--- 4 files changed, 412 insertions(+), 425 deletions(-) diff --git a/src/net.cc b/src/net.cc index fe972a51da..80acd339e4 100644 --- a/src/net.cc +++ b/src/net.cc @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -12,9 +13,20 @@ using namespace v8; using namespace node; -#define ON_CONNECT_SYMBOL String::NewSymbol("onConnect") -#define ON_CONNECTION_SYMBOL String::NewSymbol("onConnection") -#define ON_READ_SYMBOL String::NewSymbol("onRead") +#define ON_RECEIVE_SYMBOL String::NewSymbol("onReceive") +#define ON_DISCONNECT_SYMBOL String::NewSymbol("onDisconnect") +#define ON_CONNECT_SYMBOL String::NewSymbol("onConnect") +#define ON_DRAIN_SYMBOL String::NewSymbol("onDrain") +#define ON_TIMEOUT_SYMBOL String::NewSymbol("onTimeout") +#define ON_ERROR_SYMBOL String::NewSymbol("onError") + +#define SEND_SYMBOL String::NewSymbol("send") +#define DISCONNECT_SYMBOL String::NewSymbol("disconnect") +#define CONNECT_SYMBOL String::NewSymbol("connect") +#define ENCODING_SYMBOL String::NewSymbol("encoding") +#define TIMEOUT_SYMBOL String::NewSymbol("timeout") + +#define PROTOCOL_SYMBOL String::NewSymbol("protocol") static const struct addrinfo tcp_hints = /* ai_flags */ { AI_PASSIVE @@ -27,325 +39,194 @@ static const struct addrinfo tcp_hints = /* ai_next */ , NULL }; +Persistent tcp_connection_constructor; -Server::Server (Handle handle, int backlog) - : ObjectWrap(handle) -{ - //HandleScope scope; - oi_server_init(&server_, backlog); - server_.on_connection = Server::OnConnection; -// server_.on_error = Server::OnError; - server_.data = this; -} - -Server::~Server () -{ - //HandleScope scope; - oi_server_close(&server_); - oi_server_detach(&server_); -} - -Handle -Server::New (const Arguments& args) +void +Connection::Initialize (v8::Handle target) { HandleScope scope; - int backlog = 1024; // default - if (args.Length() > 0 && args[0]->IsNumber()) - backlog = args[0]->IntegerValue(); + Local t = FunctionTemplate::New(Connection::v8New); + t->InstanceTemplate()->SetInternalFieldCount(1); + target->Set(String::NewSymbol("TCPConnection"), t->GetFunction()); - new Server(args.Holder(), backlog); + tcp_connection_constructor = Persistent::New(t->GetFunction()); - return args.This(); + NODE_SET_METHOD(t->InstanceTemplate(), "connect", Connection::v8Connect); + NODE_SET_METHOD(t->InstanceTemplate(), "disconnect", Connection::v8Disconnect); + NODE_SET_METHOD(t->InstanceTemplate(), "send", Connection::v8Send); } -Handle -Server::ListenTCP (const Arguments& args) +Local +Connection::NewInstance (Local protocol) { - if (args.Length() < 2) return Undefined(); HandleScope scope; + Handle argv[] = { protocol }; + Local instance = tcp_connection_constructor->NewInstance(1, argv); + return scope.Close(instance); +} - Server *server = NODE_UNWRAP(Server, args.Holder()); +Connection::Connection (Handle handle) + : ObjectWrap(handle) +{ + HandleScope scope; - String::AsciiValue port(args[0]); + Local protocol = GetProtocol(); - int callback_index = 1; - char *host = NULL; - if (args[1]->IsString()) { - callback_index = 2; - String::AsciiValue host_v(args[1]->ToString()); - if(args[1]->IsString()) host = *host_v; + encoding_ = RAW; + Local encoding_v = protocol->Get(ENCODING_SYMBOL); + if (encoding_v->IsString()) { + Local encoding_string = encoding_v->ToString(); + char buf[5]; // need enough room for "utf8" or "raw" + encoding_string->WriteAscii(buf, 0, 4); + buf[4] = '\0'; + if(strcasecmp(buf, "utf8") == 0) encoding_ = UTF8; } - // For servers call getaddrinfo inline. This is blocking but it shouldn't - // matter--ever. If someone actually complains then simply swap it out - // with a libeio call. - struct addrinfo *address = NULL; - int r = getaddrinfo(host, *port, &tcp_hints, &address); - if (r != 0) - return ThrowException(String::New("Error looking up hostname")); - - if (!args[callback_index]->IsFunction()) - return ThrowException(String::New("Must supply onConnection callback")); - - server->handle_->Set(ON_CONNECTION_SYMBOL, args[callback_index]); + double timeout = 0.0; // default + Local timeout_v = protocol->Get(TIMEOUT_SYMBOL); + if (encoding_v->IsInt32()) + timeout = timeout_v->Int32Value() / 1000.0; - r = oi_server_listen(&server->server_, address); - if (r != 0) - return ThrowException(String::New("Error listening on port")); - oi_server_attach(EV_DEFAULT_UC_ &server->server_); - - freeaddrinfo(address); - - return Undefined(); -} + host_ = NULL; + port_ = NULL; -Handle -Server::Close (const Arguments& args) -{ - HandleScope scope; - Server *server = NODE_UNWRAP(Server, args.Holder()); - oi_server_close(&server->server_); - return Undefined(); + oi_socket_init(&socket_, timeout); + socket_.on_connect = Connection::_OnConnect; + socket_.on_read = Connection::_OnReceive; + socket_.on_drain = Connection::_OnDrain; + socket_.on_error = Connection::_OnError; + socket_.on_close = Connection::_OnDisconnect; + socket_.on_timeout = Connection::_OnTimeout; + socket_.data = this; } -oi_socket* -Server::OnConnection (oi_server *s, struct sockaddr *remote_addr, socklen_t remote_addr_len) +Local +Connection::GetProtocol (void) { - Server *server = static_cast (s->data); HandleScope scope; - Socket *socket = Socket::NewConnection(60.0); - - Local callback_v = server->handle_->Get(ON_CONNECTION_SYMBOL); - if (!callback_v->IsFunction()) - return NULL; // produce error? - - Local callback = Local::Cast(callback_v); - const int argc = 1; - Local argv[argc]; - argv[0] = Local::New(socket->handle_); - callback->Call(server->handle_, argc, argv); + Local protocol_v = handle_->Get(PROTOCOL_SYMBOL); + if (protocol_v->IsObject()) { + Local protocol = protocol_v->ToObject(); + return scope.Close(protocol); + } - return &socket->socket_; + return Local(); } Handle -Socket::New(const Arguments& args) +Connection::v8New (const Arguments& args) { - if (args.Length() > 1) - return Undefined(); - HandleScope scope; - // Default options - double timeout = 60.0; // in seconds - enum {UTF8, RAW} encoding ; + if (args[0]->IsFunction() == false) + return ThrowException(String::New("Must pass a class as the first argument.")); - // Set options from argument. - if (args.Length() == 1 && args[0]->IsObject()) { - Local options = args[0]->ToObject(); - Local timeout_value = options->Get(String::NewSymbol("timeout")); - Local encoding_value = options->Get(String::NewSymbol("encoding")); + Handle protocol = Handle::Cast(args[0]); + Handle argv[] = { args.This() }; + Local protocol_instance = protocol->NewInstance(1, argv); + args.This()->Set(PROTOCOL_SYMBOL, protocol_instance); - if (timeout_value->IsNumber()) { - // timeout is specified in milliseconds like other time - // values in javascript - timeout = timeout_value->NumberValue() / 1000; - } - - if (encoding_value->IsString()) { - Local encoding_string = encoding_value->ToString(); - char buf[5]; // need enough room for "utf8" or "raw" - encoding_string->WriteAscii(buf, 0, 4); - buf[4] = '\0'; - if(strcasecmp(buf, "utf8") == 0) encoding = UTF8; - } - } - - new Socket(args.Holder(), timeout); + new Connection(args.This()); return args.This(); } -void -Socket::SetEncoding (Handle encoding_value) -{ - if (encoding_value->IsString()) { - HandleScope scope; - Local encoding_string = encoding_value->ToString(); - char buf[5]; // need enough room for "utf8" or "raw" - encoding_string->WriteAscii(buf, 0, 4); - buf[4] = '\0'; - if(strcasecmp(buf, "utf8") == 0) - encoding_ = UTF8; - else - encoding_ = RAW; - } -} - Handle -Socket::ConnectTCP (const Arguments& args) +Connection::v8Connect (const Arguments& args) { - if (args.Length() < 1) - return Undefined(); - HandleScope scope; - Socket *socket = NODE_UNWRAP(Socket, args.Holder()); + Connection *connection = NODE_UNWRAP(Connection, args.Holder()); - String::AsciiValue port(args[0]); - socket->port_ = strdup(*port); + if (args.Length() == 0 || args[0]->IsInt32() == false) + return ThrowException(String::New("Must specify a port.")); - assert(socket->host_ == NULL); - String::AsciiValue host_v(args[1]->ToString()); - if(args[1]->IsString()) { - socket->host_ = strdup(*host_v); - } + String::AsciiValue port_sv(args[0]->ToString()); + assert(connection->port_ == NULL); + connection->port_ = strdup(*port_sv); - if(args[2]->IsFunction()) { - socket->handle_->Set(ON_CONNECT_SYMBOL , args[2]); + assert(connection->host_ == NULL); + if (args.Length() > 1 && args[1]->IsString()) { + String::Utf8Value host_sv(args[1]->ToString()); + connection->host_ = strdup(*host_sv); } - - /* For the moment I will do DNS lookups in the thread pool. This is + + /* For the moment I will do DNS lookups in the eio thread pool. This is * sub-optimal and cannot handle massive numbers of requests but it is * quite portable. * In the future I will move to a system using adns or udns: * http://lists.schmorp.de/pipermail/libev/2009q1/000632.html */ eio_warmup(); - eio_req *req = eio_custom (Socket::Resolve, EIO_PRI_DEFAULT, Socket::AfterResolve, socket); - + eio_req *req = eio_custom( Connection::Resolve + , EIO_PRI_DEFAULT + , Connection::AfterResolve + , connection + ); return Undefined(); } -/* This function is executed in the thread pool. It cannot touch anything! */ int -Socket::Resolve (eio_req *req) +Connection::Resolve (eio_req *req) { - Socket *socket = static_cast (req->data); + Connection *connection = static_cast (req->data); struct addrinfo *address = NULL; - - req->result = getaddrinfo(socket->host_, socket->port_, &tcp_hints, &address); - + req->result = getaddrinfo(connection->host_, connection->port_, &tcp_hints, &address); req->ptr2 = address; - free(socket->host_); - socket->host_ = NULL; + free(connection->host_); + connection->host_ = NULL; - free(socket->port_); - socket->port_ = NULL; + free(connection->port_); + connection->port_ = NULL; return 0; } int -Socket::AfterResolve (eio_req *req) +Connection::AfterResolve (eio_req *req) { - Socket *socket = static_cast (req->data); + Connection *connection = static_cast (req->data); struct addrinfo *address = static_cast(req->ptr2); + req->ptr2 = NULL; int r = 0; if (req->result == 0) { - r = oi_socket_connect (&socket->socket_, address); + r = connection->Connect(address); } - if (address) - freeaddrinfo(address); + + if (address) freeaddrinfo(address); // no error. return. - if(r == 0 && req->result == 0) { - oi_socket_attach (EV_DEFAULT_UC_ &socket->socket_); + if (r == 0 && req->result == 0) { + oi_socket_attach (EV_DEFAULT_UC_ &connection->socket_); return 0; } - HandleScope scope; - Handle onconnect_value = socket->handle_->Get(ON_CONNECT_SYMBOL); - if (!onconnect_value->IsFunction()) return 0; - Handle onconnect = Handle::Cast(onconnect_value); - - TryCatch try_catch; - const int argc = 1; - Local argv[argc]; - argv[0] = Integer::New(r | req->result); // FIXME very stupid error code. - - onconnect->Call(socket->handle_, argc, argv); - if(try_catch.HasCaught()) - fatal_exception(try_catch); - - return 0; -} - -Handle -Socket::Close (const Arguments& args) -{ - HandleScope scope; - Socket *socket = NODE_UNWRAP(Socket, args.Holder()); - oi_socket_close(&socket->socket_); - return Undefined(); -} - -Socket::Socket(Handle handle, double timeout) - : ObjectWrap(handle) -{ - //HandleScope scope; - oi_socket_init(&socket_, timeout); - socket_.on_connect = Socket::OnConnect; - socket_.on_read = Socket::OnRead; -// socket_.on_drain = Socket::OnDrain; -// socket_.on_error = Socket::OnError; - socket_.on_close = Socket::OnClose; - socket_.on_timeout = Socket::OnTimeout; - socket_.data = this; - - encoding_ = UTF8; // default encoding. - host_ = NULL; - port_ = NULL; -} + // return error? -Socket* -Socket::NewConnection (double timeout) -{ - HandleScope scope; - - Local socket_handle = socket_template->GetFunction()->NewInstance(); - Socket *socket = new Socket(socket_handle, 60.0); - socket->handle_->Delete(String::NewSymbol("connectTCP")); - - return socket; -} - -Socket::~Socket () -{ - oi_socket_close(&socket_); - oi_socket_detach(&socket_); - free(host_); - free(port_); - - //HandleScope scope; - handle_->Delete(String::NewSymbol("write")); - handle_->Delete(String::NewSymbol("close")); + return r | req->result; } Handle -Socket::SetEncoding (const Arguments& args) +Connection::v8Disconnect (const Arguments& args) { HandleScope scope; - Socket *socket = NODE_UNWRAP(Socket, args.Holder()); - socket->SetEncoding(args[0]); + Connection *connection = NODE_UNWRAP(Connection, args.Holder()); + connection->Disconnect(); return Undefined(); } Handle -Socket::Write (const Arguments& args) +Connection::v8Send (const Arguments& args) { HandleScope scope; + Connection *connection = NODE_UNWRAP(Connection, args.Holder()); - Socket *socket = NODE_UNWRAP(Socket, args.Holder()); - - // TODO support a callback using buf->on_release - if (args[0] == Null()) { - oi_socket_write_eof(&socket->socket_); + oi_socket_write_eof(&connection->socket_); } else if (args[0]->IsString()) { // utf8 encoding @@ -353,191 +234,216 @@ Socket::Write (const Arguments& args) size_t length = s->Utf8Length(); oi_buf *buf = oi_buf_new2(length); s->WriteUtf8(buf->base, length); - oi_socket_write(&socket->socket_, buf); + connection->Send(buf); } else if (args[0]->IsArray()) { // raw encoding Handle array = Handle::Cast(args[0]); size_t length = array->Length(); oi_buf *buf = oi_buf_new2(length); - for (int i = 0; i < length; i++) { + for (size_t i = 0; i < length; i++) { Local int_value = array->Get(Integer::New(i)); buf->base[i] = int_value->IntegerValue(); } - oi_socket_write(&socket->socket_, buf); + connection->Send(buf); } else return ThrowException(String::New("Bad argument")); - return Undefined(); + return Undefined(); } -void -Socket::OnConnect (oi_socket *s) +void +Connection::OnReceive (const void *buf, size_t len) { - Socket *socket = static_cast (s->data); - HandleScope scope; - Handle on_connect_value = socket->handle_->Get(ON_CONNECT_SYMBOL); - if (!on_connect_value->IsFunction()) - return; - Handle on_connect = Handle::Cast(on_connect_value); - - TryCatch try_catch; - const int argc = 1; - Local argv[argc]; - argv[0] = Integer::New(0); - - Handle r = on_connect->Call(socket->handle_, argc, argv); + Local protocol = GetProtocol(); + Handle callback_v = protocol->Get(ON_RECEIVE_SYMBOL); + if (!callback_v->IsFunction()) return; - if(try_catch.HasCaught()) - fatal_exception(try_catch); -} - -void -Socket::OnRead (oi_socket *s, const void *buf, size_t count) -{ - Socket *socket = static_cast (s->data); - HandleScope scope; - - Handle onread_value = socket->handle_->Get(ON_READ_SYMBOL); - if (!onread_value->IsFunction()) return; - Handle onread = Handle::Cast(onread_value); + Handle callback = Handle::Cast(callback_v); const int argc = 1; Handle argv[argc]; - if(count) { - if(socket->encoding_ == UTF8) { + if(len) { + if(encoding_ == UTF8) { // utf8 encoding - Handle chunk = String::New((const char*)buf, count); + Handle chunk = String::New((const char*)buf, len); argv[0] = chunk; + } else { // raw encoding - Local array = Array::New(count); - for (int i = 0; i < count; i++) { + Local array = Array::New(len); + for (size_t i = 0; i < len; i++) { char val = static_cast(buf)[i]; array->Set(Integer::New(i), Integer::New(val)); } argv[0] = array; } } else { - argv[0] = Local::New(Null()); + argv[0] = Local::New(Null()); } TryCatch try_catch; + callback->Call(protocol, argc, argv); - Handle r = onread->Call(socket->handle_, argc, argv); - - if(try_catch.HasCaught()) - fatal_exception(try_catch); + if (try_catch.HasCaught()) + fatal_exception(try_catch); // XXX is this the right action to take? } void -Socket::OnClose (oi_socket *s) +Connection::OnError (oi_error e) { - Socket *socket = static_cast (s->data); - HandleScope scope; - - Handle onclose_value = socket->handle_->Get( String::NewSymbol("onClose") ); - if (!onclose_value->IsFunction()) { - delete socket; - return; - } - Handle onclose = Handle::Cast(onclose_value); +} - TryCatch try_catch; +#define DEFINE_SIMPLE_CALLBACK(name, symbol) \ +void name () \ +{ \ + HandleScope scope; \ + Local protocol = GetProtocol();\ + Local callback_v = protocol->Get(symbol); \ + if (!callback_v->IsFunction()) return; \ + Handle callback = Handle::Cast(callback_v); \ + callback->Call(protocol, 0, NULL); \ +} - Handle r = onclose->Call(socket->handle_, 0, NULL); +DEFINE_SIMPLE_CALLBACK(Connection::OnConnect, ON_CONNECT_SYMBOL) +DEFINE_SIMPLE_CALLBACK(Connection::OnDrain, ON_DRAIN_SYMBOL) +DEFINE_SIMPLE_CALLBACK(Connection::OnDisconnect, ON_DISCONNECT_SYMBOL) +DEFINE_SIMPLE_CALLBACK(Connection::OnTimeout, ON_TIMEOUT_SYMBOL) - if(try_catch.HasCaught()) - fatal_exception(try_catch); +void +Acceptor::Initialize (Handle target) +{ + HandleScope scope; - delete socket; + Local tcp_server_template = + FunctionTemplate::New(Acceptor::v8New); + tcp_server_template->InstanceTemplate()->SetInternalFieldCount(1); + target->Set(String::NewSymbol("TCPServer"), tcp_server_template->GetFunction()); + + NODE_SET_METHOD( tcp_server_template->InstanceTemplate() + , "listen" + , Acceptor::v8Listen + ); + NODE_SET_METHOD( tcp_server_template->InstanceTemplate() + , "close" + , Acceptor::v8Close + ); } -void -Socket::OnDrain (oi_socket *s) +Acceptor::Acceptor (Handle handle, Handle options) + : ObjectWrap(handle) { - Socket *socket = static_cast (s->data); HandleScope scope; - Handle ondrain_value = socket->handle_->Get( String::NewSymbol("onDrain") ); - if (!ondrain_value->IsFunction()) return; - Handle ondrain = Handle::Cast(ondrain_value); - - TryCatch try_catch; + int backlog = 1024; // default value + Local backlog_v = options->Get(String::NewSymbol("backlog")); + if (backlog_v->IsInt32()) { + backlog = backlog_v->IntegerValue(); + } - Handle r = ondrain->Call(socket->handle_, 0, NULL); + Local on_error_v = options->Get(ON_ERROR_SYMBOL); + if (on_error_v->IsFunction()) { + handle_->Set(ON_ERROR_SYMBOL, on_error_v); + } - if(try_catch.HasCaught()) - fatal_exception(try_catch); + oi_server_init(&server_, backlog); + server_.on_connection = Acceptor::_OnConnection; + server_.on_error = Acceptor::_OnError; + server_.data = this; } - -void -Socket::OnError (oi_socket *s, oi_error e) +Connection* +Acceptor::OnConnection (struct sockaddr *addr, socklen_t len) { - Socket *socket = static_cast (s->data); HandleScope scope; + + Local protocol_v = handle_->GetHiddenValue(PROTOCOL_SYMBOL); + if (!protocol_v->IsFunction()) { + Close(); + return NULL; + } + Local protocol = Local::Cast(protocol_v); - Handle onerror_value = socket->handle_->Get( String::NewSymbol("onError") ); - if (!onerror_value->IsFunction()) return; - Handle onerror = Handle::Cast(onerror_value); + Local connection_handle = Connection::NewInstance(protocol); - TryCatch try_catch; + Connection *connection = new Connection(connection_handle); + return connection; +} - Handle r = onerror->Call(socket->handle_, 0, NULL); +void +Acceptor::OnError (struct oi_error error) +{ + HandleScope scope; - if(try_catch.HasCaught()) - fatal_exception(try_catch); + Local callback_v = handle_->Get(String::NewSymbol("onError")); + if (!callback_v->IsFunction()) return; + Local callback = Local::Cast(callback_v); + callback->Call(handle_, 0, NULL); // TODO args } -void -Socket::OnTimeout (oi_socket *s) +Handle +Acceptor::v8New (const Arguments& args) { - Socket *socket = static_cast (s->data); HandleScope scope; - Handle ontimeout_value = socket->handle_->Get( String::NewSymbol("onTimeout") ); - if (!ontimeout_value->IsFunction()) return; - Handle ontimeout = Handle::Cast(ontimeout_value); + if (args.Length() < 1 || args[0]->IsFunction() == false) + return ThrowException(String::New("Must at give connection handler as the first argument")); - TryCatch try_catch; + /// set the handler + args.Holder()->SetHiddenValue(PROTOCOL_SYMBOL, args[0]); - Handle r = ontimeout->Call(socket->handle_, 0, NULL); + Local options; - if(try_catch.HasCaught()) - fatal_exception(try_catch); -} + if (args.Length() > 1 && args[1]->IsObject()) { + options = args[1]->ToObject(); + } else { + options = Object::New(); + } -void -Socket::Initialize (Handle target) -{ - HandleScope scope; + new Acceptor(args.Holder(), options); - Local template_local = FunctionTemplate::New(Socket::New); - socket_template = Persistent::New(template_local); - socket_template->InstanceTemplate()->SetInternalFieldCount(1); - target->Set(String::NewSymbol("Socket"), socket_template->GetFunction()); + Acceptor *acceptor = NODE_UNWRAP(Acceptor, args.Holder()); - NODE_SET_METHOD(socket_template->InstanceTemplate(), "connectTCP", Socket::ConnectTCP); - //NODE_SET_METHOD(socket_template->InstanceTemplate(), "connectUNIX", Socket::ConnectUNIX); - NODE_SET_METHOD(socket_template->InstanceTemplate(), "write", Socket::Write); - NODE_SET_METHOD(socket_template->InstanceTemplate(), "close", Socket::Close); - NODE_SET_METHOD(socket_template->InstanceTemplate(), "setEncoding", Socket::SetEncoding); + return args.This(); } -void -Server::Initialize (Handle target) +Handle +Acceptor::v8Listen (const Arguments& args) { + Acceptor *acceptor = NODE_UNWRAP(Acceptor, args.Holder()); + + if (args.Length() < 1) + return ThrowException(String::New("Must give at least a port as argument.")); + + char *host = NULL; + HandleScope scope; + String::AsciiValue port(args[0]->ToString()); + + if (args[1]->IsString()) { + String::Utf8Value host_sv(args[1]->ToString()); + host = *host_sv; + } - Local server_template = FunctionTemplate::New(Server::New); - server_template->InstanceTemplate()->SetInternalFieldCount(1); - target->Set(String::NewSymbol("Server"), server_template->GetFunction()); + // For servers call getaddrinfo inline. This is blocking but it shouldn't + // matter--ever. If someone actually complains then simply swap it out + // with a libeio call. + struct addrinfo *address = NULL; + int r = getaddrinfo(host, *port, &tcp_hints, &address); + if (r != 0) + return ThrowException(String::New(strerror(errno))); - NODE_SET_METHOD(server_template->InstanceTemplate(), "listenTCP", Server::ListenTCP); - NODE_SET_METHOD(server_template->InstanceTemplate(), "close", Server::Close); + acceptor->Listen(address); + return Undefined(); } +Handle +Acceptor::v8Close (const Arguments& args) +{ + Acceptor *acceptor = NODE_UNWRAP(Acceptor, args.Holder()); + acceptor->Close(); + return Undefined(); +} diff --git a/src/net.h b/src/net.h index f335b74a16..e1fdecd1c3 100644 --- a/src/net.h +++ b/src/net.h @@ -7,59 +7,127 @@ namespace node { -class Server : ObjectWrap { +class Connection : public ObjectWrap { public: - Server (v8::Handle handle, int backlog); - ~Server (); + static void Initialize (v8::Handle target); - static v8::Handle New (const v8::Arguments& args); - static v8::Handle ListenTCP (const v8::Arguments& args); - static v8::Handle Close (const v8::Arguments& args); + Connection (v8::Handle handle); + ~Connection () { + Disconnect(); + } - static void Initialize (v8::Handle target); -private: - static oi_socket* OnConnection (oi_server *, struct sockaddr *, socklen_t); - oi_server server_; -}; + int Connect (struct addrinfo *address) { + return oi_socket_connect (&socket_, address); + } -static v8::Persistent socket_template; + void Send (oi_buf *buf) { + oi_socket_write (&socket_, buf); + } -class Socket : ObjectWrap { -public: - Socket (v8::Handle handle, double timeout); - ~Socket (); - // also a constructor - static Socket* NewConnection (double timeout); + void Disconnect () { + oi_socket_close (&socket_); + } +protected: + static v8::Handle v8New (const v8::Arguments& args); + static v8::Handle v8Connect (const v8::Arguments& args); + static v8::Handle v8Send (const v8::Arguments& args); + static v8::Handle v8Disconnect (const v8::Arguments& args); - void SetEncoding (v8::Handle); - void SetTimeout (double); + void OnConnect (void); + void OnReceive (const void *buf, size_t len); + void OnDrain (void); + void OnDisconnect (void); + void OnError (oi_error e); + void OnTimeout (void); - static v8::Handle New (const v8::Arguments& args); - static v8::Handle Write (const v8::Arguments& args); - static v8::Handle Close (const v8::Arguments& args); - static v8::Handle ConnectTCP (const v8::Arguments& args); - static v8::Handle SetEncoding (const v8::Arguments& args); + v8::Local GetProtocol (void); + static v8::Local NewInstance (v8::Local protocol); - static void Initialize (v8::Handle target); private: - static void OnConnect (oi_socket *socket); - static void OnRead (oi_socket *s, const void *buf, size_t count); - static void OnDrain (oi_socket *s); - static void OnError (oi_socket *s, oi_error e); - static void OnClose (oi_socket *s); - static void OnTimeout (oi_socket *s); + static void _OnConnect (oi_socket *s) { + Connection *connection = static_cast (s->data); + connection->OnConnect(); + } + + static void _OnReceive (oi_socket *s, const void *buf, size_t len) { + Connection *connection = static_cast (s->data); + connection->OnReceive(buf, len); + } + + static void _OnDrain (oi_socket *s) { + Connection *connection = static_cast (s->data); + connection->OnDrain(); + } + + static void _OnError (oi_socket *s, oi_error e) { + Connection *connection = static_cast (s->data); + connection->OnError(e); + } + + static void _OnDisconnect (oi_socket *s) { + Connection *connection = static_cast (s->data); + connection->OnDisconnect(); + } + + static void _OnTimeout (oi_socket *s) { + Connection *connection = static_cast (s->data); + connection->OnTimeout(); + } static int Resolve (eio_req *req); static int AfterResolve (eio_req *req); enum encoding encoding_; - oi_socket socket_; - char *host_; char *port_; + oi_socket socket_; + + friend class Acceptor; +}; - friend class Server; +class Acceptor : public ObjectWrap { +public: + static void Initialize (v8::Handle target); + + Acceptor (v8::Handle handle, v8::Handle options); + ~Acceptor () { Close(); } + + int Listen (struct addrinfo *address) { + int r = oi_server_listen (&server_, address); + if(r != 0) return r; + oi_server_attach (EV_DEFAULT_ &server_); + return 0; + } + + void Close ( ) { + oi_server_close (&server_); + } + +protected: + Connection* OnConnection (struct sockaddr *addr, socklen_t len); + void OnError (struct oi_error error); + + static v8::Handle v8New (const v8::Arguments& args); + static v8::Handle v8Listen (const v8::Arguments& args); + static v8::Handle v8Close (const v8::Arguments& args); + +private: + static oi_socket* _OnConnection (oi_server *s, struct sockaddr *addr, socklen_t len) { + Acceptor *acceptor = static_cast (s->data); + Connection *connection = acceptor->OnConnection (addr, len); + if (connection) + return &connection->socket_; + else + return NULL; + } + + static void _OnError (oi_server *s, struct oi_error error) { + Acceptor *acceptor = static_cast (s->data); + acceptor->OnError (error); + } + + oi_server server_; }; } // namespace node diff --git a/src/node.cc b/src/node.cc index 9f425394bd..b328737a1b 100644 --- a/src/node.cc +++ b/src/node.cc @@ -20,23 +20,26 @@ using namespace std; static int exit_code = 0; +ObjectWrap::~ObjectWrap ( ) +{ + if (!handle_.IsEmpty()) { + handle_->SetInternalField(0, Undefined()); + handle_.Dispose(); + handle_.Clear(); + } +} + ObjectWrap::ObjectWrap (Handle handle) { - v8::HandleScope scope; - handle_ = v8::Persistent::New(handle); + // TODO throw exception if it's already set + HandleScope scope; + handle_ = Persistent::New(handle); - v8::Handle external = v8::External::New(this); + Handle external = External::New(this); handle_->SetInternalField(0, external); handle_.MakeWeak(this, ObjectWrap::MakeWeak); } -ObjectWrap::~ObjectWrap ( ) -{ - handle_->SetInternalField(0, Undefined()); - handle_.Dispose(); - handle_.Clear(); -} - void* ObjectWrap::Unwrap (v8::Handle handle) { @@ -56,7 +59,6 @@ ObjectWrap::MakeWeak (Persistent _, void *data) - // Extracts a C string from a V8 Utf8Value. const char* ToCString(const v8::String::Utf8Value& value) @@ -246,8 +248,8 @@ main (int argc, char *argv[]) // BUILT-IN MODULES - Socket::Initialize(g); - Server::Initialize(g); + Acceptor::Initialize(g); + Connection::Initialize(g); node::Init_timer(g); node::Init_file(g); node::Init_http(g); diff --git a/test/test-pingpong.js b/test/test-pingpong.js index 1c53f1f57e..727ca89897 100644 --- a/test/test-pingpong.js +++ b/test/test-pingpong.js @@ -1,52 +1,63 @@ include("mjsunit"); + +var port = 12123; var N = 100; -function onLoad() { - server = new Server(1024); - var count = 0; - server.listenTCP(12123, function (connection) { - puts("got connection."); - connection.onRead = function (data) { - assertTrue(count <= N); - if (data === null) { - server.close(); - connection.close(); - return; - } - stdout.print ("-"); - if (/QUIT/.exec(data)) { - server.close(); - connection.close(); - } else if (/PING/.exec(data)) { - connection.write("PONG"); - } +var count = 0; + +var server; + +function Ponger (socket) { + this.encoding = "UTF8"; + + this.onConnect = function () { + puts("got socket."); + }; + + this.onReceive = function (data) { + assertTrue(count <= N); + stdout.print ("-"); + if (/QUIT/.exec(data)) { + socket.disconnect(); + //server.close(); + } else if (/PING/.exec(data)) { + socket.send("PONG"); } - }); + }; +} + +function Pinger (socket) { + this.encoding = "UTF8"; - socket = new Socket; - socket.onRead = function (data) { - stdout.print ("."); + this.onConnect = function () { + socket.send("PING"); + }; + + this.onReceive = function (data) { + stdout.print("."); assertEquals("PONG", data); setTimeout(function() { count += 1; if (count < N) { - socket.write("PING"); + socket.send("PING"); } else { - stdout.write ("\n"); - socket.write("QUIT\n"); - socket.close(); + stdout.write("\n"); + socket.send("QUIT\n"); + socket.disconnect(); } }, 10); }; - socket.onClose = function () { + this.onDisconnect = function () { puts("socket close."); assertEquals(N, count); + server.close(); }; +} - socket.connectTCP(12123, "127.0.0.1", function (status) { - if(status != 0) - exit(1); +function onLoad() { + server = new TCPServer(Ponger); + server.listen(port); - socket.write("PING"); - }); + var client = new TCPConnection(Pinger); + client.connect(port); }